RAC-信号的组合

前言

项目中往往有多个信号在执行任务,而信号之间通常要按一定的方式或顺序进行组合。比如:

  • 先要用户登录成功才能加载通讯录;
  • 请求头像和用户信息都完成后才刷新详情页。

RAC 针对诸如这些场景,设计了一套实用的API供我们组合信号:

concat

1
2
/// Subscribes to `signal` when the source signal completes.
- (RACSignal *)concat:(RACSignal *)signal RAC_WARN_UNUSED_RESULT;

作用: 拼接信号流,将信号1信号2拼接成新的信号,信号1状态为Completed时才开始执行信号2中的任务。无论信号1信号2谁先发送数据,新信号的订阅者最终都只会按信号1信号2后的顺序收到回调。

#示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- (void)concat {
// concat: concat左边的在前,右边的在后,二者发送数据后按照前后顺序分别触发一次新信号订阅者回调
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal1 block");
[subscriber sendNext:@"1.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"1.2"];
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal2 block");
[subscriber sendNext:@"2.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"2.2"];
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal *signal3 = [signal1 concat:signal2];
[signal3 subscribeNext:^(id x) {
NSLog(@"+++++received data:%@",x);
}];
}

日志:

1
2
3
4
5
6
+++call signal1 block
+++++received data:1.1
+++++received data:1.2
+++call signal2 block
+++++received data:2.1
+++++received data:2.2

说明:

1.信号1信号2发送几次数据,信号3订阅者就收到几次数据;

2.信号3订阅者收到数据的顺序是确定的,按照拼接时的顺序来:信号1先,信号2后;

3.concat类似于NSOperation的依赖关系,信号2依赖信号1,所以信号2要知道信号1的状态;

4.信号1在发送完数据之后记得sendCompleted,否则信号2^didSubscribe不会触发,也就不会执行任务;

#实现原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
- (RACSignal *)concat:(RACSignal *)signal {
// 1.拼接时返回新的信号3
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
// 2.订阅信号3时立刻订阅信号1
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
// 3.信号1 sendNext 后向信号3 的订阅者发送数据
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// 4.信号1 sendConpleted 后调用信号2的 didSubscribe()
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
[compoundDisposable addDisposable:concattedDisposable];
}];

[compoundDisposable addDisposable:sourceDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
...
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
// 调用信号2的 didSubscribe()
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}

代码中有注释,具体整理如下:

RAC_CONCAT

then

1
2
3
4
5
6
7
8
9
10
/// Ignores all `next`s from the receiver, waits for the receiver to complete,
/// then subscribes to a new signal.
///
/// block - A block which will create or obtain a new signal to subscribe to,
/// executed only after the receiver completes. This block must not be
/// nil, and it must not return a nil signal.
///
/// Returns a signal which will pass through the events of the signal created in
/// `block`. If the receiver errors out, the returned signal will error as well.
- (RACSignal *)then:(RACSignal * (^)(void))block RAC_WARN_UNUSED_RESULT;

作用: 先执行信号1但过滤掉信号1的数据,再执行信号2的任务并获取其发送的数据。

#示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
- (void)then {
// then:信号1先执行,信号2后执行,最后只获取信号2的数据
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
// 1
NSLog(@"+++call signal1 block");
[subscriber sendNext:@"1.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"1.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable * (id<RACSubscriber> subscriber) {
// 3
NSLog(@"+++call signal2 block");
[subscriber sendNext:@"2.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"2.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal3 = [signal1 then:^RACSignal * {
// 2
NSLog(@"++++call signal3 then");
return signal2;
}];

[signal3 subscribeNext:^(id x) {
// 4
NSLog(@"++++received data:%@",x);
}];
}

日志:

1
2
3
4
5
+++call signal1 block
++++call signal3 then
+++call signal2 block
++++received data:2.1
++++received data:2.2

说明:

1.信号1发送的数据自动被过滤掉,信号3订阅者不会收到数据信号1的数据回调;

2.信号2发送几次数据,信号3的订阅者就收到几次数据;

3.信号2总是在信号1完成之后,才会开始执行自己的任务;

#实现原理:

1
2
3
4
5
6
7
8
- (RACSignal *)then:(RACSignal * (^)(void))block {
NSCParameterAssert(block != nil);

return [[[self
ignoreValues]
concat:[RACSignal defer:block]]
setNameWithFormat:@"[%@] -then:", self.name];
}

1.先通过ignoreValues信号1进行了过滤,忽略掉了它的sendNext:事件,只关注其errorcompleted事件:

1
2
3
4
5
6
7
8
9
10
11
/// Ignores all `next`s from the receiver.
///
/// Returns a signal which only passes through `error` or `completed` events from
/// the receiver.
- (RACSignal *)ignoreValues RAC_WARN_UNUSED_RESULT;

- (RACSignal *)ignoreValues {
return [[self filter:^(id _) {
return NO;
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}

2.再通过[RACSignal defer:block]],将信号2封装到一个新信号中:

1
2
3
4
5
6
7
8
9
10
11
12
/// Defers creation of a signal until the signal's actually subscribed to.
///
/// This can be used to effectively turn a hot signal into a cold signal.
+ (RACSignal<ValueType> *)defer:(RACSignal<ValueType> * (^)(void))block RAC_WARN_UNUSED_RESULT;

+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}

3.最后用concat方法将两个新信号进行拼接,先执行信号1的任务,再开始信号2的任务。

所以,从本质上来说,then最终调用的还是concat~

merge

1
2
3
4
5
6
/// Sends the latest `next` from any of the signals.
///
/// Returns a signal that passes through values from each of the given signals,
/// and sends `completed` when all of them complete. If any signal sends an error,
/// the returned signal sends `error` immediately.
+ (RACSignal<ValueType> *)merge:(id<NSFastEnumeration>)signals RAC_WARN_UNUSED_RESULT;

作用: 将n个信号整合为新信号x,信号之间不存在依赖关系,任何一个信号发送数据,信号x的订阅者都会收到回调,且遵循FIFO原则,谁先发送数据则订阅者就先收到谁的数据。

#示例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (void)merge {
// merge 谁先sendNext就先收到谁的数据
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal1 block");
[subscriber sendNext:@"1.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"1.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal2 block");
[subscriber sendNext:@"2.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"2.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal3 = [RACSignal merge:@[signal1,signal2]];

[signal3 subscribeNext:^(id x) {
NSLog(@"+++++received data:%@",x);
}];
}

日志:

1
2
3
4
5
6
+++call signal1 block
+++++received data:1.1
+++call signal2 block
+++++received data:2.1
+++++received data:1.2
+++++received data:2.2

说明:

1.merge信号1信号2整合到信号3中,1、2之间不会相互影响;

2.信号1信号2发送新数据时,信号3都能收到新数据;

#实现原理

从最底层的实现上来讲,merge使用了RAC的-bind:方法,接下来一步步来看:

1
RACSignal *signal3 = [RACSignal merge:@[signal1,signal2]];

这里我们调用了merge,将信号1信号2合并,来看看方法内部具体做了啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
// 1.将信号1和信号2保存到一个数组中
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}

return [[[RACSignal // 2.创建信号4
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) { //将此block命名为^didSubscribe4
for (RACSignal *signal in copiedSignals) {
// 发送信号1和信号2(注意这与我们自己使用时发送的字符串之类的不同,它发送的是信号类型)
[subscriber sendNext:signal];
}

[subscriber sendCompleted];
return nil;
}]
flatten]// 3.信号4调用了 flatten 方法
setNameWithFormat:@"+merge: %@", copiedSignals];
}

这里的信号4看上去是一个挺简单的信号,主要作用就是向订阅者发送信号1信号2信号4调用了flatten

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- (__kindof RACStream *)flatten {
//我将此方法的参数命名为 mapBlock 方便后续解释此处代码
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}

- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
Class class = self.class;

return [[self bind:^{
//bind的参数为一个block 此block类型为:RACSignalBindBlock (^)(void),即无参数,返回一个RACSignalBindBlock类型的bindingBlock
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

可以看到,flatten方法内部最终是调用了bind方法,即信号4调用了bind方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {//将此block命名为^didSubscribe3
// 1.创建并返回新信号(即示例中的信号3)
RACSignalBindBlock bindingBlock = block();
// 2.获取bind方法的参数 block 的返回值“bindingBlock”

__block volatile int32_t signalCount = 1; // indicates self

RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

// 定义completeSignal block,处理信号完成事件
void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
[compoundDisposable removeDisposable:finishedDisposable];
}
};

// 定义addSignal block,处理传进来的信号参数
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
OSAtomicIncrement32Barrier(&signalCount);

RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

// 8.订阅传进来的信号(信号1和信号2)
RACDisposable *disposable = [signal subscribeNext:^(id x) { //将此block命名为^sendNext1/^sendNext2
// 9.信号1和信号2发送数据时,向信号3的订阅者subscriber发送数据
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(selfDisposable);
}
}];

selfDisposable.disposable = disposable;
};

@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

// 3.订阅信号4 这里的 self 是信号4
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {// 将此block命名为^sendNext4

// 4.信号4 ^didSubscribe4 中[sendNext(信号1/信号2))]触发此处回调
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;


BOOL stop = NO;
// 5.调用bindingBlock,参数x为信号4发送的数据(信号1和信号2)
id signal = bindingBlock(x, &stop);

// 6.bindingBlock中调用了 mapBlock(value),最终返回了value,即信号1和信号2

@autoreleasepool {
// 7.执行addSignal()
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(selfDisposable);
}
}];

selfDisposable.disposable = bindingDisposable;
}

return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}

bind方法里的代码逻辑,结合我在上面的标注来看:

1.信号4调用bind方法后,bind内部先创建了一个新的信号并沿着调用路径层层往上返回给调用者,最终这个新信号的接收者正是我们在最开始执行”[RACSignal merge:@[signal1,signal2]]”时的signal3,即信号3。在示例3中订阅信号3之后,触发其^didSubscribe3,也就是bind方法内新建信号后面的那个大 block。

2.^didSubscribe3先调用了bind的 block参数,返回了一个bindingBlock备用;

3.紧接着订阅了信号4,从而触发^didSubscribe4

4.^didSubscribe4通过 for循环将信号1信号2作为数据发送给订阅者^sendNext4

5.^sendNext4中调用bindingBlock,其参数为信号4传来的信号1信号2

6.bindingBlock内调用了block(value),即回调了flattenMap方法的 mapBlock,最终返回了value(信号1和信号2);

7.^sendNext4继续调用addSignal(信号1/信号2)

8.addSignal()内部实现是订阅传进来的信号,即订阅了信号1信号2

9.到这一步bind已经万事俱备,只等接收数据了。当信号1信号2产生新数据时,^sendNext1/^sendNext2会自动触发;

10.^sendNext1/^sendNext2内通过^didSubscribe3将数据回调给订阅者,因为^didSubscribe3本身代表的是信号3的订阅,所以信号1信号2的数据最终传递给了信号3的订阅者,即我们在示例3中定义好的 block 中;

小结: 以上就是merge方法的具体实现,简单来说就是merge操作得到的新信号,通过bind方法在内部订阅了被merge的子信号;子信号产生一份数据时新信号就会收到一份新数据。所以想要更好的理解merge方法,就要先理解bind方法。

附赠我用思维导图整理的merge实现:
信号绑定

zip

1
2
3
4
5
6
7
8
9
10
11
/// Zips the values in the given signals to create RACTuples.
///
/// The first value of each signals will be combined, then the second value, and
/// so forth, until at least one of the signals is exhausted.
///
/// signals - The signals to combine. If this collection is empty, the returned
/// signal will be empty.
///
/// Returns a new signal containing RACTuples of the zipped values from the
/// signals.
+ (RACSignal<RACTuple *> *)zip:(id<NSFastEnumeration>)signals RAC_WARN_UNUSED_RESULT;

作用: 将n个信号打包进一个新信号中,当所有子信号都sendNext之后,新信号将接收到的数据组合在一个元组中,值在元组中的排序按照打包信号时的顺序来。

#示例4:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- (void)zip {
// zip: 合并n个信号的值到一个元组中,哪个信号在前哪个信号的值就在元组中靠前
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal1 block");
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal2 block");

[[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
[subscriber sendNext:@"2.1"];
}];

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"2.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal3 = [RACSignal zip:@[signal1,signal2]];

[signal3 subscribeNext:^(id x) {
NSLog(@"+++++received data:%@",x);
}];
}

日志:

1
2
3
4
5
6
+++call signal1 block
+++call signal2 block
+++++received data:<RACTuple: 0x7ff33e4bbab0> (
1,
"2.1"
)

说明:

1.信号3的订阅者要在组合中的信号都sendNext:之后才会收到回调的元组;

2.信号3元组中数据的顺序与组合时信号的顺序排列一致;

3.信号3的订阅者是一次性的,收到一次数据后,不论源信号再发送几次数据,它都不再接收。

zip可用来整合网络请求,例如当需要同时发送N个请求,只有这N个请求都成功后,才将这N个的结果整合起来继续往下处理。

combineLatest

1
2
3
4
5
6
7
8
9
10
11
12
/// Combines the latest values from the given signals into RACTuples, once all
/// the signals have sent at least one `next`.
///
/// Any additional `next`s will result in a new RACTuple with the latest values
/// from all signals.
///
/// signals - The signals to combine. If this collection is empty, the returned
/// signal will immediately complete upon subscription.
///
/// Returns a signal which sends RACTuples of the combined values, forwards any
/// `error` events, and completes when all input signals complete.
+ (RACSignal<RACTuple *> *)combineLatest:(id<NSFastEnumeration>)signals RAC_WARN_UNUSED_RESULT;

作用: 将多个信号的最新值组合到新信号的一个元组中,直到所有的信号都sendNext之后,新信号的订阅者才能收到此元组。

#示例5:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
- (void)combineLatest {
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal1 block");
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal2 block");
[subscriber sendNext:@"2.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"2.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal3 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"+++call signal3 block");
[subscriber sendNext:@"3.1"];
[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[subscriber sendNext:@"3.2"];
[subscriber sendCompleted];
}];
return nil;
}];

RACSignal *signal4 = [RACSignal combineLatest:@[signal1,signal2,signal3]];

[signal4 subscribeNext:^(id x) {
NSLog(@"+++++received data:%@",x);
}];
}

日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
02:30:08.894 +++call signal1 block
02:30:08.895 +++call signal2 block
02:30:08.895 +++call signal3 block
02:30:08.896 +++++received data:<RACTuple: 0x7ff33e49e300> (
1,
"2.1",
"3.1"
)
02:30:10.895 +++++received data:<RACTuple: 0x7ff33e4db8d0> (
1,
"2.2",
"3.1"
)
02:30:11.094 +++++received data:<RACTuple: 0x7ff33e6e79f0> (
1,
"2.2",
"3.2"
)

这里需要说明的是:

1.信号4订阅者的元组中数据的顺序,是按照组合时的顺序来的;

2.信号4的订阅者第一次触发是在所有信号都发送了一遍sendNext:之后;

3.之后如果再有某个信号sendNext:信号4都会再次触发并返回最新值组成的元组,这是它与zip的最大不同;

结束语

以上就是RAC中几种常用组合信号的方法,可以根据业务需求任意组合操作,很强大、方便。


相关参考:

#©RAC-Github


RAC-信号的组合
https://davidlii.cn/2021/06/01/RAC-order.html
作者
Davidli
发布于
2021年6月1日
许可协议