Rx是一种基于观察者模式的编程范式,推崇的是纯函数式的编程,目的是降低异步编程的复杂度。现在有多个语言版本,这里主要介绍前端的rxjs,它内置在angular,但真正用它的其实不多。
这里比较五层的逻辑:数据1变化执行操作2,然后操作2会引发操作3,.... 1>2>3>4>5,其中操作3获得的数据变化时,需要重新执行4+5,操作2变化需要重新执行3+4+5,也就是会影响后续操作,而不影响前面的操作,*所有的操作都是异步: indexdb数据读取,ajax请求,延时等待
1. 上古时期Callback hell.
这个就不详细说明了,众所周知,很久以前,大家都是讲回调函数作为参数传入另一个函数,成功或者失败时进行调用,最后的结果类似于
func1(1,(err,data)=>{
    func2(data,(err,data)=>{
        func3(data,(err,data)=>{
            func4(data,(err,data)=>{
                func5(data,(err)=>{
                    console.log(err)
                })
            })
        })
    })
})
2. Promise/ async await
async doSometing(data){
    let res = await func1(data);
    res = await func2(res)
    res = await func3(res)
    res = await func4(res)
    res = await func5(res)
    return res
} 
doSomething(1).then(console.log)
callback模式会导致缩进随着层级的增加不断增加,当然可以把回调函数也在外面,但这样看代码理解流程的时候需要跳来跳去很糟糕,维护也就不那么简单了。
Promise+async/await,看上去可以把各个方法操作拆开,业务流程很清晰的写在一起
但是上面两种都有个问题,如果回调永远没有响应,或者一个promise永远没有resolve,你也没办法取消。
或许有人会说,给每个方法加个超时呗。
但其实本质问题还是无法取消,对于某些复杂场景,用户利用系统返回候,一个个方法还在后面跑,就很可能带来Bug,【如果是多页应用还好,跳转就会刷新】
3. Rxjs的Observale方案
其实Observable在webapi中是存在的,例如MutationObservable,它是将持续的变化得数据源封装成可观察对象,然后在订阅发生时推送数据-
在此之前的解决办法一般是:1. 自制loop,不停地循环比较;2. 修改拦截,例如vue,设置值时调用对应的更新方法;3. 事件发布,每次更新发布一个事件,由监听者来处理;
观察者模式和监听者模式其实优点相似,不同的是,观察者是观察的某个数据源的数据,监听者往往监听的是事件,数据是一个payload携带而来。事件需要双方统一,然后可以零散分布,观察者往往需要导入数据源,同时二者可以转化。因为观察者模式是将数据为主,所以可以进行一系列的转化操作。
推荐使用场景:
1. 系统内部或者功能内部使用观察者,这样可以耦合度更高,可以使用各种转化pipe,内部新增功能只需要新建数据流或者重新组合数据流即可,各个小模块还对上层数据源进行转化,在各个节点都可以持续优化或者改动,只要保证输出一直即可;
2. 系统之间,使用消息订阅模式,这样可以更好的解耦,而且系统之间的联系不会太复杂,这样需要检查的事件名也就不多了(事件重命名,事件新增,事件删除,做影响分析也会更少)
下面就重写下上面的连续方法吧,其实有点像回调;
方式一:
function doSequence(initData){
  const ob1 =  do1$(initData).subscribe(res=>{
      const ob2 =   do2$(res).subscribe(res=>{
           const ob3 =  do3$(res).subscribe(res=>{
               const ob4 =  do4$(res).subscribe(res=>{
                   const ob5 =  do5$(res).subscribe(res=>{
                    
                    })
                  ob4.add(ob5)
               })
              ob3.add(ob4)
            })
          ob2.add(ob3)
        })
       ob1.add(ob2)
    })
      ....
      ob1.unsubscribe()
}
可能你觉得这有点像回调地狱,但其实它可以通过定义外部的Subject来进行转化,需要注意,do1$这种事一个函数,它的返回值是一个可观察对象;
方式二
const sub1 = new Subject()
function do1(data){
    ajax(data).subscribe(v=>{
        sub1.next(v)
    })
}
...do2 do3 do4 do5
function doSomething(initDate){
    do1(initData)
   const s1 = sub1.subscribe(e=>{
        do2(e)
    })
    const s2 =   sub2.subscribe(e=>{
        do3(e)
    })
   const s3 =  sub3.subscribe(e=>{
        do4(e)
    })
   const s4 =  sub4.subscribe(e=>{
        do5(e)
    })
    return ()=>[s1,s2,s3,s4].forEach(el=>el.unsubscribe())
}
可能你还觉得者与async、await比没什么好的,但比较 比较一下,就会发现,不管是方式一(add)还是方式二 (return)都多了点东西
它的存在就是为了取消观察,取消观察有什么作用呢?让我们看看Observable构造一个每秒推个数据的可观察对象(数据源):
new Observale(suber=>{
    let i=0;
    const timer = setInterval(()=>suber.next(i++),1000)
    return ()=>clearInterval(timer)
})
构造函数接收一个数据源生产函数,返回一个销毁函数,这两个函数在被订阅和被取消订阅时调用(共享的数据源会在第一次被观察和取消到没有观察者时分别调用);
所以,可观察对象都有一个取消操作的,只要将它作为可观察对象的初始化函数的返回值就可以了。
看上面的方式一,订阅者可以将另一个订阅加入到自己的流程中,这样自己被摧毁时,也能够摧毁子流程
对于方式二,则是简单的全部取消,哪怕s3、s4的订阅者没有拿到过数据。
如果是Promise来实现取消,可能得这样:
let cancel = ()=>{}
async doSometing(data){
    let cancelled = false;
    cancel = ()=>cancelled = true;
    let res = await func1(data);
    if(cancelled) return
    res = await func2(res)
    if(cancelled) return
    res = await func3(res)
    if(cancelled) return
    res = await func4(res)
    if(cancelled) return
    res = await func5(res)
    return res
} 
doSomething(1).then(console.log)
 cancel()
其实这样看起来和方式二差不多了,此外,每个方法还是没法取消。
4. Observable的优缺点:
a. 大量的subscribe, 无意中会形成若干的闭包,不小心处理和取消,会造成内存泄漏
b. 转化思维的代价不小,传统的方式是直线型的,一步步来,但Observable是纯异步思维,所以简单的场景使用,是真的画蛇添足没啥用
c. 但是如果场景足够复杂,且数据变动的先后顺序会有不同结果等待,以及更好的体验,肯定是有很好的作用的,下面举一个小栗子:
5. 高清流的发布:
场景:1,默认不发布高清流,仅发布低码流,但是有人订阅时,需要发布高清流,发布后订阅者进行流切换,
// 发布端;
// 预定义数据源,一个融合了本地设备-用户喜好数据源的当前设备数据源
// 它会在用户插拔设备,或者修改偏好设备时推送新的设备数据,包括了校验和授权申请
//  更改时需要重新发布流
import { devicePref$ } from "xx"
//  用户的其它配置,例如尺寸或者清晰度,不需要重新发布,但需要更新配置
import { resolutionPref$ } from "xx"

// 一个封装好的数据发布客户端,
import {Client} from "xx"

export function publishHighResolution$(token,server){
    return new Observabel(suber=>{
        const client = Client.init(server,token)
        let published = false;
        devicePref$.subscribe(deviceId=>{
           if(!published)
             client.publishWithDeviceId(deviceId)
           else 
             client.replaceStreamWithDeviceId(deviceId)
        })
                const sub = resolutionPref$.subscribe(pref=>{
                       client.changeConfig(deviceId)
                })
                ...
                suber.add(sub)
        // 推送状态数据
        client.state$.subscribe(state=>suber.next(state))
        // 返回清理方法
        return client.destroy.bind(client)
    })
}  // 业务端,或者是进入房间的初始化方法中:

function watchHighResPublishRequest(){
    let publisher =null
    generateRequestSource$().subscribe(request=>{
        if(request){
            // 有人订阅且没发布则进行发布
            if(!publisher)  {
                publisher =  publishHighResolution$(request.token,request.server).subscribe(state=>{
                    reportPublishState(state)
                })              }
       }else if(publisher){
           // 没人订阅则取消发布
           publisher.unsubscribe()
       }
    })
}
有兴趣的化,可以尝试用纯回调或者消息发布订阅的方式再写一下逻辑,有空下一篇将讲讲如何将可观察对象与流结合起来,形成一种树结构,实现较为简单的回收和订阅管理