ReactiveCocoa 中 RACScheduler 是如何封装GCD的

前端之家收集整理的这篇文章主要介绍了ReactiveCocoa 中 RACScheduler 是如何封装GCD的前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。


原文地址:http://www.jianshu.com/p/980ffdf3ed8c



前言

在使用ReactiveCocoa 过程中,Josh AbernathyJustin Spahr-Summers两位大神为了能让RAC的使用者更畅快的在沉浸在FRP的世界里,更好的进行并发编程,于是就对GCD进行了一次封装,并与RAC的各大组件进行了完美的整合。

自从有了RACScheduler以后,使整个RAC并发编程的代码里面更加和谐统一,更加顺手,更加“ReactiveCocoa”。

目录

  • 1.RACScheduler是如何封装GCD的
  • 2.RACSequence的一些子类
  • 3.RACScheduler是如何“取消”并发任务的
  • 4.RACScheduler是如何和RAC其他组件进行完美整合的

一. RACScheduler是如何封装GCD的

RACScheduler在ReactiveCocoa中到底是干嘛的呢?处于什么地位呢?官方给出的定义如下:

  1. Schedulers are used to control when and where work is performed

RACScheduler在ReactiveCocoa中是用来控制一个任务,何时何地被执行。它主要是用来解决ReactiveCocoa中并发编程的问题的。

RACScheduler的实质是对GCD的封装,底层就是GCD实现的。

要分析RACScheduler,先来回顾一下GCD。


众所周知,在GCD中,Dispatch Queue主要分为2类,Serial Dispatch Queue 和 Concurrent Dispatch Queue 。其中Serial Dispatch Queue是等待现在执行中处理结束的队列,Concurrent Dispatch Queue是不等待现在执行中处理结束的队列。

生成Dispatch Queue的方法也有2种,第一种方式是通过GCD的API生成Dispatch Queue。

生成Serial Dispatch Queue

  1. dispatch_queue_t serialDispatchQueue = dispatch_queue_create("com.gcd.SerialDispatchQueue",DISPATCH_QUEUE_SERIAL);

生成Concurrent Dispatch Queue

  1. dispatch_queue_t concurrentDispatchQueue = dispatch_queue_create("com.gcd.ConcurrentDispatchQueue",DISPATCH_QUEUE_CONCURRENT);

第二种方法是直接获取系统提供的Dispatch Queue。系统提供的也分为2类,Main Dispatch Queue 和 Global Dispatch Queue。Main Dispatch Queue 对应着是Serial Dispatch Queue,Global Dispatch Queue 对应着是Concurrent Dispatch Queue。

Global Dispatch Queue主要分为8种。

首先是以下4种,分别是优先级对应Qos的情况。

  1. - DISPATCH_QUEUE_PRIORITY_HIGH: QOS_CLASS_USER_INITIATED
  2. - DISPATCH_QUEUE_PRIORITY_DEFAULT: QOS_CLASS_DEFAULT
  3. - DISPATCH_QUEUE_PRIORITY_LOW: QOS_CLASS_UTILITY
  4. - DISPATCH_QUEUE_PRIORITY_BACKGROUND: QOS_CLASS_BACKGROUND

其次是,是否支持 overcommit。加上上面4个优先级,所以一共8种Global Dispatch Queue。带有 overcommit 的队列表示每当有任务提交时,系统都会新开一个线程处理,这样就不会造成某个线程过载(overcommit)。

回到RACScheduler中来,RACScheduler既然是对GCD的封装,那么上述说的这些类型也都有其一一对应的封装。

  1. typedef enum : long {
  2. RACSchedulerPriorityHigh = DISPATCH_QUEUE_PRIORITY_HIGH,RACSchedulerPriorityDefault = DISPATCH_QUEUE_PRIORITY_DEFAULT,RACSchedulerPriorityLow = DISPATCH_QUEUE_PRIORITY_LOW,RACSchedulerPriorityBackground = DISPATCH_QUEUE_PRIORITY_BACKGROUND,} RACSchedulerPriority;

首先是RACScheduler中的优先级,这里只封装了4种,也是分别对应GCD中的DISPATCH_QUEUE_PRIORITY_HIGH,DISPATCH_QUEUE_PRIORITY_DEFAULT,DISPATCH_QUEUE_PRIORITY_LOW,DISPATCH_QUEUE_PRIORITY_BACKGROUND。

RACScheduler有6个类方法,都是用来生成一个queue的。

  1. + (RACScheduler *)immediateScheduler;
  2. + (RACScheduler *)mainThreadScheduler;
  3.  
  4. + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
  5. + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
  6. + (RACScheduler *)scheduler;
  7.  
  8. + (RACScheduler *)currentScheduler;

接下来依次分析一下它们的底层实现。


1. immediateScheduler

  1. + (instancetype)immediateScheduler {
  2. static dispatch_once_t onceToken;
  3. static RACScheduler *immediateScheduler;
  4. dispatch_once(&onceToken,^{
  5. immediateScheduler = [[RACImmediateScheduler alloc] init];
  6. });
  7.  
  8. return immediateScheduler;
  9. }

immediateScheduler底层实现就是生成了一个RACImmediateScheduler的单例。

RACImmediateScheduler 是继承自RACScheduler。

  1. @interface RACImmediateScheduler : RACScheduler
  2. @end

在RACScheduler中,每个种类的RACScheduler都会有一个name属性,名字也算是他们的标示。RACImmediateScheduler的name是@"com.ReactiveCocoa.RACScheduler.immediateScheduler"

RACImmediateScheduler的作用和它的名字一样,是立即执行闭包里面的任务。

  1. - (RACDisposable *)schedule:(void (^)(void))block {
  2. NSCParameterAssert(block != NULL);
  3.  
  4. block();
  5. return nil;
  6. }
  7.  
  8. - (RACDisposable *)after:(NSDate *)date schedule:(NSCParameterAssert(date != nil);
  9. NSCParameterAssert(block != NULL);
  10.  
  11. [NSThread sleepUntilDate:date];
  12. block();
  13.  
  14. return nil;
  15. }

在schedule:方法中,直接调用执行入参block( )闭包。在after: schedule:方法中,线程先睡眠,直到date的时刻,再醒过来执行入参block( )闭包。

  1. - (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(NSCAssert(NO,@"+[RACScheduler immediateScheduler] does not support %@.",NSStringFromSelector(_cmd));
  2. 当然RACImmediateScheduler是不可能支持after: repeatingEvery: withLeeway: schedule:方法的。因为它的定义就是立即执行的,不应该repeat

  3. - (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
  4.     for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
  5.         recursiveBlock(^{
  6.             remaining++;
  7.         });
  8.     }
  9.      RACImmediateSchedulerscheduleRecursiveBlock:方法中只要recursiveBlock闭包存在,就会无限递归调用执行,除非recursiveBlock不存在了。

  10.  
    2. mainThreadScheduler
  11.  
    mainThreadScheduler也是一个类型是RACTargetQueueScheduler的单例。

  12. instancetype)mainThreadScheduler {
  13.     static dispatch_once_t onceToken;
  14.     static RACScheduler *mainThreadScheduler;
  15.     dispatch_once(&onceToken,^{
  16.         mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
  17.     });
  18.     return mainThreadScheduler;
  19. }
  20. mainThreadScheduler的名字是@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler"

  21. RACTargetQueueScheduler继承自RACQueueScheduler

  22. RACTargetQueueScheduler : RACQueueScheduler
  23. - (id)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue;
  24.  RACTargetQueueScheduler中,只有一个初始化方法

  25.  
    - (dispatch_queue_t)targetQueue {
  26.     NSCParameterAssert(targetQueue != NULL);
  27.     if (name == nil) {
  28.         name = [NSString stringWithFormat:@"com.ReactiveCocoa.RACTargetQueueScheduler(%s)",dispatch_queue_get_label(targetQueue)];
  29.     }
  30.     dispatch_queue_t queue = dispatch_queue_create(name.UTF8String,DISPATCH_QUEUE_SERIAL);
  31.     if (queue == NULL) return nil;
  32.     dispatch_set_target_queue(queue,targetQueue);
  33.     return [super initWithName:name queue:queue];
  34. }
  35.  

    先新建了一个queuename是@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler",类型是Serial Dispatch Queue 类型的,然后调用dispatch_set_target_queue方法

  36.  

    所以重点就在dispatch_set_target_queue方法里面了。

  37.  

    dispatch_set_target_queue方法主要有两个目的:一是设置dispatch_queue_create创建队列的优先级,二是建立队列的执行阶层。

  38.  
    • 当使用dispatch_queue_create创建队列的时候,不管是串行还是并行,它们的优先级都是DISPATCH_QUEUE_PRIORITY_DEFAULT级别,而这个API就是可以设置队列的优先级。
  39.  

    举个例子:

  40.  
    dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue",DISPATCH_QUEUE_SERIAL);
  41. dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,0);
  42. //注意:被设置优先级的队列是第一个参数。
  43. dispatch_set_target_queue(serialQueue,globalQueue);
  44.  

    通过上面的代码,就把将serailQueue设置成DISPATCH_QUEUE_PRIORITY_HIGH

  45.  
    • 使用这个dispatch_set_target_queue方法可以设置队列执行阶层,例如dispatch_set_target_queue(queue,targetQueue);
      这样设置时,相当于将queue指派给targetQueue,如果targetQueue是串行队列,则queue是串行执行的;如果targetQueue是并行队列,那么queue是并行的。
  46.  

    dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue",DISPATCH_QUEUE_SERIAL);

  47. dispatch_queue_t queue1 = dispatch_queue_create("queue1",DISPATCH_QUEUE_SERIAL);

  48. dispatch_queue_t queue2 = dispatch_queue_create("queue2",DISPATCH_QUEUE_CONCURRENT);

  49. dispatch_set_target_queue(queue1,targetQueue);

  50. dispatch_set_target_queue(queue2,targetQueue);

  51. dispatch_async(queue1,^{

  52. NSLog(@"queue1 1");

  53. });

  54. @"queue1 2");

  55. });

  56. dispatch_async(queue2,152)">@"queue2 1");

  57. });

  58. @"queue2 2");

  59. });

  60. dispatch_async(targetQueue,152)">@"target queue");

  61. });

  62. 如果targetQueueSerial Dispatch Queue,那么输出结果必定如下:

  63. queue1 1
  64. queue1 2
  65. queue2 1
  66. queue2 2
  67. target queue
  68. 如果targetQueueConcurrent Dispatch Queue,那么输出结果可能如下:

  69. 2
  70. target queue
  71. queue2 2
  72. 回到RACTargetQueueScheduler中来,在这里传进来的入参是dispatch_get_main_queue( ),这是一个Serial Dispatch Queue,这里再调用dispatch_set_target_queue方法,相当于把queue的优先级设置的和main_queue一致。

  73. 3. scheduler
  74. 以下三个方法实质是同一个方法

  75. + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
  76. + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
  77. + (RACScheduler *)scheduler;
  78. instancetype)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
  79.     return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority,152)">0)];
  80. }
  81. + (instancetype)schedulerWithPriority:(RACSchedulerPriority)priority {
  82.     self schedulerWithPriority:priority name:@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"];
  83. }
  84. + (instancetype)scheduler {
  85.     self schedulerWithPriority:RACSchedulerPriorityDefault];
  86. }
  87. 通过源码我们能知道,scheduler这一系列的三个方法,是创建了一个 Global Dispatch Queue,对应的属于Concurrent Dispatch Queue。

  88. schedulerWithPriority: name:方法可以指定线程的优先级和名字。

  89. schedulerWithPriority:方法只能执行优先级,名字为默认的@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"。

  90. scheduler方法创建出来的queue的优先级是默认的,名字也是默认的@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"。

  91. 注意,scheduler和mainThreadScheduler,immediateScheduler这两个单例不同的是,scheduler每次都会创建一个新的Concurrent Dispatch Queue。

  92. 4. currentScheduler

  93. instancetype)currentScheduler {
  94.     RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
  95.     if (scheduler != nil) return scheduler;
  96.     if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
  97.      首先,在ReactiveCocoa 中定义了这么一个key,@"RACSchedulerCurrentSchedulerKey",这个用来从线程字典里面存取出对应的RACScheduler。

  98.  
    NSString * const RACSchedulerCurrentSchedulerKey = @"RACSchedulerCurrentSchedulerKey";
  99.  

    在currentScheduler这个方法里面看到的是从线程字典里面取出一个RACScheduler。至于什么时候存的,下面会解释到。

  100.  

    如果能从线程字典里面取出一个RACScheduler,就返回取出的RACScheduler。如果字典里面没有,再判断当前的scheduler是否是在主线程上。

  101.  
    + (BOOL)isOnMainThread {
  102.     return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
  103. }
  104.  

    判断方法如上,只要是NSOperationQueue在mainQueue上,或者NSThread是主线程,都算是在主线程上。

  105.  

    如果是在主线程上,就返回mainThreadScheduler。
    如果既不在主线程上,线程字典里面也找不到对应key值对应的value,那么就返回nil。

  106.  

    RACScheduler除了有6个类方法,还有4个实例方法

  107.  
    void))block;
  108. - (RACDisposable *)after:(void))block;
  109. - (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void))block;
  110.  

    这4个方法其实从名字上就知道是用来干嘛的。

  111.  

    schedule:是为RACScheduler添加一个任务,入参是一个闭包。

  112.  

    after: schedule:是为RACScheduler添加一个定时任务,在date时间之后才执行任务。

  113.  

    afterDelay: schedule:是为RACScheduler添加一个延时执行的任务,延时delay时间之后才执行任务。

  114.  

    after: repeatingEvery: withLeeway: schedule:是为RACScheduler添加一个定时任务,在date时间之后才开始执行,然后每隔interval秒执行一次任务。

  115.  

    这四个方法会分别在RACScheduler的各个子类里面进行重写。

  116.  

    比如之前提到的immediateScheduler,schedule:方法中会直接立即执行闭包。after: schedule:方法添加一个定时任务,在date时间之后才执行任务。after: repeatingEvery: withLeeway: schedule:这个方法在RACImmediateScheduler中就直接返回nil。

  117.  

    还有其他子类在下面会分析这4个方法的实现。

  118.  

    另外还有最后3个方法

  119.  
    - (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
  120. - (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable
  121. - (void)performAsCurrentScheduler:( 前两个方法是实现RACSequence中signalWithScheduler:方法的,具体分析见这篇文章

  122.  

    performAsCurrentScheduler:方法是在RACQueueScheduler中使用到了,在下面子类分析里面详细分析。

  123.  

    二. RACSequence的一些子类

  124.  

    RACSequence总共有以下5个子类。

  125.  
    1. RACTestScheduler
  126. 这个类主要是一个测试类,主要用在单元测试中,它是用来验证异步调用没有花费大量的时间等待。RACTestScheduler也可以用在多线程当中,当时一次只能在排队的方法队列中选择一个方法执行。

  127. RACTestSchedulerAction : NSObject
  128. @property (nonatomic,copy,0)">readonly) NSDate *date;
  129. readonly) void (^block)(void);
  130. strong,0)">readonly) RACDisposable *disposable;
  131. - (id)initWithDate:(NSDate *)date block:(void))block;
  132.  在单元测试中,ReactiveCocoa为了方便比较每个方法调用,新建了一个RACTestSchedulerAction对象,用来更加方便的比较和描述测试的全过程。RACTestSchedulerAction的定义如上。现在再来解释一下参数。

  133.  

    date是一个时间,时间主要是用来比较和决定下一次该轮到哪个闭包要开始执行了。

  134.  

    void (^block)(void)闭包是RACScheduler中的一个任务。

  135.  

    disposable是控制一个action是否可以执行的。一旦disposed了,那么这个action就不会被执行。

  136.  

    initWithDate: block: 方法是初始化一个新的action。

  137.  

    在单元测试过程中,需要调用step方法来进行查看每次调用闭包的情况。

  138.  
    void)step {
  139.     [self step:1];
  140. }
  141. - (void)stepAll {
  142.     [self step:NSUIntegerMax];
  143. }
  144.  

    step和stepAll方法都是调用step:方法。step只是执行一次RACScheduler中的任务,stepAll是执行所有的RACScheduler中的任务。既然都是调用step:,那接下来分析一下step:的具体实现。

  145.  
    void)step:(NSUInteger)ticks {
  146.     @synchronized (self) {
  147.         for (NSUInteger i = 0; i < ticks; i++) {
  148.             const void *actionPtr = NULL;
  149.             if (!CFBinaryHeapGetMinimumIfPresent(self.scheduledActions,&actionPtr)) break;
  150.             RACTestSchedulerAction *action = (__bridge id)actionPtr;
  151.             CFBinaryHeapRemoveMinimumValue(self.scheduledActions);
  152.             if (action.disposable.disposed) continue;
  153.             RACScheduler *prevIoUsScheduler = RACScheduler.currentScheduler;
  154.             NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
  155.             action.block();
  156.             if (prevIoUsScheduler != nil) {
  157.                 NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = prevIoUsScheduler;
  158.             } else {
  159.                 [NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
  160.             }
  161.         }
  162.     }
  163. }
  164.  

    step:的实现主要就是一个for循环。循环的次数就是入参ticks决定的。首先const void *actionPtr是一个指向函数的指针。在上述实现中有一个很重要的函数——CFBinaryHeapGetMinimumIfPresent。该函数的原型如下:

  165.  
    Boolean CFBinaryHeapGetMinimumIfPresent(CFBinaryHeapRef heap,0)">void **value)
  166.  

    这个函数的主要作用的是在二分堆heap中查找一个最小值。

  167.  
    CFComparisonResult RACCompareScheduledActions(void *ptr1,0)">void *ptr2,0)">void *info) {
  168.     RACTestSchedulerAction *action1 = (__bridge id)ptr1;
  169.     RACTestSchedulerAction *action2 = (__bridge id)ptr2;
  170.     return CFDateCompare((__bridge CFDateRef)action1.date,(__bridge CFDateRef)action2.date,NULL);
  171. }
  172.  

    比较规则如上,就是比较两者的date的值。从二分堆中找出这样一个最小值,对应的就是scheduler中的任务。如果最小值有几个相等最小值,就随机返回一个最小值。返回的函数放在actionPtr中。整个函数的返回值是一个BOOL值,如果二分堆不为空,能找到最小值就返回YES,如果二分堆为空,就找不到最小值了,就返回NO

  173.  

    stepAll方法里面传入了NSUIntegerMax,这个for循环也不会死循环,因为到堆中所有的任务都执行完成之后,CFBinaryHeapGetMinimumIfPresent返回NO,就会执行break,跳出循环。

  174.  

    这里会把currentScheduler保存到线程字典里面。接着会执行action.block,执行任务。

  175.  
    NSCParameterAssert(block != nil);
  176.     self) {
  177.         NSDate *uniqueDate = [NSDate dateWithTimeIntervalSinceReferenceDate:self.numberOfDirectlyScheduledBlocks];
  178.         self.numberOfDirectlyScheduledBlocks++;
  179.         RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:uniqueDate block:block];
  180.         CFBinaryHeapAddValue(void *)action);
  181.         return action.disposable;
  182.     }
  183. }
  184. - (RACDisposable *)after:(self) {
  185.         RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:date block:block];
  186.         return action.disposable;
  187.     }
  188. }
  189.  

    schedule:方法里面会累加numberOfDirectlyScheduledBlocks值,这个值也会初始化成时间,以便比较各个方法该调度的时间。numberOfDirectlyScheduledBlocks最终会代表总共有多少个block任务产生了。然后用CFBinaryHeapAddValue加入到堆中。

  190.  

    after:schedule:就是直接新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValueblock闭包加入到堆中。

  191.  

    after: repeatingEvery: withLeeway: schedule:同样也是新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValueblock闭包加入到堆中。

  192.  
    2. RACSubscriptionScheduler
  193.  

    RACSubscriptionSchedulerRACScheduler最后一个单例。RACScheduler中唯一的三个单例现在就齐全了:RACImmediateSchedulerRACTargetQueueScheduler RACSubscriptionScheduler

  194.  
    instancetype)subscriptionScheduler {
  195.     static RACScheduler *subscriptionScheduler;
  196.     return subscriptionScheduler;
  197. }
  198.  

    RACSubscriptionScheduler 的名字是@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"

  199.  
    id)init {
  200.     self = [super initWithName:@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"];
  201.     if (self == nil) return nil;
  202.     _backgroundScheduler = [RACScheduler scheduler];  
  203.     return self;
  204. }
  205.  

    RACSubscriptionScheduler初始化的时候会新建一个Global Dispatch Queue

  206.  
    NSCParameterAssert(block != NULL);
  207.     if (RACScheduler.currentScheduler == nil) self.backgroundScheduler schedule:block];
  208.     block();
  209.      如果RACScheduler.currentSchedulernil就用backgroundScheduler调用block闭包,否则就执行block闭包。

  210.  
    void))block {
  211.     RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
  212.     return [scheduler after:date schedule:block];
  213. }
  214.  
    return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
  215. }
  216.  

    两个after方法都有取出RACScheduler.currentScheduler,如果为空就用self.backgroundScheduler调用各自的after方法

  217.  

    RACSubscriptionScheduler中的backgroundScheduler的意义就在此,当RACScheduler.currentScheduler不存在的时候就会替换成self.backgroundScheduler

  218.  
    3. RACImmediateScheduler
  219.  

    这个子类在分析immediateScheduler方法的时候,详细分析过了,这里不再赘述。

  220.  
    4. RACQueueScheduler
  221.  
    NSCParameterAssert(block != NULL);
  222. RACDisposable *disposable = [[RACDisposable alloc] init];
  223. dispatch_async(self.queue,^{
  224. if (disposable.disposed) return;
  225. [self performAsCurrentScheduler:block];
  226. });
  227. return disposable;
  228. }
  229.  

    schedule:会调用performAsCurrentScheduler:方法

  230.  
    NSCParameterAssert(block != NULL);
  231.     RACScheduler *prevIoUsScheduler = RACScheduler.currentScheduler;
  232.     self;
  233.     @autoreleasepool {
  234.         block();
  235.     }
  236.     if (prevIoUsScheduler != nil) {
  237.         NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = prevIoUsScheduler;
  238.     } else {
  239.         [NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
  240.     }
  241. }
  242.  

    performAsCurrentScheduler:方法会先在调用block( )之前,把当前的scheduler存入线程字典中。

  243.  

    试想,如果现在在一个Concurrent Dispatch Queue中,在执行block( )之前需要先切换线程,切换到当前scheduler中。当执行完block闭包之后,prevIoUsScheduler如果不为nil,那么就还原现场,线程字典里面再存回原来的scheduler,反之prevIoUsSchedulernil,那么就移除掉线程字典里面的key

  244.  

    这里需要值得注意的是:

  245.  

    scheduler本质其实是一个quene,并不是一个线程。它只能保证里面的线程都是串行执行的,但是它不能保证每个线程不一定都是在同一个线程里面执行。

  246.  

    如上面这段performAsCurrentScheduler:的实现所表现的那样。所以
    scheduler使用Core Data很容易崩溃,很可能跑到子线程上面去了。一旦写数据的时候到了子线程上,很容易就Crash了。一定要记得回到main queue上。

  247.  
    NSCParameterAssert(block != NULL);
  248.     RACDisposable *disposable = [[RACDisposable alloc] init];
  249.     dispatch_after([self.class wallTimeWithDate:date],47); font-size:16px"> 在after中调用dispatch_after方法,经过date时间之后再调用performAsCurrentScheduler:。

  250.  

    wallTimeWithDate:的实现如下:

  251.  
    + (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
  252.     NSCParameterAssert(date != nil);
  253.     double seconds = 0;
  254.     double frac = modf(date.timeIntervalSince1970,&seconds);
  255.     struct timespec walltime = {
  256.         .tv_sec = (time_t)fmin(fmax(seconds,LONG_MIN),LONG_MAX),.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC,LONG_MAX)
  257.     };
  258.     return dispatch_walltime(&walltime,152)">0);
  259. }
  260.  

    dispatch_walltime函数是由POSIX中使用的struct timespec类型的时间得到dispatch_time_t类型的值。dispatch_time函数通常用于计算相对时间,而dispatch_walltime函数用于计算绝对时间。

  261.  

    这段代码其实很简单,就是把date的时间转换成一个dispatch_time_t类型的。由NSDate类对象获取能传递给dispatch_after函数dispatch_time_t类型的值。

  262.  
    NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
  263.     NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSCParameterAssert(block != NULL);
  264.     uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
  265.     uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);
  266.     dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,152)">0,0)">self.queue);
  267.     dispatch_source_set_timer(timer,[return [RACDisposable disposableWithBlock:^{
  268.         dispatch_source_cancel(timer);
  269.     }];
  270. }
  271.  

    after: repeatingEvery: withLeeway: schedule:方法里面的实现就是用GCDself.queue上创建了一个Timer,时间间隔是interval,修正时间是leeway

  272.  

    leeway这个参数是为dispatch source指定一个期望的定时器事件精度,让系统能够灵活地管理并唤醒内核。例如系统可以使用leeway值来提前或延迟触发定时器,使其更好地与其它系统事件结合。创建自己的定时器时,应该尽量指定一个leeway值。不过就算指定leeway值为0,也不能完完全全期望定时器能够按照精确的纳秒来触发事件。

  273.  

    这个定时器在interval执行入参闭包。在取消任务的时候调用dispatch_source_cancel取消定时器timer

  274.  
    5. RACTargetQueueScheduler
  275.  

    这个子类在分析mainThreadScheduler方法的时候,详细分析过了,这里不再赘述。

  276.  

    三. RACScheduler是如何“取消”并发任务的

  277.  
    既然RACScheduler是对GCD的封装,那么在GCD的上层可以实现一些GCD所无法完成的“特性”。这里的“特性”是打引号的,因为底层是GCD,上层的特性只能通过一些特殊手段来实现看似是新的特性。在这一点上,RACScheduler就实现了GCD没有的特性——“取消”任务。

  278. Operation Queues
    相对 GCD来说,使用 Operation Queues 增加一点点额外的开销,但是却换来了非常强大的灵活性和功能,它可以给 operation 之间添加依赖关系、取消一个正在执行的 operation 、暂停和恢复 operation queue 等;

  279. GCD
    是一种更轻量级的,以 FIFO的顺序执行并发任务的方式,使用 GCD时我们可以并不关心任务的调度情况,而让系统帮我们自动处理。但是 GCD的缺陷也是非常明显的,想要给任务之间添加依赖关系、取消或者暂停一个正在执行的任务时就会变得非常棘手。

  280. 既然GCD不方便取消一个任务,那么RACScheduler是怎么做到的呢?

  281. 这就体现在RACQueueScheduler上。回头看看RACQueueSchedulerschedule:实现 after: schedule:实现。

  282. 最核心的代码:

  283. return;
  284.       [self performAsCurrentScheduler:block];
  285.  });
  286. 调用performAsCurrentScheduler:之前,加了一个判断,判断当前是否取消了任务,如果取消了任务,就return,不会调用block闭包。这样就实现了取消任务的“假象”。

  287. 四. RACScheduler是如何和RAC其他组件进行完美整合的

  288. 在整个ReactiveCocoa中,利用RACScheduler实现了很多操作,和RAC是深度整合的。这里就来总结总结ReactiveCocoa中总共有哪些地方用到了RACScheduler

  289. ReactiveCocoa 中全局搜索RACScheduler,遍历完所有库,RACScheduler就用在以下10个类中。下面就来看看是如何用在这些地方的。

  290. 从下面这些地方使用了Scheduler中,我们就可以了解到哪些操作是在子线程,哪些是在主线程。区分出了这些,对于线程不安全的操作,我们就能心有成足的处理好它们,让它们回到主线程中去操作,这样就可以减少很多莫名的Crash。这些Crash都是因为线程问题导致的。

  291. 1. RACCommand
  292. id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock
  293. 这个方法十分复杂,里面用到了RACScheduler.immediateSchedulerdeliverOn:RACScheduler.mainThreadScheduler。具体的源码分析会在下一篇RACCommand源码分析里面详细分析。

  294. - (RACSignal *)execute:(id)input
  295. 在这个方法中,会调用subscribeOn:RACScheduler.mainThreadScheduler

  296. 2. RACDynamicSignal
  297. - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
  298.     NSCParameterAssert(subscriber != nil);
  299.     RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
  300.     subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
  301.     self.didSubscribe != NULL) {
  302.         RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
  303.             RACDisposable *innerDisposable = self.didSubscribe(subscriber);
  304.             [disposable addDisposable:innerDisposable];
  305.         }];
  306.         [disposable addDisposable:schedulingDisposable];
  307.     }
  308.      RACDynamicSignalsubscribe:订阅过程中会用到subscriptionScheduler。于是对这个scheduler调用schedule:就会执行下面这段代码

  309.  
    NSCParameterAssert(block != NULL);
  310.     self.backgroundScheduler schedule:block];
  311.     block();
  312.      如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault

  313.  

    同理,在RACEmptySignalRACErrorSignalRACReturnSignalRACSignal的相关的signal订阅中也都会调用subscriptionScheduler

  314.  
    3. RACBehaviorSubject
  315.  
    id<RACSubscriber>)subscriber {
  316.     RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
  317.     RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
  318.         self) {
  319.             [subscriber sendNext:self.currentValue];
  320.         }
  321.     }];
  322.     return [RACDisposable disposableWithBlock:^{
  323.         [subscriptionDisposable dispose];
  324.         [schedulingDisposable dispose];
  325.     }];
  326. }
  327.  

    RACBehaviorSubjectsubscribe:订阅过程中会用到subscriptionScheduler。于是对这个scheduler调用schedule:,代码在上面分析过了。

  328.  

    同理,如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault

  329.  
    4. RACReplaySubject
  330.  

    它的订阅也同上面信号的订阅一样,会调用subscriptionScheduler

  331.  

    由于RACReplaySubject是在子线程上,所以建议在使用Core Data这些不安全库的时候一定要记得加上deliverOn

  332.  
    5. RACSequence
  333.  

    RACSequence中,以下两个方法用到了RACScheduler

  334.  
    - (RACSignal *)signal {
  335.     return [[self signalWithScheduler:[RACScheduler scheduler]] setNameWithFormat:@"[%@] -signal",0)">self.name];
  336. }
  337.  
    - (RACSignal *)signalWithScheduler:(RACScheduler *)scheduler {
  338.     return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  339.         __block RACSequence *sequence = self;
  340.         return [scheduler scheduleRecursiveBlock:^(void (^reschedule)(void)) {
  341.             if (sequence.head == nil) {
  342.                 [subscriber sendCompleted];
  343.                 return;
  344.             }
  345.             [subscriber sendNext:sequence.head];
  346.             sequence = sequence.tail;
  347.             reschedule();
  348.         }];
  349.     }] setNameWithFormat:@"[%@] -signalWithScheduler: %@",0)">self.name,scheduler];
  350. }
  351.  

    上面两个方法调用RACScheduler中的scheduleRecursiveBlock:方法。关于这个方法的源码分析可以看RACSequence的源码分析

  352.  
    6. RACSignal+Operations
  353.  

    这里总共有9方法用到了Scheduler

  354.  

    第一个方法

  355.  
    static RACDisposable *subscribeForever (RACSignal *signal,0)">void (^next)(id),0)">void (^error)(NSError *,RACDisposable *),0)">void (^completed)(RACDisposable *))
  356.  

    在上面这个方法里面用到了

  357.  
    RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
  358.  

    取出currentScheduler或者一个Global Dispatch Queue,然后调用scheduleRecursiveBlock:。

  359.  

    第二个方法

  360.  
    - (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate
  361.  

    在上面这个方法中会调用

  362.  
    RACScheduler *scheduler = [RACScheduler scheduler];
  363. RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler
  364.  

    在delayScheduler中调用afterDelay: schedule:方法,这也是throttle:valuesPassingTest:方法实现的很重要的一步。

  365.  

    第三个方法

  366.  
    - (RACSignal *)delay:(NSTimeInterval)interval
  367.  

    由于这是一个延迟方法,肯定是会调用Scheduler的after方法

  368.  
    RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
  369.    RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
  370.  

    RACScheduler.currentScheduler ?: scheduler 这个判断在上述几个时间相关的方法都用到了。

  371.  

    所以,这里给一个建议:
    delay由于不一定会回到当前线程中,所以delay之后再去订阅可能就在子线程中去执行。所以使用delay的时候最好追加一个deliverOn。

  372.  

    第四个方法

  373.  
    - (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler
  374.  

    在这个方法中理所当然的需要调用[scheduler afterDelay:interval schedule:flushValues]这个方法,来达到延迟的目的,从而实现缓冲buffer的效果

  375.  

    第五个方法

  376.  
    + (RACSignal *)interval:( 第六个方法

  377.  
    NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway { }
  378.  

    第五个方法 和 第六个方法都用传进去的入参scheduler去调用after: repeatingEvery: withLeeway: schedule:方法

  379.  

    第七个方法

  380.  
    - (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler { }
  381.  

    在这个方法中会用入参scheduler调用afterDelay: schedule:,延迟一段时候后,执行[disposable dispose],从而也实现了超时发送sendError:。

  382.  

    第八个方法

  383.  
    - (RACSignal *)deliverOn:(RACScheduler *)scheduler { }
  384.  

    第九个方法

  385.  
    - (RACSignal *)subscribeOn:(RACScheduler *)scheduler { }
  386.  

    第八个方法 和 第九个方法都是根据入参scheduler去调用schedule:方法。入参是什么类型的scheduler决定了schedule:执行在哪个queue上。

  387.  
    7. 在RACSignal中
  388.  

    在RACSignal也有积极计算和惰性求值的信号。

  389.  
    + (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(id<RACSubscriber> subscriber))block {
  390.     NSCParameterAssert(scheduler != nil);
  391.     NSCParameterAssert(block != NULL);
  392.     RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
  393.     [[signal publish] connect];
  394.     return [signal setNameWithFormat:@"+startEagerlyWithScheduler: %@ block:",47); font-size:16px"> startEagerlyWithScheduler中会调用startLazilyWithScheduler产生一个信号signal,然后紧接着转换成热信号。通过startEagerlyWithScheduler产生的信号就直接是一个热信号。

  395.  
    + (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(NSCParameterAssert(block != NULL);
  396.     RACMulticastConnection *connection = [[RACSignal
  397.                                            createSignal:^ id (id<RACSubscriber> subscriber) {
  398.                                                block(subscriber);
  399.                                                return nil;
  400.                                            }]
  401.                                           multicast:[RACReplaySubject subject]];
  402.     return [[[RACSignal
  403.               createSignal:^ id<RACSubscriber> subscriber) {
  404.                   [connection.signal subscribe:subscriber];
  405.                   [connection connect];
  406.                   return nil;
  407.               }]
  408.              subscribeOn:scheduler]
  409.             setNameWithFormat:@"+startLazilyWithScheduler: %@ block:",47); font-size:16px"> 上述是startLazilyWithScheduler:的源码实现,在这个方法中和startEagerlyWithScheduler最大的区别就出来了,connect方法在return的信号中,所以Lazily就体现在,通过startLazilyWithScheduler建立出来的信号,只有订阅它之后才能调用到connect,转变成热信号。

  410.  

    在这里调用了subscribeOn:scheduler,这里用到了scheduler。

  411.  
    8. 在NSData+RACSupport中
  412.  
    + (RACSignal *)rac_readContentsOfURL:(NSURL *)URL options:(NSDataReadingOptions)options scheduler:(RACScheduler *)scheduler {
  413.     NSCParameterAssert(scheduler != nil);
  414.     RACReplaySubject *subject = [RACReplaySubject subject];
  415.     [subject setNameWithFormat:@"+rac_readContentsOfURL: %@ options: %lu scheduler: %@",URL,(unsigned long)options,scheduler];
  416.     [scheduler schedule:^{
  417.         NSError *error = nil;
  418.         NSData *data = [[NSData alloc] initWithContentsOfURL:URL options:options error:&error];
  419.         if (data == nil) {
  420.             [subject sendError:error];
  421.         } else {
  422.             [subject sendNext:data];
  423.             [subject sendCompleted];
  424.         }
  425.     }];
  426.     return subject;
  427. }
  428.  

    在这个方法中,会传入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。那么调用schedule方法就会执行到这里:

  429.  
    return disposable;
  430. }
  431.  
    9. 在NSString+RACSupport中
  432.  
    NSURL *)URL usedEncoding:(NSStringEncoding *)encoding scheduler:(RACScheduler *)scheduler {
  433.     @"+rac_readContentsOfURL: %@ usedEncoding:scheduler: %@",210)">NSString *string = [NSString stringWithContentsOfURL:URL usedEncoding:encoding error:&error];
  434.         if (string == nil) {
  435.             [subject sendError:error];
  436.         } else {
  437.             [subject sendNext:string];
  438.             [subject sendCompleted];
  439.         }
  440.     }];
  441.      NSData+RACSupport中的rac_readContentsOfURL: options: scheduler:一样,也会传入RACQueueScheduler或者RACTargetQueueSchedulerRACScheduler

  442.  
    10. NSUserDefaults+RACSupport
  443.  
    RACScheduler *scheduler = [RACScheduler scheduler];
  444.  

    在这个方法中也会新建RACTargetQueueScheduler,一个Global Dispatch Queue。优先级是RACSchedulerPriorityDefault

  445.  

    最后

  446.  

    关于RACScheduler底层实现分析都已经分析完成。最后请大家多多指教。

  447.       
  448.       
  449.                  
    •               
    •           
    • 猜你在找的React相关文章

    •         
    •         
    •         
    •         
    •         
    •