前言 项目中往往有多个信号
在执行任务,而信号之间通常要按一定的方式或顺序进行组合
。比如:
先要用户登录成功才能加载通讯录;
请求头像和用户信息都完成后才刷新详情页。
RAC 针对诸如这些场景,设计了一套实用的API供我们组合信号:
concat 1 2 /// Subscribes to `signal ` when the source signal completes. - (RACSignal *)concat:(RACSignal *)signal RAC_WARN_UNUSED_RESULT;
作用: 拼接信号流,将信号1
与信号2
拼接成新的信号,信号1
状态为Completed
时才开始执行信号2
中的任务。无论信号1
和信号2
谁先发送数据,新信号的订阅者最终都只会按信号1
先信号2
后的顺序收到回调。
#示例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 - (void)concat { // concat: concat左边的在前,右边的在后,二者发送数据后按照前后顺序分别触发一次新信号订阅者回调 RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal1 block" ); [subscriber sendNext:@"1.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"1.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal2 block" ); [subscriber sendNext:@"2.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"2.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal3 = [signal1 concat:signal2]; [signal3 subscribeNext:^(id x) { NSLog (@"+++++received data:%@" ,x); }]; }
日志:
1 2 3 4 5 6 +++call signal1 block +++++received data :1.1 +++++received data :1.2 +++call signal2 block +++++received data :2.1 +++++received data :2.2
说明:
1.信号1
和信号2
发送几次数据,信号3
订阅者就收到几次数据;
2.信号3
订阅者收到数据的顺序是确定的,按照拼接时的顺序来:信号1
先,信号2
后;
3.concat
类似于NSOperation
的依赖关系,信号2
依赖信号1
,所以信号2
要知道信号1
的状态;
4.信号1
在发送完数据之后记得sendCompleted
,否则信号2
的^didSubscribe
不会触发,也就不会执行任务;
#实现原理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 - (RACSignal * )concat :(RACSignal * )signal { return [[RACSignal createSignal :^ (id <RACSubscriber > subscriber ) { RACCompoundDisposable * compoundDisposable = [[RACCompoundDisposable alloc ] init]; RACDisposable * sourceDisposable = [self subscribeNext :^ (id x ) { [subscriber sendNext :x ]; } error :^ (NSError *error ) { [subscriber sendError :error ]; } completed :^ { RACDisposable * concattedDisposable = [signal subscribe :subscriber ]; [compoundDisposable addDisposable :concattedDisposable ]; }]; [compoundDisposable addDisposable :sourceDisposable ]; return compoundDisposable ; }] setNameWithFormat :@"[%@] -concat: %@" , self .name , signal ]; }- (RACDisposable * )subscribe :(id < RACSubscriber > )subscriber { NSCParameterAssert (subscriber != nil ); ... if (self .didSubscribe != NULL ) { RACDisposable * schedulingDisposable = [RACScheduler .subscriptionScheduler schedule :^ { RACDisposable * innerDisposable = self .didSubscribe (subscriber ); [disposable addDisposable :innerDisposable]; }]; [disposable addDisposable :schedulingDisposable ]; } return disposable ; }
代码中有注释,具体整理如下:
then 1 2 3 4 5 6 7 8 9 10 - (RACSignal *)then:(RACSignal * (^)(void ))block RAC_WARN_UNUSED_RESULT;
作用: 先执行信号1
但过滤掉信号1
的数据,再执行信号2
的任务并获取其发送的数据。
#示例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 - (void )then { RACSignal * signal1 = [RACSignal createSignal :^ RACDisposable * (id <RACSubscriber > subscriber ) { NSLog (@"+++call signal1 block" ); [subscriber sendNext :@"1.1" ]; [[RACScheduler mainThreadScheduler ] afterDelay :2 schedule :^ { [subscriber sendNext :@"1.2" ]; [subscriber sendCompleted ]; }]; return nil ; }]; RACSignal * signal2 = [RACSignal createSignal :^ RACDisposable * (id < RACSubscriber > subscriber ) { NSLog (@"+++call signal2 block" ); [subscriber sendNext :@"2.1" ]; [[RACScheduler mainThreadScheduler ] afterDelay :2 schedule :^ { [subscriber sendNext :@"2.2" ]; [subscriber sendCompleted ]; }]; return nil ; }]; RACSignal * signal3 = [signal1 then :^ RACSignal * { NSLog (@"++++call signal3 then" ); return signal2 ; }]; [signal3 subscribeNext :^ (id x ) { NSLog (@"++++received data:%@" ,x ); }]; }
日志:
1 2 3 4 5 +++call signal1 block ++++call signal3 then +++call signal2 block ++++received data :2.1 ++++received data :2.2
说明:
1.信号1
发送的数据自动被过滤掉,信号3
订阅者不会收到数据信号1
的数据回调;
2.信号2
发送几次数据,信号3
的订阅者就收到几次数据;
3.信号2
总是在信号1
完成之后,才会开始执行自己的任务;
#实现原理:
1 2 3 4 5 6 7 8 - (RACSignal *)then :(RACSignal * (^)(void))block { NSCParameterAssert(block != nil ); return [[[self ignoreValues] concat:[RACSignal defer:block]] setNameWithFormat:@"[%@] -then:" , self .name]; }
1.先通过ignoreValues
对信号1
进行了过滤,忽略掉了它的sendNext:
事件,只关注其error
和completed
事件:
1 2 3 4 5 6 7 8 9 10 11 /// Ignores all `next` s from the receiver. /// /// Returns a signal which only passes through `error` or `completed` events from /// the receiver. - (RACSignal *)ignoreValues RAC_WARN_UNUSED_RESULT ; - (RACSignal *)ignoreValues { return [[self filter: ^(id _) { return NO ; }] setNameWithFormat: @"[%@] -ignoreValues" , self .name]; }
2.再通过[RACSignal defer:block]]
,将信号2
封装到一个新信号中:
1 2 3 4 5 6 7 8 9 10 11 12 + (RACSignal<ValueType> *)defer:(RACSignal<ValueType> * (^)(void ))block RAC_WARN_UNUSED_RESULT; + (RACSignal *)defer:(RACSignal<id> * (^)(void ))block { NSCParameterAssert(block != NULL); return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { return [block () subscribe:subscriber]; }] setNameWithFormat:@"+defer:" ]; }
3.最后用concat
方法将两个新信号进行拼接,先执行信号1
的任务,再开始信号2
的任务。
所以,从本质上来说,then
最终调用的还是concat
~
merge 1 2 3 4 5 6 + (RACSignal <ValueType > * )merge:(id< NSFastEnumeration > )signals RAC_WARN_UNUSED_RESULT ;
作用: 将n个信号整合为新信号x
,信号之间不存在依赖关系,任何一个信号发送数据,信号x的订阅者都会收到回调,且遵循FIFO原则,谁先发送数据则订阅者就先收到谁的数据。
#示例3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 - (void)merge { // merge 谁先sendNext就先收到谁的数据 RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal1 block" ); [subscriber sendNext:@"1.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"1.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal2 block" ); [subscriber sendNext:@"2.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"2.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal3 = [RACSignal merge:@[signal1,signal2]]; [signal3 subscribeNext:^(id x) { NSLog (@"+++++received data:%@" ,x); }]; }
日志:
1 2 3 4 5 6 +++call signal1 block +++++received data :1.1 +++call signal2 block +++++received data :2.1 +++++received data :1.2 +++++received data :2.2
说明:
1.merge
将信号1
和信号2
整合到信号3
中,1、2之间不会相互影响;
2.信号1
和信号2
发送新数据时,信号3
都能收到新数据;
#实现原理
从最底层的实现上来讲,merge使用了RAC的-bind:
方法,接下来一步步来看:
这里我们调用了merge
,将信号1
和信号2
合并,来看看方法内部具体做了啥:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 + (RACSignal * )merge:(id< NSFastEnumeration > )signals { NSMutableArray * copiedSignals = [[NSMutableArray alloc] init ]; for (RACSignal * signal in signals) { [copiedSignals addObject:signal]; } return [[[RACSignal createSignal:^ RACDisposable * (id< RACSubscriber > subscriber) { for (RACSignal * signal in copiedSignals) { [subscriber sendNext:signal]; } [subscriber sendCompleted]; return nil ; }] flatten] setNameWithFormat:@"+merge: %@" , copiedSignals]; }
这里的信号4
看上去是一个挺简单的信号,主要作用就是向订阅者发送信号1
和信号2
。信号4
调用了flatten
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 - (__kindof RACStream *)flatten { // 我将此方法的参数命名为 mapBlock 方便后续解释此处代码 return [[self flattenMap: ^(id value) { return value; }] setNameWithFormat: @"[%@] -flatten" , self .name]; } - (__kindof RACStream *)flattenMap: (__kindof RACStream * (^)(id value))block { Class class = self .class ; return [[self bind: ^{ //bind 的参数为一个block 此block类型为:RACSignalBindBlock (^)(void),即无参数,返回一个RACSignalBindBlock类型的bindingBlock return ^(id value, BOOL *stop) { id stream = block(value) ?: [class empty]; NSCAssert([stream isKindOfClass: RACStream.class ], @"Value returned from -flattenMap: is not a stream: %@" , stream); return stream; }; }] setNameWithFormat: @"[%@] -flattenMap:" , self .name]; }
可以看到,flatten
方法内部最终是调用了bind
方法,即信号4
调用了bind
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 - (RACSignal * )bind :(RACSignalBindBlock (^ )(void ))block { NSCParameterAssert (block != NULL ); return [[RACSignal createSignal :^ (id <RACSubscriber > subscriber ) { RACSignalBindBlock bindingBlock = block (); __block volatile int32_t signalCount = 1 ; RACCompoundDisposable * compoundDisposable = [RACCompoundDisposable compoundDisposable ]; void (^ completeSignal )(RACDisposable * ) = ^ (RACDisposable *finishedDisposable ) { if (OSAtomicDecrement 32Barrier(& signalCount ) == 0 ) { [subscriber sendCompleted ]; [compoundDisposable dispose ]; } else { [compoundDisposable removeDisposable :finishedDisposable ]; } }; void (^ addSignal )(RACSignal * ) = ^ (RACSignal *signal ) { OSAtomicIncrement 32Barrier(& signalCount ); RACSerialDisposable * selfDisposable = [[RACSerialDisposable alloc ] init]; [compoundDisposable addDisposable :selfDisposable ]; RACDisposable * disposable = [signal subscribeNext :^ (id x ) { [subscriber sendNext :x ]; } error :^ (NSError *error ) { [compoundDisposable dispose ]; [subscriber sendError :error ]; } completed :^ { @autoreleasepool { completeSignal (selfDisposable ); } }]; selfDisposable .disposable = disposable ; }; @autoreleasepool { RACSerialDisposable * selfDisposable = [[RACSerialDisposable alloc ] init]; [compoundDisposable addDisposable :selfDisposable ]; RACDisposable * bindingDisposable = [self subscribeNext :^ (id x ) { if (compoundDisposable .disposed ) return ; BOOL stop = NO ; id signal = bindingBlock (x , & stop ); @autoreleasepool { if (signal != nil ) addSignal (signal ); if (signal == nil || stop ) { [selfDisposable dispose ]; completeSignal (selfDisposable ); } } } error :^ (NSError *error ) { [compoundDisposable dispose ]; [subscriber sendError :error ]; } completed :^ { @autoreleasepool { completeSignal (selfDisposable ); } }]; selfDisposable .disposable = bindingDisposable ; } return compoundDisposable ; }] setNameWithFormat :@"[%@] -bind:" , self .name ]; }
bind方法里的代码逻辑,结合我在上面的标注来看:
1.信号4
调用bind
方法后,bind
内部先创建了一个新的信号并沿着调用路径层层往上返回给调用者,最终这个新信号的接收者正是我们在最开始执行”[RACSignal merge:@[signal1,signal2]]”时的signal3
,即信号3
。在示例3中订阅信号3
之后,触发其^didSubscribe3
,也就是bind
方法内新建信号后面的那个大 block。
2.^didSubscribe3
先调用了bind
的 block参数,返回了一个bindingBlock
备用;
3.紧接着订阅了信号4
,从而触发^didSubscribe4
;
4.^didSubscribe4
通过 for循环将信号1
和信号2
作为数据发送给订阅者^sendNext4
;
5.^sendNext4
中调用bindingBlock
,其参数为信号4
传来的信号1
和信号2
;
6.bindingBlock
内调用了block(value)
,即回调了flattenMap
方法的 mapBlock,最终返回了value
(信号1和信号2);
7.^sendNext4
继续调用addSignal(信号1/信号2)
;
8.addSignal()
内部实现是订阅传进来的信号,即订阅了信号1
和信号2
;
9.到这一步bind
已经万事俱备,只等接收数据了。当信号1
和信号2
产生新数据时,^sendNext1/^sendNext2
会自动触发;
10.^sendNext1/^sendNext2
内通过^didSubscribe3
将数据回调给订阅者,因为^didSubscribe3
本身代表的是信号3的订阅,所以信号1
和信号2
的数据最终传递给了信号3
的订阅者,即我们在示例3中定义好的 block 中;
小结: 以上就是merge
方法的具体实现,简单来说就是merge
操作得到的新信号,通过bind
方法在内部订阅了被merge
的子信号;子信号产生一份数据时新信号就会收到一份新数据。所以想要更好的理解merge
方法,就要先理解bind
方法。
附赠我用思维导图整理的merge
实现:
zip 1 2 3 4 5 6 7 8 9 10 11 + (RACSignal <RACTuple *> * )zip:(id< NSFastEnumeration > )signals RAC_WARN_UNUSED_RESULT ;
作用: 将n个信号打包进一个新信号中,当所有子信号都sendNext
之后,新信号将接收到的数据组合在一个元组
中,值在元组中的排序按照打包信号时的顺序来。
#示例4:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 - (void)zip { // zip: 合并n个信号的值到一个元组中,哪个信号在前哪个信号的值就在元组中靠前 RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal1 block" ); [subscriber sendNext:@"1" ]; [subscriber sendCompleted]; return nil; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal2 block" ); [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{ [subscriber sendNext:@"2.1" ]; }]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"2.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal3 = [RACSignal zip:@[signal1,signal2]]; [signal3 subscribeNext:^(id x) { NSLog (@"+++++received data:%@" ,x); }]; }
日志:
1 2 3 4 5 6 +++call signal1 block +++call signal2 block +++++received data :<RACTuple: 0x7ff33e4bbab0 > ( 1 , "2.1" )
说明:
1.信号3
的订阅者要在组合中的信号都sendNext:
之后才会收到回调的元组;
2.信号3
元组中数据的顺序与组合时信号的顺序排列一致;
3.信号3
的订阅者是一次性的,收到一次数据后,不论源信号再发送几次数据,它都不再接收。
zip
可用来整合网络请求,例如当需要同时发送N个请求,只有这N个请求都成功后,才将这N个的结果整合起来继续往下处理。
combineLatest 1 2 3 4 5 6 7 8 9 10 11 12 + (RACSignal <RACTuple *> * )combineLatest:(id< NSFastEnumeration > )signals RAC_WARN_UNUSED_RESULT ;
作用: 将多个信号的最新值
组合到新信号的一个元组
中,直到所有的信号都sendNext
之后,新信号的订阅者才能收到此元组。
#示例5:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 - (void)combineLatest { RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal1 block" ); [subscriber sendNext:@"1" ]; [subscriber sendCompleted]; return nil; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal2 block" ); [subscriber sendNext:@"2.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"2.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal3 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber > subscriber) { NSLog (@"+++call signal3 block" ); [subscriber sendNext:@"3.1" ]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subscriber sendNext:@"3.2" ]; [subscriber sendCompleted]; }]; return nil; }]; RACSignal *signal4 = [RACSignal combineLatest:@[signal1,signal2,signal3]]; [signal4 subscribeNext:^(id x) { NSLog (@"+++++received data:%@" ,x); }]; }
日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 02 :30 :08.894 +++call signal1 block 02 :30 :08.895 +++call signal2 block 02 :30 :08.895 +++call signal3 block 02 :30 :08.896 +++++received data :<RACTuple: 0x7ff33e49e300 > ( 1 , "2.1" , "3.1" ) 02 :30 :10.895 +++++received data :<RACTuple: 0x7ff33e4db8d0 > ( 1 , "2.2" , "3.1" ) 02 :30 :11.094 +++++received data :<RACTuple: 0x7ff33e6e79f0 > ( 1 , "2.2" , "3.2" )
这里需要说明的是:
1.信号4
订阅者的元组中数据的顺序,是按照组合时的顺序来的;
2.信号4
的订阅者第一次触发是在所有信号都发送了一遍sendNext:
之后;
3.之后如果再有某个信号sendNext:
,信号4
都会再次触发并返回最新值组成的元组,这是它与zip
的最大不同;
结束语 以上就是RAC中几种常用组合信号的方法,可以根据业务需求任意组合操作,很强大、方便。
相关参考:
#©RAC-Github