原文地址:http://www.jianshu.com/p/980ffdf3ed8c
前言
在使用ReactiveCocoa 过程中,Josh Abernathy和Justin Spahr-Summers两位大神为了能让RAC的使用者更畅快的在沉浸在FRP的世界里,更好的进行并发编程,于是就对GCD进行了一次封装,并与RAC的各大组件进行了完美的整合。
自从有了RACScheduler以后,使整个RAC并发编程的代码里面更加和谐统一,更加顺手,更加“ReactiveCocoa”。
目录
- 1.RACScheduler是如何封装GCD的
- 2.RACSequence的一些子类
- 3.RACScheduler是如何“取消”并发任务的
- 4.RACScheduler是如何和RAC其他组件进行完美整合的
一. RACScheduler是如何封装GCD的
RACScheduler在ReactiveCocoa中到底是干嘛的呢?处于什么地位呢?官方给出的定义如下:
- Schedulers are used to control when and where work is performed
RACScheduler在ReactiveCocoa中是用来控制一个任务,何时何地被执行。它主要是用来解决ReactiveCocoa中并发编程的问题的。
RACScheduler的实质是对GCD的封装,底层就是GCD实现的。
要分析RACScheduler,先来回顾一下GCD。
众所周知,在GCD中,Dispatch Queue主要分为2类,Serial Dispatch Queue 和 Concurrent Dispatch Queue 。其中Serial Dispatch Queue是等待现在执行中处理结束的队列,Concurrent Dispatch Queue是不等待现在执行中处理结束的队列。
生成Dispatch Queue的方法也有2种,第一种方式是通过GCD的API生成Dispatch Queue。
生成Serial Dispatch Queue
- dispatch_queue_t serialDispatchQueue = dispatch_queue_create("com.gcd.SerialDispatchQueue",DISPATCH_QUEUE_SERIAL);
生成Concurrent Dispatch Queue
- dispatch_queue_t concurrentDispatchQueue = dispatch_queue_create("com.gcd.ConcurrentDispatchQueue",DISPATCH_QUEUE_CONCURRENT);
第二种方法是直接获取系统提供的Dispatch Queue。系统提供的也分为2类,Main Dispatch Queue 和 Global Dispatch Queue。Main Dispatch Queue 对应着是Serial Dispatch Queue,Global Dispatch Queue 对应着是Concurrent Dispatch Queue。
Global Dispatch Queue主要分为8种。
- - DISPATCH_QUEUE_PRIORITY_HIGH: QOS_CLASS_USER_INITIATED
- - DISPATCH_QUEUE_PRIORITY_DEFAULT: QOS_CLASS_DEFAULT
- - DISPATCH_QUEUE_PRIORITY_LOW: QOS_CLASS_UTILITY
- - DISPATCH_QUEUE_PRIORITY_BACKGROUND: QOS_CLASS_BACKGROUND
其次是,是否支持 overcommit。加上上面4个优先级,所以一共8种Global Dispatch Queue。带有 overcommit 的队列表示每当有任务提交时,系统都会新开一个线程处理,这样就不会造成某个线程过载(overcommit)。
- typedef enum : long {
- RACSchedulerPriorityHigh = DISPATCH_QUEUE_PRIORITY_HIGH,RACSchedulerPriorityDefault = DISPATCH_QUEUE_PRIORITY_DEFAULT,RACSchedulerPriorityLow = DISPATCH_QUEUE_PRIORITY_LOW,RACSchedulerPriorityBackground = DISPATCH_QUEUE_PRIORITY_BACKGROUND,} RACSchedulerPriority;
首先是RACScheduler中的优先级,这里只封装了4种,也是分别对应GCD中的DISPATCH_QUEUE_PRIORITY_HIGH,DISPATCH_QUEUE_PRIORITY_DEFAULT,DISPATCH_QUEUE_PRIORITY_LOW,DISPATCH_QUEUE_PRIORITY_BACKGROUND。
RACScheduler有6个类方法,都是用来生成一个queue的。
- + (RACScheduler *)immediateScheduler;
- + (RACScheduler *)mainThreadScheduler;
-
- + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
- + (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
- + (RACScheduler *)scheduler;
-
- + (RACScheduler *)currentScheduler;
接下来依次分析一下它们的底层实现。
1. immediateScheduler
- + (instancetype)immediateScheduler {
- static dispatch_once_t onceToken;
- static RACScheduler *immediateScheduler;
- dispatch_once(&onceToken,^{
- immediateScheduler = [[RACImmediateScheduler alloc] init];
- });
-
- return immediateScheduler;
- }
immediateScheduler底层实现就是生成了一个RACImmediateScheduler的单例。
RACImmediateScheduler 是继承自RACScheduler。
- @interface RACImmediateScheduler : RACScheduler
- @end
在RACScheduler中,每个种类的RACScheduler都会有一个name属性,名字也算是他们的标示。RACImmediateScheduler的name是@"com.ReactiveCocoa.RACScheduler.immediateScheduler"
RACImmediateScheduler的作用和它的名字一样,是立即执行闭包里面的任务。
- - (RACDisposable *)schedule:(void (^)(void))block {
- NSCParameterAssert(block != NULL);
-
- block();
- return nil;
- }
-
- - (RACDisposable *)after:(NSDate *)date schedule:(NSCParameterAssert(date != nil);
- NSCParameterAssert(block != NULL);
-
- [NSThread sleepUntilDate:date];
- block();
-
- return nil;
- }
在schedule:方法中,直接调用执行入参block( )闭包。在after: schedule:方法中,线程先睡眠,直到date的时刻,再醒过来执行入参block( )闭包。
- - (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(NSCAssert(NO,@"+[RACScheduler immediateScheduler] does not support %@.",NSStringFromSelector(_cmd));
- 当然RACImmediateScheduler是不可能支持after: repeatingEvery: withLeeway: schedule:方法的。因为它的定义就是立即执行的,不应该repeat。
-
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
recursiveBlock(^{
remaining++;
});
}
RACImmediateScheduler的scheduleRecursiveBlock:方法中只要recursiveBlock闭包存在,就会无限递归调用执行,除非recursiveBlock不存在了。
2. mainThreadScheduler
mainThreadScheduler也是一个类型是RACTargetQueueScheduler的单例。
instancetype)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken,^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});
return mainThreadScheduler;
}
mainThreadScheduler的名字是@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler"。
RACTargetQueueScheduler继承自RACQueueScheduler
RACTargetQueueScheduler : RACQueueScheduler
- (id)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue;
在RACTargetQueueScheduler中,只有一个初始化方法。
- (dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"com.ReactiveCocoa.RACTargetQueueScheduler(%s)",dispatch_queue_get_label(targetQueue)];
}
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String,DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
dispatch_set_target_queue(queue,targetQueue);
return [super initWithName:name queue:queue];
}
所以重点就在dispatch_set_target_queue方法里面了。
dispatch_set_target_queue方法主要有两个目的:一是设置dispatch_queue_create创建队列的优先级,二是建立队列的执行阶层。
- 当使用dispatch_queue_create创建队列的时候,不管是串行还是并行,它们的优先级都是DISPATCH_QUEUE_PRIORITY_DEFAULT级别,而这个API就是可以设置队列的优先级。
举个例子:
dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue",DISPATCH_QUEUE_SERIAL);
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,0);
//注意:被设置优先级的队列是第一个参数。
dispatch_set_target_queue(serialQueue,globalQueue);
通过上面的代码,就把将serailQueue设置成DISPATCH_QUEUE_PRIORITY_HIGH。
- 使用这个dispatch_set_target_queue方法可以设置队列执行阶层,例如dispatch_set_target_queue(queue,targetQueue);
这样设置时,相当于将queue指派给targetQueue,如果targetQueue是串行队列,则queue是串行执行的;如果targetQueue是并行队列,那么queue是并行的。
dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue",DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue1 = dispatch_queue_create("queue1",DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue2 = dispatch_queue_create("queue2",DISPATCH_QUEUE_CONCURRENT);
dispatch_set_target_queue(queue1,targetQueue);
dispatch_set_target_queue(queue2,targetQueue);
dispatch_async(queue1,^{
NSLog(@"queue1 1");
});
@"queue1 2");
});
dispatch_async(queue2,152)">@"queue2 1");
});
@"queue2 2");
});
dispatch_async(targetQueue,152)">@"target queue");
});
queue1 1
queue1 2
queue2 1
queue2 2
target queue
2
target queue
queue2 2
3. scheduler
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
+ (RACScheduler *)scheduler;
instancetype)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority,152)">0)];
}
+ (instancetype)schedulerWithPriority:(RACSchedulerPriority)priority {
self schedulerWithPriority:priority name:@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"];
}
+ (instancetype)scheduler {
self schedulerWithPriority:RACSchedulerPriorityDefault];
}
注意,scheduler和mainThreadScheduler,immediateScheduler这两个单例不同的是,scheduler每次都会创建一个新的Concurrent Dispatch Queue。
4. currentScheduler
instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
首先,在ReactiveCocoa 中定义了这么一个key,@"RACSchedulerCurrentSchedulerKey",这个用来从线程字典里面存取出对应的RACScheduler。
NSString * const RACSchedulerCurrentSchedulerKey = @"RACSchedulerCurrentSchedulerKey";
在currentScheduler这个方法里面看到的是从线程字典里面取出一个RACScheduler。至于什么时候存的,下面会解释到。
如果能从线程字典里面取出一个RACScheduler,就返回取出的RACScheduler。如果字典里面没有,再判断当前的scheduler是否是在主线程上。
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
判断方法如上,只要是NSOperationQueue在mainQueue上,或者NSThread是主线程,都算是在主线程上。
如果是在主线程上,就返回mainThreadScheduler。
如果既不在主线程上,线程字典里面也找不到对应key值对应的value,那么就返回nil。
void))block;
- (RACDisposable *)after:(void))block;
- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void))block;
这4个方法其实从名字上就知道是用来干嘛的。
schedule:是为RACScheduler添加一个任务,入参是一个闭包。
after: schedule:是为RACScheduler添加一个定时任务,在date时间之后才执行任务。
afterDelay: schedule:是为RACScheduler添加一个延时执行的任务,延时delay时间之后才执行任务。
after: repeatingEvery: withLeeway: schedule:是为RACScheduler添加一个定时任务,在date时间之后才开始执行,然后每隔interval秒执行一次任务。
这四个方法会分别在RACScheduler的各个子类里面进行重写。
还有其他子类在下面会分析这4个方法的实现。
另外还有最后3个方法
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable
performAsCurrentScheduler:方法是在RACQueueScheduler中使用到了,在下面子类分析里面详细分析。
二. RACSequence的一些子类
RACSequence总共有以下5个子类。
1. RACTestScheduler
RACTestSchedulerAction : NSObject
@property (nonatomic,copy,0)">readonly) NSDate *date;
readonly) void (^block)(void);
strong,0)">readonly) RACDisposable *disposable;
- (id)initWithDate:(NSDate *)date block:(void))block;
date是一个时间,时间主要是用来比较和决定下一次该轮到哪个闭包要开始执行了。
void (^block)(void)闭包是RACScheduler中的一个任务。
disposable是控制一个action是否可以执行的。一旦disposed了,那么这个action就不会被执行。
initWithDate: block: 方法是初始化一个新的action。
void)step {
[self step:1];
}
- (void)stepAll {
[self step:NSUIntegerMax];
}
void)step:(NSUInteger)ticks {
@synchronized (self) {
for (NSUInteger i = 0; i < ticks; i++) {
const void *actionPtr = NULL;
if (!CFBinaryHeapGetMinimumIfPresent(self.scheduledActions,&actionPtr)) break;
RACTestSchedulerAction *action = (__bridge id)actionPtr;
CFBinaryHeapRemoveMinimumValue(self.scheduledActions);
if (action.disposable.disposed) continue;
RACScheduler *prevIoUsScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
action.block();
if (prevIoUsScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = prevIoUsScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
}
}
Boolean CFBinaryHeapGetMinimumIfPresent(CFBinaryHeapRef heap,0)">void **value)
这个函数的主要作用的是在二分堆heap中查找一个最小值。
CFComparisonResult RACCompareScheduledActions(void *ptr1,0)">void *ptr2,0)">void *info) {
RACTestSchedulerAction *action1 = (__bridge id)ptr1;
RACTestSchedulerAction *action2 = (__bridge id)ptr2;
return CFDateCompare((__bridge CFDateRef)action1.date,(__bridge CFDateRef)action2.date,NULL);
}
stepAll方法里面传入了NSUIntegerMax,这个for循环也不会死循环,因为到堆中所有的任务都执行完成之后,CFBinaryHeapGetMinimumIfPresent返回NO,就会执行break,跳出循环。
这里会把currentScheduler保存到线程字典里面。接着会执行action.block,执行任务。
NSCParameterAssert(block != nil);
self) {
NSDate *uniqueDate = [NSDate dateWithTimeIntervalSinceReferenceDate:self.numberOfDirectlyScheduledBlocks];
self.numberOfDirectlyScheduledBlocks++;
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:uniqueDate block:block];
CFBinaryHeapAddValue(void *)action);
return action.disposable;
}
}
- (RACDisposable *)after:(self) {
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:date block:block];
return action.disposable;
}
}
after:schedule:就是直接新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValue把block闭包加入到堆中。
after: repeatingEvery: withLeeway: schedule:同样也是新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValue把block闭包加入到堆中。
2. RACSubscriptionScheduler
RACSubscriptionScheduler是RACScheduler最后一个单例。RACScheduler中唯一的三个单例现在就齐全了:RACImmediateScheduler,RACTargetQueueScheduler ,RACSubscriptionScheduler。
instancetype)subscriptionScheduler {
static RACScheduler *subscriptionScheduler;
return subscriptionScheduler;
}
RACSubscriptionScheduler 的名字是@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"
id)init {
self = [super initWithName:@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"];
if (self == nil) return nil;
_backgroundScheduler = [RACScheduler scheduler];
return self;
}
RACSubscriptionScheduler初始化的时候会新建一个Global Dispatch Queue。
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) self.backgroundScheduler schedule:block];
block();
如果RACScheduler.currentScheduler为nil就用backgroundScheduler去调用block闭包,否则就执行block闭包。
void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date schedule:block];
}
return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}
RACSubscriptionScheduler中的backgroundScheduler的意义就在此,当RACScheduler.currentScheduler不存在的时候就会替换成self.backgroundScheduler。
3. RACImmediateScheduler
这个子类在分析immediateScheduler方法的时候,详细分析过了,这里不再赘述。
4. RACQueueScheduler
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue,^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
NSCParameterAssert(block != NULL);
RACScheduler *prevIoUsScheduler = RACScheduler.currentScheduler;
self;
@autoreleasepool {
block();
}
if (prevIoUsScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = prevIoUsScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
这里需要值得注意的是:
scheduler本质其实是一个quene,并不是一个线程。它只能保证里面的线程都是串行执行的,但是它不能保证每个线程不一定都是在同一个线程里面执行。
如上面这段performAsCurrentScheduler:的实现所表现的那样。所以
在scheduler使用Core Data很容易崩溃,很可能跑到子线程上面去了。一旦写数据的时候到了子线程上,很容易就Crash了。一定要记得回到main queue上。
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
wallTimeWithDate:的实现如下:
+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
NSCParameterAssert(date != nil);
double seconds = 0;
double frac = modf(date.timeIntervalSince1970,&seconds);
struct timespec walltime = {
.tv_sec = (time_t)fmin(fmax(seconds,LONG_MIN),LONG_MAX),.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC,LONG_MAX)
};
return dispatch_walltime(&walltime,152)">0);
}
NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSCParameterAssert(block != NULL);
uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,152)">0,0)">self.queue);
dispatch_source_set_timer(timer,[return [RACDisposable disposableWithBlock:^{
dispatch_source_cancel(timer);
}];
}
after: repeatingEvery: withLeeway: schedule:方法里面的实现就是用GCD在self.queue上创建了一个Timer,时间间隔是interval,修正时间是leeway。
leeway这个参数是为dispatch source指定一个期望的定时器事件精度,让系统能够灵活地管理并唤醒内核。例如系统可以使用leeway值来提前或延迟触发定时器,使其更好地与其它系统事件结合。创建自己的定时器时,应该尽量指定一个leeway值。不过就算指定leeway值为0,也不能完完全全期望定时器能够按照精确的纳秒来触发事件。
这个定时器在interval执行入参闭包。在取消任务的时候调用dispatch_source_cancel取消定时器timer。
5. RACTargetQueueScheduler
这个子类在分析mainThreadScheduler方法的时候,详细分析过了,这里不再赘述。
三. RACScheduler是如何“取消”并发任务的
既然RACScheduler是对GCD的封装,那么在GCD的上层可以实现一些GCD所无法完成的“特性”。这里的“特性”是打引号的,因为底层是GCD,上层的特性只能通过一些特殊手段来实现看似是新的特性。在这一点上,RACScheduler就实现了GCD没有的特性——“取消”任务。
既然GCD不方便取消一个任务,那么RACScheduler是怎么做到的呢?
这就体现在RACQueueScheduler上。回头看看RACQueueScheduler的schedule:实现 和 after: schedule:实现。
return;
[self performAsCurrentScheduler:block];
});
四. RACScheduler是如何和RAC其他组件进行完美整合的
在整个ReactiveCocoa中,利用RACScheduler实现了很多操作,和RAC是深度整合的。这里就来总结总结ReactiveCocoa中总共有哪些地方用到了RACScheduler。
从下面这些地方使用了Scheduler中,我们就可以了解到哪些操作是在子线程,哪些是在主线程。区分出了这些,对于线程不安全的操作,我们就能心有成足的处理好它们,让它们回到主线程中去操作,这样就可以减少很多莫名的Crash。这些Crash都是因为线程问题导致的。
1. 在RACCommand中
id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock
- (RACSignal *)execute:(id)input
2. 在RACDynamicSignal中
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
NSCParameterAssert(block != NULL);
self.backgroundScheduler schedule:block];
block();
如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault。
3. 在RACBehaviorSubject中
id<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}
同理,如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault。
4. 在RACReplaySubject中
由于RACReplaySubject是在子线程上,所以建议在使用Core Data这些不安全库的时候一定要记得加上deliverOn。
5. 在RACSequence中
在RACSequence中,以下两个方法用到了RACScheduler:
- (RACSignal *)signal {
return [[self signalWithScheduler:[RACScheduler scheduler]] setNameWithFormat:@"[%@] -signal",0)">self.name];
}
- (RACSignal *)signalWithScheduler:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block RACSequence *sequence = self;
return [scheduler scheduleRecursiveBlock:^(void (^reschedule)(void)) {
if (sequence.head == nil) {
[subscriber sendCompleted];
return;
}
[subscriber sendNext:sequence.head];
sequence = sequence.tail;
reschedule();
}];
}] setNameWithFormat:@"[%@] -signalWithScheduler: %@",0)">self.name,scheduler];
}
上面两个方法会调用RACScheduler中的scheduleRecursiveBlock:方法。关于这个方法的源码分析可以看RACSequence的源码分析。
6. 在RACSignal+Operations中
这里总共有9个方法用到了Scheduler。
第一个方法:
static RACDisposable *subscribeForever (RACSignal *signal,0)">void (^next)(id),0)">void (^error)(NSError *,RACDisposable *),0)">void (^completed)(RACDisposable *))
在上面这个方法里面用到了
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
取出currentScheduler或者一个Global Dispatch Queue,然后调用scheduleRecursiveBlock:。
第二个方法:
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate
RACScheduler *scheduler = [RACScheduler scheduler];
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler
第三个方法:
- (RACSignal *)delay:(NSTimeInterval)interval
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
RACScheduler.currentScheduler ?: scheduler 这个判断在上述几个时间相关的方法都用到了。
所以,这里给一个建议:
delay由于不一定会回到当前线程中,所以delay之后再去订阅可能就在子线程中去执行。所以使用delay的时候最好追加一个deliverOn。
第四个方法:
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler
第五个方法:
+ (RACSignal *)interval:( 第六个方法:
NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway { }
第七个方法:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler { }
第八个方法:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler { }
第九个方法:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler { }
7. 在RACSignal中
在RACSignal也有积极计算和惰性求值的信号。
+ (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
[[signal publish] connect];
return [signal setNameWithFormat:@"+startEagerlyWithScheduler: %@ block:",47); font-size:16px"> startEagerlyWithScheduler中会调用startLazilyWithScheduler产生一个信号signal,然后紧接着转换成热信号。通过startEagerlyWithScheduler产生的信号就直接是一个热信号。
+ (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(NSCParameterAssert(block != NULL);
RACMulticastConnection *connection = [[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
block(subscriber);
return nil;
}]
multicast:[RACReplaySubject subject]];
return [[[RACSignal
createSignal:^ id<RACSubscriber> subscriber) {
[connection.signal subscribe:subscriber];
[connection connect];
return nil;
}]
subscribeOn:scheduler]
在这里调用了subscribeOn:scheduler,这里用到了scheduler。
8. 在NSData+RACSupport中
+ (RACSignal *)rac_readContentsOfURL:(NSURL *)URL options:(NSDataReadingOptions)options scheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
RACReplaySubject *subject = [RACReplaySubject subject];
[subject setNameWithFormat:@"+rac_readContentsOfURL: %@ options: %lu scheduler: %@",URL,(unsigned long)options,scheduler];
[scheduler schedule:^{
NSError *error = nil;
NSData *data = [[NSData alloc] initWithContentsOfURL:URL options:options error:&error];
if (data == nil) {
[subject sendError:error];
} else {
[subject sendNext:data];
[subject sendCompleted];
}
}];
return subject;
}
return disposable;
}
9. 在NSString+RACSupport中
NSURL *)URL usedEncoding:(NSStringEncoding *)encoding scheduler:(RACScheduler *)scheduler {
@"+rac_readContentsOfURL: %@ usedEncoding:scheduler: %@",210)">NSString *string = [NSString stringWithContentsOfURL:URL usedEncoding:encoding error:&error];
if (string == nil) {
[subject sendError:error];
} else {
[subject sendNext:string];
[subject sendCompleted];
}
}];
同NSData+RACSupport中的rac_readContentsOfURL: options: scheduler:一样,也会传入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。
10. 在NSUserDefaults+RACSupport中
RACScheduler *scheduler = [RACScheduler scheduler];
在这个方法中也会新建RACTargetQueueScheduler,一个Global Dispatch Queue。优先级是RACSchedulerPriorityDefault。
最后
关于RACScheduler底层实现分析都已经分析完成。最后请大家多多指教。
-
-
猜你在找的React相关文章