Rx是一种基于观察者模式的编程范式,推崇的是纯函数式的编程,目的是降低异步编程的复杂度。现在有多个语言版本,这里主要介绍前端的rxjs,它内置在angular,但真正用它的其实不多。
这里比较五层的逻辑:数据1变化执行操作2,然后操作2会引发操作3,.... 1>2>3>4>5,其中操作3获得的数据变化时,需要重新执行4+5,操作2变化需要重新执行3+4+5,也就是会影响后续操作,而不影响前面的操作,*所有的操作都是异步: indexdb数据读取,ajax请求,延时等待
1. 上古时期Callback hell.
这个就不详细说明了,众所周知,很久以前,大家都是讲回调函数作为参数传入另一个函数,成功或者失败时进行调用,最后的结果类似于
func1(1,(err,data)=>{ func2(data,(err,data)=>{ func3(data,(err,data)=>{ func4(data,(err,data)=>{ func5(data,(err)=>{ console.log(err) }) }) }) }) })2. Promise/ async await
async doSometing(data){ let res = await func1(data); res = await func2(res) res = await func3(res) res = await func4(res) res = await func5(res) return res } doSomething(1).then(console.log)callback模式会导致缩进随着层级的增加不断增加,当然可以把回调函数也在外面,但这样看代码理解流程的时候需要跳来跳去很糟糕,维护也就不那么简单了。
Promise+async/await,看上去可以把各个方法操作拆开,业务流程很清晰的写在一起
但是上面两种都有个问题,如果回调永远没有响应,或者一个promise永远没有resolve,你也没办法取消。
或许有人会说,给每个方法加个超时呗。
但其实本质问题还是无法取消,对于某些复杂场景,用户利用系统返回候,一个个方法还在后面跑,就很可能带来Bug,【如果是多页应用还好,跳转就会刷新】
3. Rxjs的Observale方案
其实Observable在webapi中是存在的,例如MutationObservable,它是将持续的变化得数据源封装成可观察对象,然后在订阅发生时推送数据-
在此之前的解决办法一般是:1. 自制loop,不停地循环比较;2. 修改拦截,例如vue,设置值时调用对应的更新方法;3. 事件发布,每次更新发布一个事件,由监听者来处理;
观察者模式和监听者模式其实优点相似,不同的是,观察者是观察的某个数据源的数据,监听者往往监听的是事件,数据是一个payload携带而来。事件需要双方统一,然后可以零散分布,观察者往往需要导入数据源,同时二者可以转化。因为观察者模式是将数据为主,所以可以进行一系列的转化操作。
推荐使用场景:
1. 系统内部或者功能内部使用观察者,这样可以耦合度更高,可以使用各种转化pipe,内部新增功能只需要新建数据流或者重新组合数据流即可,各个小模块还对上层数据源进行转化,在各个节点都可以持续优化或者改动,只要保证输出一直即可;
2. 系统之间,使用消息订阅模式,这样可以更好的解耦,而且系统之间的联系不会太复杂,这样需要检查的事件名也就不多了(事件重命名,事件新增,事件删除,做影响分析也会更少)
下面就重写下上面的连续方法吧,其实有点像回调;
方式一:
function doSequence(initData){ const ob1 = do1$(initData).subscribe(res=>{ const ob2 = do2$(res).subscribe(res=>{ const ob3 = do3$(res).subscribe(res=>{ const ob4 = do4$(res).subscribe(res=>{ const ob5 = do5$(res).subscribe(res=>{ }) ob4.add(ob5) }) ob3.add(ob4) }) ob2.add(ob3) }) ob1.add(ob2) }) .... ob1.unsubscribe() }可能你觉得这有点像回调地狱,但其实它可以通过定义外部的Subject来进行转化,需要注意,do1$这种事一个函数,它的返回值是一个可观察对象;
方式二
const sub1 = new Subject() function do1(data){ ajax(data).subscribe(v=>{ sub1.next(v) }) } ...do2 do3 do4 do5 function doSomething(initDate){ do1(initData) const s1 = sub1.subscribe(e=>{ do2(e) }) const s2 = sub2.subscribe(e=>{ do3(e) }) const s3 = sub3.subscribe(e=>{ do4(e) }) const s4 = sub4.subscribe(e=>{ do5(e) }) return ()=>[s1,s2,s3,s4].forEach(el=>el.unsubscribe()) }可能你还觉得者与async、await比没什么好的,但比较 比较一下,就会发现,不管是方式一(add)还是方式二 (return)都多了点东西
它的存在就是为了取消观察,取消观察有什么作用呢?让我们看看Observable构造一个每秒推个数据的可观察对象(数据源):
new Observale(suber=>{
let i=0;
const timer = setInterval(()=>suber.next(i++),1000)
return ()=>clearInterval(timer)
})
构造函数接收一个数据源生产函数,返回一个销毁函数,这两个函数在被订阅和被取消订阅时调用(共享的数据源会在第一次被观察和取消到没有观察者时分别调用);
所以,可观察对象都有一个取消操作的,只要将它作为可观察对象的初始化函数的返回值就可以了。
看上面的方式一,订阅者可以将另一个订阅加入到自己的流程中,这样自己被摧毁时,也能够摧毁子流程
对于方式二,则是简单的全部取消,哪怕s3、s4的订阅者没有拿到过数据。
如果是Promise来实现取消,可能得这样:
let cancel = ()=>{} async doSometing(data){ let cancelled = false; cancel = ()=>cancelled = true; let res = await func1(data); if(cancelled) return res = await func2(res) if(cancelled) return res = await func3(res) if(cancelled) return res = await func4(res) if(cancelled) return res = await func5(res) return res } doSomething(1).then(console.log) cancel()其实这样看起来和方式二差不多了,此外,每个方法还是没法取消。
4. Observable的优缺点:
a. 大量的subscribe, 无意中会形成若干的闭包,不小心处理和取消,会造成内存泄漏
b. 转化思维的代价不小,传统的方式是直线型的,一步步来,但Observable是纯异步思维,所以简单的场景使用,是真的画蛇添足没啥用
c. 但是如果场景足够复杂,且数据变动的先后顺序会有不同结果等待,以及更好的体验,肯定是有很好的作用的,下面举一个小栗子:
5. 高清流的发布:
场景:1,默认不发布高清流,仅发布低码流,但是有人订阅时,需要发布高清流,发布后订阅者进行流切换,
// 发布端; // 预定义数据源,一个融合了本地设备-用户喜好数据源的当前设备数据源 // 它会在用户插拔设备,或者修改偏好设备时推送新的设备数据,包括了校验和授权申请 // 更改时需要重新发布流 import { devicePref$ } from "xx" // 用户的其它配置,例如尺寸或者清晰度,不需要重新发布,但需要更新配置 import { resolutionPref$ } from "xx" // 一个封装好的数据发布客户端, import {Client} from "xx" export function publishHighResolution$(token,server){ return new Observabel(suber=>{ const client = Client.init(server,token) let published = false; devicePref$.subscribe(deviceId=>{ if(!published) client.publishWithDeviceId(deviceId) else client.replaceStreamWithDeviceId(deviceId) }) const sub = resolutionPref$.subscribe(pref=>{ client.changeConfig(deviceId) }) ... suber.add(sub) // 推送状态数据 client.state$.subscribe(state=>suber.next(state)) // 返回清理方法 return client.destroy.bind(client) }) } // 业务端,或者是进入房间的初始化方法中: function watchHighResPublishRequest(){ let publisher =null generateRequestSource$().subscribe(request=>{ if(request){ // 有人订阅且没发布则进行发布 if(!publisher) { publisher = publishHighResolution$(request.token,request.server).subscribe(state=>{ reportPublishState(state) }) } }else if(publisher){ // 没人订阅则取消发布 publisher.unsubscribe() } }) }有兴趣的化,可以尝试用纯回调或者消息发布订阅的方式再写一下逻辑,有空下一篇将讲讲如何将可观察对象与流结合起来,形成一种树结构,实现较为简单的回收和订阅管理