Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rxjs] Subject、BehaviorSubject、ReplaySubject、AsyncSubject #23

Open
varHarrie opened this issue Jul 9, 2018 · 0 comments
Open

Comments

@varHarrie
Copy link
Owner

varHarrie commented Jul 9, 2018

Subject、BehaviorSubject、ReplaySubject、AsyncSubject

Subject

SubjectObservable(可观察对象),每一个Subject都有可以被subscribe(订阅),但不会创建新的执行环境,只会把新的Observer注册到内部维护着的Observer清单

SubjectObserver(观察者),是一个由next()error()complete()方法组成的对象,可以subscribe(订阅)一个Observable,并从其中接受到推送的值。

每次Subject接收到值时,都遍历Observer清单,并推送该值。

  • Subject可以被订阅:

    const subject = new Rx.Subject()
    
    subject.subscribe((value) => console.log('A value: ', value))
    subject.subscribe((value) => console.log('B value: ', value))
    
    subject.next(1)
    // A value: 1
    // B value: 1
    subject.next(2)
    // A value: 2
    // B value: 2
  • Subject可以订阅Observable,接受并转发值:

    const observable = Rx.Observable.from([1, 2])
    
    const subject = new Rx.Subject()
    
    subject.subscribe((value) => console.log('A value: ', value))
    subject.subscribe((value) => console.log('B value: ', value))
    
    observable.subscribe(subject)
    // A value: 1
    // B value: 1
    // A value: 2
    // B value: 2

BehaviorSubject

BehaviorSubjectSubject的子类,拥有初始值,并且总是保存着一个最新值,一旦被订阅立即向订阅者发送最新值

const subject = new Rx.BehaviorSubject(0) // 初始值为0

subject.subscribe((value) => console.log('A value: ', value))

subject.next(1)
// A value: 0
// A value: 1
subject.next(2)
// A value: 2
// B value: 2

subject.subscribe((value) => console.log('B value: ', value))

subject.next(3)
// A value: 3
// B value: 3

ReplaySubject

ReplaySubject也是Subject的子类,被订阅时立即向订阅者推送最新指定数量的值

const subject = new Rx.ReplaySubject(2) // 回放2个

subject.subscribe((value) => console.log('A value: ', value))

subject.next(1)
// A value: 1
subject.next(2)
// A value: 2
subject.next(3)
// A value: 3

subject.subscribe((value) => console.log('B value: ', value))
// B value: 2
// B value: 3

subject.next(4)
// A value: 4
// B value: 4

AsyncSubject

AsyncSubject也是Subject的子类,仅会在complete()之后,向订阅者推送最后一个值

const subject = new Rx.AsyncSubject()

subject.subscribe((value) => console.log('A value: ', value))

subject.next(1)
subject.next(2)

subject.subscribe((value) => console.log('B value: ', value))

subject.next(3)
subject.complete()
// A value: 3
// B value: 3

multicast、refCount、publish、share

在开头我们提到Subject就是一个Obserable,但新的订阅者会共用同一个执行环境:

const source = Rx.Observable.interval(1000)
								.take(3)
const subject = new Rx.Subject()

subject.subscribe((value) => console.log('A value: ', value))

source.subscribe(subject)

setTimeot(() => {
  subject.subscribe((value) => console.log('B value: ', value))
}, 1000)

// A value: 0
// A value: 1
// B value: 1
// A value: 2
// B value: 2

B在1秒钟之后订阅,所以不会接受到第一个值0

  • multicast

    仔细看上面的例子,虽然定义的变量不多,但是这段代码的订阅关系还是略显复杂,这时候我们可以用multicast操作符来简化:

    const source = Rx.Observable.interval(1000)
                    .take(3)
                    .multicast(new Rx.Subject())
    
    source.subscribe((value) => console.log('A value: ', value))
    source.connect()
    
    setTimeout(() => {
      source.subscribe((value) => console.log('B value: ', value))
    }, 1000)

    这样一来,所有的订阅者都可以直接订阅source,其中的multicast用于挂载一个Subject,并返回一个ConnectableObservable ,拥有connect()方法。注意的是,必须等到connect()调用之后,Subject才真正订阅source并开始推送。

    connect()方法会返回一个subscription,可调用其unsubscribe进行退订。

  • refCount

    refCount必须搭配multicast使用,用于创建一个只要有第一个订阅就会自动connect()Observable,并且当订阅数变为0后会自动终止推送。

    const source = Rx.Observable.interval(1000) // 没有加take(3)
    								.multicast(new Rx.Subject())
    								.refCount()
    
    let subscriptionA
    let subscriptionB
    
    subscriptionA = source.subscribe((value) => console.log('A value: ', value))
    // 订阅数 0 => 1,自动connect,开始推送
    
    setTimeout(() => {
      subscriptionB = source.subscribe((value) => console.log('B value: ', value))
      // 订阅数 1 => 2
    }, 1000)
    
    setTimeout(() => {
      subscriptionA.unsubscribe()
      // 订阅数 2 => 1
      subscriptionB.unsubscribe()
      // 订阅数 1 => 0,终止推送
    }, 5000)
  • publish

    publishmulticast的简化写法:

    // Subject => publish
    const source = Rx.Observable.interval(1000).multicast(new Rx.Subject()).refCount()
    const source = Rx.Observable.interval(1000).publish().refCount()
    
    // BehaviorSubject => publishBehavior
    const source = Rx.Observable.interval(1000).multicast(new Rx.BehaviorSubject(0)).refCount()
    const source = Rx.Observable.interval(1000).publishBehavior(0).refCount()
    
    // ReplaySubject => publishReplay
    const source = Rx.Observable.interval(1000).multicast(new Rx.ReplaySubject(3)).refCount()
    const source = Rx.Observable.interval(1000).publishReplay(3).refCount()
    
    // AsyncSubject => publishLast
    const source = Rx.Observable.interval(1000).multicast(new Rx.AsyncSubject()).refCount()
    const source = Rx.Observable.interval(1000).publishLast(3).refCount()
  • share

    sharepublish+refCount的简化写法:

    // publish + refCount => share
    const source = Rx.Observable.interval(1000).publish().refCount()
    const source = Rx.Observable.interval(1000).share()
    
    // publishReplay + refCount => shareReplay
    const source = Rx.Observable.interval(1000).publishReplay(3).refCount()
    const source = Rx.Observable.interval(1000).shareReplay(3)
    
    // 没有shareBehavior和shareAsync、shareLast

参考资料

30 天精通 RxJS

@varHarrie varHarrie changed the title Subject、BehaviorSubject、ReplaySubject、AsyncSubject [Rxjs] Subject、BehaviorSubject、ReplaySubject、AsyncSubject Jul 9, 2018
@varHarrie varHarrie added this to the Posts milestone Aug 5, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant