RAC-信号与订阅者

一.信号

1.信号是啥

RAC文档 中关于RACSignal的描述:

A signal, represented by the RACSignal class, is a push-driven stream.
Signals generally represent data that will be delivered in the future. As work is performed or data is received, values are sent on the signal, which pushes them out to any subscribers. Users must subscribe to a signal in order to access its values.

  • 信号是一种 “推送” 类型的流。
  • 信号代表着将来要传输的数据,值通过信号传递给订阅者;
  • 用户须订阅信号才能接收到这些值。

2.冷热信号

信号分为冷信号热信号两种:

  • 冷信号是被动的,被订阅之后才能发送消息;
  • 热信号是主动的,即使未被订阅也能不断推送新消息;
  • 冷信号只能1-1,对应一个订阅者,当有新订阅者时消息会重新发送一遍;
  • 热信号可以1-N,可以有多个订阅者,信号可与多个订阅者共享信息;
#2.1.冷信号
  • RACDynamicSignal
  • RACEmptySignal
  • RACErrorSignal
  • RACReturnSignal
  • RACChannelTerminal
#2.2.热信号
  • RACSubject
  • RACBehaviorSubject
  • RACGroupedSignal
  • RACReplaySubject

二.订阅者

关于RACSubscriber的描述:

A subscriber is anything that is waiting or capable of waiting for events from a signal. Within RAC, a subscriber is represented as any object that conforms to the RACSubscriber protocol.

Subscriptions retain their signals, and are automatically disposed of when the signal completes or errors. Subscriptions can also be disposed of manually.

  • 订阅者表示正在等待信号发送事件的对象。
  • 当信号有新数据时,通过这个订阅者发送新值。
  • 订阅者在信号发送完成或者错误事件时会自动销毁。

这些关于信号订阅者的描述很容易让人联想到APNs,两者机制很相似:

  • iOS设备提前注册推送服务(订阅);
  • 苹果的推送服务器推送消息(事件);
  • iOS设备接收到消息并处理自己的业务(响应)。

iOS设备就是订阅者,苹果的 APNs 服务作为一个整体而被视为信号流。二者是 N 对 1 的关系。

三.冷信号

#示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@property (nonatomic, strong) id<RACSubscriber> subscriber;

- (void)coldSignal {
//1.创建信号
RACSignal *s1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
// ^didSubscribe
// 这里封装业务代码
_subscriber = subscriber;
return [RACDisposable disposableWithBlock:^{
//清理资源
}];
}];
//2.订阅信号
[s1 subscribeNext:^(id x) {
//4.接收数据
NSLog(@"+++received value1:%@",x);
}];
//3.订阅信号
[s1 subscribeNext:^(id x) {
//5.接收数据
NSLog(@"+++received value2:%@",x);
}];
[_subscriber sendNext:@"x"];
}

日志:

1
+++received value2:x

1.创建信号

#1.1.类方法:

调用RACSignal的类方法创建一个信号:

1
2
3
4
5
6
RACSignal *s1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
//^didSubscribe中执行任务
return [RACDisposable disposableWithBlock:^{
//清理资源
}];
}];

此方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Creates a new signal. This is the preferred way to create a new signal
/// operation or behavior.
///
/// Events can be sent to new subscribers immediately in the `didSubscribe`
/// block, but the subscriber will not be able to dispose of the signal until
/// a RACDisposable is returned from `didSubscribe`. In the case of infinite
/// signals, this won't _ever_ happen if events are sent immediately.
///
/// To ensure that the signal is disposable, events can be scheduled on the
/// +[RACScheduler currentScheduler] (so that they're deferred, not sent
/// immediately), or they can be sent in the background. The RACDisposable
/// returned by the `didSubscribe` block should cancel any such scheduling or
/// asynchronous work.
///
/// didSubscribe - Called when the signal is subscribed to. The new subscriber is
/// passed in. You can then manually control the <RACSubscriber> by
/// sending it -sendNext:, -sendError:, and -sendCompleted,
/// as defined by the operation you're implementing. This block
/// should return a RACDisposable which cancels any ongoing work
/// triggered by the subscription, and cleans up any resources or
/// disposables created as part of it. When the disposable is
/// disposed of, the signal must not send any more events to the
/// `subscriber`. If no cleanup is necessary, return nil.
///
/// **Note:** The `didSubscribe` block is called every time a new subscriber
/// subscribes. Any side effects within the block will thus execute once for each
/// subscription, not necessarily on one thread, and possibly even
/// simultaneously!
+ (RACSignal<ValueType> *)createSignal:(RACDisposable * _Nullable (^)(id<RACSubscriber> subscriber))didSubscribe RAC_WARN_UNUSED_RESULT;

方法的参数为didSubscribe,返回值为新的信号。

#1.2.didSubscribe:

didSubscribe是我们定义的处理业务的Block,在创建信号时作为参数,被传入并保存在信号内;

didSubscribe的参数为实现了RACSubscriber协议的订阅者,在信号被订阅后可用于接收数据;

didSubscribe的返回值是一个用于清理资源的RACDisposable对象,无需清理资源时可返回 nil;

didSubscribe在创建信号时被作为参数传入并保存在RACSignal实例的_didSubscribe属性内:

1
2
3
4
5
6
7
8
9
10
11
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {

RACDynamicSignal *signal = [[self alloc] init];
// 保存block为属性
signal->_didSubscribe = [didSubscribe copy];

return [signal setNameWithFormat:@"+createSignal:"];
}

#1.3.冷信号

执行完上面的类方法之后,我们已经成功创建了一个信号。但新创建的信号默认是信号,还不能立刻用它发送数据,因为:

发送数据需要用到sendNext方法;

sendNext方法的调用者为subscriber,即订阅者;

所以,我们还缺少一个订阅者,这就是所谓的冷信号需要被订阅之后才能发送数据

2.订阅信号

订阅信号的语句为:

1
2
3
4
//2.订阅信号
[s1 subscribeNext:^(id x) {
NSLog(@"+++received value:%@",x);
}];

其中subscribeNext:方法的实现为:

1
2
3
4
5
6
7
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
// 1.创建订阅者
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
// 2.调用 ^didSubscribe
return [self subscribe:o];
}

#2.1.保存事件的block

首先,subscribeNext:内会创建一个订阅者,且冷信号每次被订阅都会创建新的订阅者,多次订阅只会保留最后一次订阅关系,即信号被多次订阅后,sendNext:通过订阅者发送数据时,只有最后一个订阅者会收到数据回调;

其次,方法将传进来的nexterrorcompleted 三个 block 保存到此订阅者对象中(属性):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@interface RACSubscriber ()

// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

+ (instancetype)subscriberWithNext:(void (^)(id x))next
error:(void (^)(NSError *error))error
completed:(void (^)(void))completed
{
RACSubscriber *subscriber = [[self alloc] init];

subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];

return subscriber;
}

这三个 block 分别代表了信号的三种事件:

  • next 表示将要发送数据;
  • error 表示出现错误;
  • completed 表示信号已完成。

调用完subscribeNext:方法后,就成功创建了订阅者对象,也就是之前的didSubscribe block 所缺少的那个参数。

至此,信号具备了发送数据的完整前提条件!

#2.2.调用didSubscribe block

创建订阅者后,继续调用self.didSubscribe(subscriber),subscriber参数就是刚新建的订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
...
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
//调用之前创建和保存起来的*didSubscribe* block,传入订阅者参数
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}

这里 self.didSubscribe 属性指向的正是之前在创建信号时我们自定义的didSubscribe,所以 self.didSubscribe(subscriber) 就是调用我们自己的didSubscribe block。

即:订阅信号后,信号的didSubscribe会自动调用一次

3.发送消息

订阅完成后自动创建了订阅者,接下来就可以向订阅者发送事件数据了。信号内有三种消息:

  • next
  • error
  • completed

三种消息对应的接口分别为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// Sends the next value to subscribers.
///
/// value - The value to send. This can be `nil`.
- (void)sendNext:(nullable id)value;

/// Sends the error to subscribers.
///
/// error - The error to send. This can be `nil`.
///
/// This terminates the subscription, and invalidates the subscriber (such that
/// it cannot subscribe to anything else in the future).
- (void)sendError:(nullable NSError *)error;

/// Sends completed to subscribers.
///
/// This terminates the subscription, and invalidates the subscriber (such that
/// it cannot subscribe to anything else in the future).
- (void)sendCompleted;

其实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;

nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];

if (errorBlock == nil) return;
errorBlock(e);
}
}

- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];

if (completedBlock == nil) return;
completedBlock();
}
}

向订阅者发送nexterrorcompleted事件,实际上是调用之前保存在订阅者对象中的三种 block 属性。

注意:信号向订阅者发送了error或者completed后,订阅关系随即结束,订阅者后续不会再收到任何事件和数据。

#冷信号使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@interface HelloRACController ()
@property (nonatomic, strong) id<RACSubscriber> subscriber;
@end

@implementation HelloRACController

- (void)viewDidLoad {
[super viewDidLoad];
//1.信号的使用
//1.1.创建信号,此时信号为冷信号,被订阅之后才能发送消息
RACSignal *s1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {

//当有订阅者订阅信号,就会调用此block
_subscriber = subscriber;

//1.3.发送信号
[subscriber sendNext:@"Hello from s1~"];

//回抛错误
[subscriber sendError:[NSError errorWithDomain:@"NetError" code:1022 userInfo:@{@"code":@(1022)}]];

//如果不再发送数据,发送信号完成
[subscriber sendCompleted];

//返回一个信号,内部会自动调用[RACDisposable disposable]取消订阅。
return [RACDisposable disposableWithBlock:^{
NSLog(@"++s1 Disposed~");
}];
}];

//1.2.订阅信号(被订阅后,如果向订阅者发送了信号,则此时subscribeNext的block被调用)
[s1 subscribeNext:^(id x) {
NSLog(@"~~第1个订阅消息:%@",x);
}];

[s1 subscribeNext:^(id x) {
NSLog(@"~~第2个订阅消息:%@",x);
} error:^(NSError * error) {
NSLog(@"+++error:%@",error.description);
} completed:^{
NSLog(@"++s1 completed");
}];

[_subscriber sendNext:@"Hello2 from s1~"];
}
@end

日志:

1
2
3
4
5
~~第1个订阅消息:Hello from s1~
++s1 Disposed~
~~第2个订阅消息:Hello from s1~
+++error:Error Domain=NetError Code=1022 "(null)" UserInfo={code=1022}
++s1 Disposed~

结合日志来分析,有几点可以得到印证:

1.调用完 [s1 subscribeNext:] 订阅信号之后,s1的 block 会立刻执行。

2.订阅者调用 [subscriber sendNext:] 后订阅者的subscribeNext block 会立刻执行。同理调用 [subscriber sendError:] 或者 [subscriber sendCompleted] 后,对应的errorcompleted回调也会立刻执行。

3.block 内调用 [subscriber sendError:] 或者 [subscriber sendCompleted] 之后,信号s1会立刻调用其RACDisposable的 block 清理资源。

4.一旦信号调用RACDisposable之后,它后续将不能再接收任何事件。比如示例中第二个订阅者调用了 sendError 之后,s1 自动调用了RACDisposable,后续调用 setCompletedcompleted回调不再执行,并且后面的 [_subscriber sendNext:] 也不会有任何响应或日志输出。

四.热信号

热信号本身不需要订阅也能发送消息,因为它们一般都直接或间接地实现了<RACSubscriber>协议,此协议定义了以下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@protocol RACSubscriber <NSObject>
@required

/// Sends the next value to subscribers.
///
/// value - The value to send. This can be `nil`.
- (void)sendNext:(nullable id)value;

/// Sends the error to subscribers.
///
/// error - The error to send. This can be `nil`.
///
/// This terminates the subscription, and invalidates the subscriber (such that
/// it cannot subscribe to anything else in the future).
- (void)sendError:(nullable NSError *)error;

/// Sends completed to subscribers.
///
/// This terminates the subscription, and invalidates the subscriber (such that
/// it cannot subscribe to anything else in the future).
- (void)sendCompleted;

/// Sends the subscriber a disposable that represents one of its subscriptions.
///
/// A subscriber may receive multiple disposables if it gets subscribed to
/// multiple signals; however, any error or completed events must terminate _all_
/// subscriptions.
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

以上几个方法主要用来向订阅者发送数据或更新状态。

热信号正是实现了此协议,才能在没有订阅者的情况下也能主动发送数据。以RACSubject为例:

1
2
3
4
5
6
/// A subject can be thought of as a signal that you can manually control by
/// sending next, completed, and error.
///
/// They're most helpful in bridging the non-RAC world to RAC, since they let you
/// manually control the sending of events.
@interface RACSubject<ValueType> : RACSignal<ValueType> <RACSubscriber>

继承自RACSignal,说明它是一种信号;实现了<RACSubscriber>协议,说明它能在不被订阅的情况下主动发送数据。

#示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)hotSignal {
RACSubject *sub = [RACSubject subject];
[sub sendNext:@"1"];

[sub subscribeNext:^(id x) {
NSLog(@"+++sub1:%@",x);
}];

[sub sendNext:@"2"];

[sub subscribeNext:^(id x) {
NSLog(@"+++sub2:%@",x);
}];

[sub sendNext:@"3"];
}

日志:

1
2
3
+++sub1:2
+++sub1:3
+++sub2:3

1.从[sub sendNext:@"1"]可以看到,sub对象不需要被订阅也能发送消息,只是此时没订阅者接收消息而已;

2.当订阅了两次之后,发送数据3时收到两次回调,也说明了热信号是1~N的关系,信号与N个订阅者共享信息。

下面将介绍RACSubject的实现:

1.创建信号

创建方法如下:

1
RACSubject *sub = [RACSubject subject];

其实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
+ (instancetype)subject {
return [[self alloc] init];
}

- (instancetype)init {
self = [super init];
if (self == nil) return nil;

_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];

return self;
}

注意这个_subscribers数组,后面会有重要作用。

2.订阅信号

1
2
3
4
5
6
[sub subscribeNext:^(id x) {
NSLog(@"+++sub1:%@",x);
}];
[sub subscribeNext:^(id x) {
NSLog(@"+++sub2:%@",x);
}];

subscribeNext:的实现为:

1
2
3
4
5
6
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);

RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}

和冷信号一样,这里会先创建一个订阅者对象,随后调用subscribe:方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
// 将订阅者添加进数组
[subscribers addObject:subscriber];
}
// 略。。
return disposable;
}

方法将传进来的订阅者添加进之前提到的subscribers数组中,等待随后的调用。

3.发送消息

1
[sub sendNext:@"1"];

sendNext:的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}

for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}

sub对象从subscribers数组中取出所有的订阅者,依次向其发送数据~

4.用作代理

RACSubject的实现并不难理解,它与订阅者之间是1~N的关系并可主动发送消息,常用来代替OC中的delegate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@interface ViewControllerII : UIViewController

@property (nonatomic, strong) RACSubject *delegate;

@end

@implementation ViewControllerII

- (void)viewDidLoad {
[super viewDidLoad];
}

- (IBAction)onDismiss:(id)sender
{
//RAC回调
if (self.delegate) {
[self.delegate sendNext:@"++ViewControllerII Closed~"];
}
//关闭
[self.navigationController popViewControllerAnimated:YES];
}

在某个页面设置代理:

1
2
3
4
5
6
7
8
9
10
11
12
- (IBAction)onAction2:(id)sender {
ViewControllerII *controller = [[UIStoryboard storyboardWithName:@"Main" bundle:nil]
instantiateViewControllerWithIdentifier:@"ViewControllerII"];
//设置代理信号
controller.delegate = [RACSubject subject];

//订阅代理
[controller.delegate subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[self.navigationController pushViewController:controller animated:YES];
}

五.组播

Signals are cold by default, meaning that they start doing work each time a new subscription is added. This behavior is usually desirable, because it means that data will be freshly recalculated for each subscriber, but it can be problematic if the signal has side effects or the work is expensive (for example, sending a network request).

信号有多个订阅者时,每被订阅一次其block都自动触发一次,这种情况有时会有副作用。比如block内封装了耗时操作,每次订阅都单独触发一遍显然浪费性能。理想的情况是多个订阅者都订阅完成后,block只触发一次而所有订阅者都收到回调。这种场景RAC考虑到了,此时RACMulticastConnection就派上用场了。

A multicast connection encapsulates the idea of sharing one subscription to a signal to many subscribers. This is most often needed if the subscription to the underlying signal involves side-effects or shouldn’t be called more than once.

组播,用于在多个订阅者之间共享对某个信号的订阅,在发送者和每一订阅者之间实现1~N的连接。

1.示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- (void)viewDidLoad {
[super viewDidLoad];

RACSignal *s2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"Hello from s2~"];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"++s2 Disposed~");
}];
}];

//连接类(单次订阅之后不会立刻收到回调,所有订阅完成后调用 connect 时才会先后触发)
RACMulticastConnection *cnn = [s2 publish];

//订阅连接类信号
[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第1个订阅消息:%@",x);
}];

[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第2个订阅消息:%@",x);
}];

[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第3个订阅消息:%@",x);
}];

[cnn connect];
}

输出日志:

1
2
3
4
~~第1个订阅消息:Hello from s2~
~~第2个订阅消息:Hello from s2~
~~第3个订阅消息:Hello from s2~
++s2 Disposed~

调用[cnn connect]后三个 block 都触发了且依次触发。

2.实现

  • 创建多播实例
1
RACMulticastConnection *cnn = [s2 publish];

来看看publish都做了什么:

1
2
3
4
5
6
7
8
9
10
11
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}

- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}

publish内创建了一个RACSubject对象,随后将其作为参数传入RACMulticastConnection的构造函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@interface RACMulticastConnection () {
RACSubject *_signal;
int32_t volatile _hasConnected;
}

@property (nonatomic, readonly, strong) RACSignal *sourceSignal;
@property (strong) RACSerialDisposable *serialDisposable;
@end

- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);

self = [super init];

_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;

return self;
}

构造函数将源信号s2保存为sourceSignal,将刚才创建的subject保存为signal,之后返回一个多播实例。

  • 订阅信号
1
2
3
4
5
6
7
8
9
10
11
[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第1个订阅消息:%@",x);
}];

[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第2个订阅消息:%@",x);
}];

[cnn.signal subscribeNext:^(id x) {
NSLog(@"~~第3个订阅消息:%@",x);
}];

这里订阅的不再是s2,而是刚才的RACSubject类型属性signal,此订阅的内部实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// RACSignal.m
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);

RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}

// RACSubject.m
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
// 将订阅者保存到数组中
[subscribers addObject:subscriber];
}
// 略。。。

return disposable;
}

即订阅signal时,先创建新的订阅者,随后将订阅者添加到多播实例的数组subscribers中。此时还没有调用s2didSubscribe!

  • 连接 connect
1
[cnn connect];

connect的内部实现:

1
2
3
4
5
6
7
8
9
10
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

if (shouldConnect) {
// 这里的
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}

return self.serialDisposable;
}

调用了self.sourceSignalsubscribe,参数为之前 RACSubject 类型的属性_signal(信号订阅了信号):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 对subscriber做了变换
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
// 回调s2的^didSubscribe
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

return disposable;
}

subscribe:内对_signal做了变换,调了self.didSubscribe(subscriber)回调了s2didSubscribe

1
2
3
4
5
6
7
8
9
RACSignal *s2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
// 此 subscriber 为 RACSubject 类型
[subscriber sendNext:@"Hello from s2~"];
[subscriber sendCompleted];

return [RACDisposable disposableWithBlock:^{
NSLog(@"++s2 Disposed~");
}];
}];

didSubscribe内随即通过sendNext:向订阅者(即_signal)发送数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}

for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}

_signalsendNext:方法遍历之前保存在subscribers数组中的订阅者,向其一一发送数据,三个订阅者依次收到回调~

小结:多播主要是在内部产生RACSubject类型的热信号_signal,进而将多个订阅者保存在其数组中并最终一一向其发送数据。

六.结束语

以上就是关于RAC中信号订阅者的分析和简单使用。实践中可以结合RAC其他功能一起使用,后续文章中会继续研究~


相关参考:

#©RAC-Github


RAC-信号与订阅者
https://davidlii.cn/2021/05/25/RAC-signal.html
作者
Davidli
发布于
2021年5月25日
许可协议