2023年8月1日发(作者:)

RxJS应⽤场景1——简化轮询逻辑(将琐碎的异步逻辑抽象为⼀个输⼊输出流)RxJS 应⽤场景1——简化轮询逻辑从实际应⽤场景中抽出⼀段逻辑,作为说明。逻辑如下:调⽤⽣成接⼝,拿到key,5秒后调⽤进度条接⼝轮询进度(数据太多),UI渲染进度,当进100时停⽌轮询。所以要抽象出⼀个⽅法demoX,负责调⽤⽣成接⼝,并轮询进度条接⼝,最后返回接⼝响应信息(进度或,接⼝报错信息)。整理⼀下这个⽅法需要实现的功能:1. 收集第⼀个接⼝和第⼆个接⼝的报错信息并返回2. 收集这个⽅法中出现的错误信息并返回3. 调⽤第⼀个接⼝拿到key 延迟5秒后轮询进度接⼝1秒1次4. 假如轮询时接⼝没有返回,⽽1秒⼜有新的轮询产⽣,丢掉之前的轮询5. 当轮询结果返回,⽅法就返回6. ⽅法中出现报错停⽌轮询,当进度为100时停⽌轮询7. ⽅法返回的是⼀个可观察对象,有新值时会通知订阅者很显然,原⽣Promise 不够强⼤,这个⽅法要返回⼀个可观察者对象,它可以发出N个值,⾄到取消订阅。demoX这个⽅法收集了所有报错信息及轮询结果给订阅者,由订阅者来决定后续操作,这些不是我们关⼼的。step 1封装⼀个_http ⽅法⽤来发起http请求: import { of, race } from "rxjs"; import { delay } from "rxjs/operators";

class _http { static post(url: string, data: any) { return race( of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)), (baseURL + url, data, { "Content-Type": "application/json;charset=UTF-8", token:xxxxxxxxx }) ); } }这是⼀个简单的封装,封装了请求头,及超时处理,还可以做统⼀的错误处理,及完成回调等。如下所⽰:import { ajax } from "rxjs/ajax"; import { of, race } from "rxjs"; import { delay, catchError, finalize } from "rxjs/operators";

class _http { static post(url: string, data: any) { return race( of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)), (baseURL + url, data, { "Content-Type": "application/json;charset=UTF-8", token }).pipe( catchError(({message}:any)=> of({code:'err',msg:message})), finalize(()=>('complete')) ) ); } }只⽤简单的封装就可以, demoX(this: any, param: any) { return _http("/api/create", { // step 2 _http 请求create 接⼝,拿到完整响应, residentialIdList: param }).pipe(

map(({ response: rs }: any) => {// step 3 通过map 操作符处理⼀下响应数据, const { code, msg } = rs; if (code !== 0) { throw Error(msg); // 对⾮ 0 code 码处理,抛出错误信息,最后由catchError 操作符进⾏集中处理 } return ; }), delay(5000), // 延迟5秒后调⽤进度接⼝ mergeMap(key => { // step 4 将接⼝数据(拿到的key)合并到进度请求逻辑中

return interval(1000).pipe( // step5 每1秒产⽣⼀个值, switchMap(() => // step 6 抛掉interval发出的值,将流切到内部的请求 _http("/api/progress", {// step 7 请求进度接⼝ key: key, type: 2 }).pipe( map(({ response: rs }: any) => { // step 8 处理进度接⼝响应 const { code, msg } = rs; if (code !== 0) { // 对⾮ 0 code值 抛错,由后续catchError 操作符集中处理, throw Error(msg); } return rs; }) ) ) ); }), catchError(({ message }: any) => { //step 9 集中处理错误信息,返回⼀个值,包含各种错误信息,可能是接⼝报错,或是语法报错,或是逻辑报错 return of({ code: "err", msg: message }); }), takeWhile( // step 10 只有要有错误抛出或当进度为100时,要停⽌轮询进度接⼝,并取消相关订阅 ({ data, code }: any) => code !== 0 || data < 100 || data > t ), distinct((o: any) => ) // step 11 假如当前值和上次发出的值相同,则不发出新的值,订阅者不会收到 ); }现在demoX 封装完闭,下⾯来调⽤它:(param).subscribe((rs: any) => { const { code, msg } = rs; if (code !== 0) { alert(msg); return; } t = ; })这个例⼦可以看出来,RxJS 可以将琐碎的异步逻辑抽像为⼀个输⼊输出流,可以通过各种操作符对流进⾏加⼯,最后输出,简洁、⾼效。需要注意的是⼀个输⼊输出流内部理论上可以包含⽆数的输⼊输出流,什么时候把流合到内部,或把内部流切到外部这个根据实际需要调⽤相应的操作符来操作,具体参见RxJS 官⽅⽂档。

2023年8月1日发(作者:)

RxJS应⽤场景1——简化轮询逻辑(将琐碎的异步逻辑抽象为⼀个输⼊输出流)RxJS 应⽤场景1——简化轮询逻辑从实际应⽤场景中抽出⼀段逻辑,作为说明。逻辑如下:调⽤⽣成接⼝,拿到key,5秒后调⽤进度条接⼝轮询进度(数据太多),UI渲染进度,当进100时停⽌轮询。所以要抽象出⼀个⽅法demoX,负责调⽤⽣成接⼝,并轮询进度条接⼝,最后返回接⼝响应信息(进度或,接⼝报错信息)。整理⼀下这个⽅法需要实现的功能:1. 收集第⼀个接⼝和第⼆个接⼝的报错信息并返回2. 收集这个⽅法中出现的错误信息并返回3. 调⽤第⼀个接⼝拿到key 延迟5秒后轮询进度接⼝1秒1次4. 假如轮询时接⼝没有返回,⽽1秒⼜有新的轮询产⽣,丢掉之前的轮询5. 当轮询结果返回,⽅法就返回6. ⽅法中出现报错停⽌轮询,当进度为100时停⽌轮询7. ⽅法返回的是⼀个可观察对象,有新值时会通知订阅者很显然,原⽣Promise 不够强⼤,这个⽅法要返回⼀个可观察者对象,它可以发出N个值,⾄到取消订阅。demoX这个⽅法收集了所有报错信息及轮询结果给订阅者,由订阅者来决定后续操作,这些不是我们关⼼的。step 1封装⼀个_http ⽅法⽤来发起http请求: import { of, race } from "rxjs"; import { delay } from "rxjs/operators";

class _http { static post(url: string, data: any) { return race( of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)), (baseURL + url, data, { "Content-Type": "application/json;charset=UTF-8", token:xxxxxxxxx }) ); } }这是⼀个简单的封装,封装了请求头,及超时处理,还可以做统⼀的错误处理,及完成回调等。如下所⽰:import { ajax } from "rxjs/ajax"; import { of, race } from "rxjs"; import { delay, catchError, finalize } from "rxjs/operators";

class _http { static post(url: string, data: any) { return race( of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)), (baseURL + url, data, { "Content-Type": "application/json;charset=UTF-8", token }).pipe( catchError(({message}:any)=> of({code:'err',msg:message})), finalize(()=>('complete')) ) ); } }只⽤简单的封装就可以, demoX(this: any, param: any) { return _http("/api/create", { // step 2 _http 请求create 接⼝,拿到完整响应, residentialIdList: param }).pipe(

map(({ response: rs }: any) => {// step 3 通过map 操作符处理⼀下响应数据, const { code, msg } = rs; if (code !== 0) { throw Error(msg); // 对⾮ 0 code 码处理,抛出错误信息,最后由catchError 操作符进⾏集中处理 } return ; }), delay(5000), // 延迟5秒后调⽤进度接⼝ mergeMap(key => { // step 4 将接⼝数据(拿到的key)合并到进度请求逻辑中

return interval(1000).pipe( // step5 每1秒产⽣⼀个值, switchMap(() => // step 6 抛掉interval发出的值,将流切到内部的请求 _http("/api/progress", {// step 7 请求进度接⼝ key: key, type: 2 }).pipe( map(({ response: rs }: any) => { // step 8 处理进度接⼝响应 const { code, msg } = rs; if (code !== 0) { // 对⾮ 0 code值 抛错,由后续catchError 操作符集中处理, throw Error(msg); } return rs; }) ) ) ); }), catchError(({ message }: any) => { //step 9 集中处理错误信息,返回⼀个值,包含各种错误信息,可能是接⼝报错,或是语法报错,或是逻辑报错 return of({ code: "err", msg: message }); }), takeWhile( // step 10 只有要有错误抛出或当进度为100时,要停⽌轮询进度接⼝,并取消相关订阅 ({ data, code }: any) => code !== 0 || data < 100 || data > t ), distinct((o: any) => ) // step 11 假如当前值和上次发出的值相同,则不发出新的值,订阅者不会收到 ); }现在demoX 封装完闭,下⾯来调⽤它:(param).subscribe((rs: any) => { const { code, msg } = rs; if (code !== 0) { alert(msg); return; } t = ; })这个例⼦可以看出来,RxJS 可以将琐碎的异步逻辑抽像为⼀个输⼊输出流,可以通过各种操作符对流进⾏加⼯,最后输出,简洁、⾼效。需要注意的是⼀个输⼊输出流内部理论上可以包含⽆数的输⼊输出流,什么时候把流合到内部,或把内部流切到外部这个根据实际需要调⽤相应的操作符来操作,具体参见RxJS 官⽅⽂档。