RACStream

3

RACStream 继承自 NSObject,是 RACSignalRACSequence 的父类,定义了一些流的操作方法。从名字上可以看出来,这个对象就像流一样可以往任何一个出口流,同时也可以给这个流设计一道道关卡,改变流(既包括流的内容,也包括融合,也包括流的筛选等)。

 

一、.h 文件
@class RACStream;

/// A block which accepts a value from a RACStream and returns a new instance of the same stream class.
///
/// Setting `stop` to `YES` will cause the bind to terminate after the returned value. Returning `nil` will result in immediate termination.
/**
 * @param  value  表示接收到信号的原始值,还未作处理
 * @param  *stop  用于控制绑定 block,如果 *stop = YES,那么就会结束绑定
 *
 * @return  信号。做好处理后,再通过这个信号返回出去,一般使用 RACReturnSignal
 */
typedef RACStream * (^RACStreamBindBlock)(id value, BOOL *stop);

接受来自 RACStream 的值并返回 RACStream 类的新实例的 block。将 stop 设置为 YES 将导致绑定在返回值之后终止。返回 nil 将立即终止。

/// An abstract class representing any stream of values.
///
/// This class represents a monad, upon which many stream-based operations can be built.
///
/// When subclassing RACStream, only the methods in the main @interface body need to be overridden.
@interface RACStream : NSObject

表示任何数据流的抽象类。

这个类表示一个 monad,在这个 monad 上可以构建许多基于流的操作。当继承 RACStream 时,只需要重写这个主要类的方法。

/// This extension contains functionality to support naming streams for debugging.
///
/// Subclasses do not need to override the methods here.
@interface RACStream ()

此扩展包含支持调试的对流进行命名的功能。子类不需要重写方法。

/// The name of the stream. This is for debugging/human purposes only.
@property (copy) NSString *name;

流的名称,用于调试目的。

/// Operations built on the RACStream primitives.
///
/// These methods do not need to be overridden, although subclasses may occasionally gain better performance from doing so.
@interface RACStream (Operations)

基于 RACStream 构建的操作。这些方法不需要被重写,尽管有时子类可以通过重写获得更好的性能。

/// Zips streams using +zip:, then reduces the resulting tuples into a single value using -reduceEach:
///
/// streams     - The streams to combine. These must all be instances of the
///               same concrete class implementing the protocol. If this
///               collection is empty, the returned stream will be empty.
/// reduceBlock - The block which reduces the values from all the streams
///               into one value. It must take as many arguments as the
///               number of streams given. Each argument will be an object
///               argument. The return value must be an object. This argument
///               must not be nil.
///
/// Example:
///
///   [RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {
///       return [NSString stringWithFormat:@"%@: %@", string, number];
///   }];
///
/// Returns a new stream containing the results from each invocation of
/// `reduceBlock`.
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock;

使用 zip: 方法打包多个流,然后使用 -reduceEach: 将生成的元组 tuples 缩减为单个值。

/// Returns a stream obtained by concatenating `streams` in order.
+ (instancetype)concat:(id<NSFastEnumeration>)streams;

返回按顺序连接 streams 而获得的流对象。

/// Combines values in the receiver from left to right using the given block.
///
/// The algorithm proceeds as follows:
///
///  1. `startingValue` is passed into the block as the `running` value, and the
///  first element of the receiver is passed into the block as the `next` value.
///  2. The result of the invocation is added to the returned stream.
///  3. The result of the invocation (`running`) and the next element of the
///  receiver (`next`) is passed into `block`.
///  4. Steps 2 and 3 are repeated until all values have been processed.
///
/// startingValue - The value to be combined with the first element of the
///                 receiver. This value may be `nil`.
/// reduceBlock   - The block that describes how to combine values of the
///                 receiver. If the receiver is empty, this block will never be
///                 invoked. Cannot be nil.
///
/// Examples
///
///      RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
///
///      // Contains 1, 3, 6, 10
///      RACSequence *sums = [numbers scanWithStart:@0 reduce:^(NSNumber *sum, NSNumber *next) {
///          return @(sum.integerValue + next.integerValue);
///      }];
///
/// Returns a new stream that consists of each application of `reduceBlock`. If the receiver is empty, an empty stream is returned.
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock;

bind 基础上封装的改变方法,用同样的 block 执行每次流中的值,并将结果用于后一次执行当中,每次都把 block 执行后的值变成新的流中的对象。

/// Takes values until the given block returns `YES`.
///
/// Returns a stream of the initial values in the receiver that fail `predicate`.
/// If `predicate` never returns `YES`, a stream equivalent to the receiver is returned.
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate;

bind 基础上封装的改变方法,取当前流的对象值,直到当前值满足提供的 block,就会将当前流变为空(不是空流)。

/// Takes values until the given block returns `NO`.
///
/// Returns a stream of the initial values in the receiver that pass `predicate`.
/// If `predicate` never returns `NO`, a stream equivalent to the receiver is returned.
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate;

bind 基础上封装的改变方法,取当前流的对象值,直到当前值不满足提供的 block,就会将当前流变为空(不是空流)。

/// Skips values until the given block returns `YES`.
///
/// Returns a stream containing the values of the receiver that follow any initial values failing `predicate`. If `predicate` never returns `YES`, an empty stream is returned.
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate;

bind 基础上封装的改变方法,忽略当前流的对象值(变为空流),直到当前值满足提供的 block

/// Skips values until the given block returns `NO`.
///
/// Returns a stream containing the values of the receiver that follow any initial values passing `predicate`. If `predicate` never returns `NO`, an empty stream is returned.
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate;

bind 基础上封装的改变方法,忽略当前流的对象值(变为空流),直到当前值不满足提供的 block

/// Returns a stream of values for which -isEqual: returns NO when compared to the previous value.
- (instancetype)distinctUntilChanged;

bind 基础上封装的改变方法,当流中后一次的值和前一次的值不同的时候,才会返回当前值的流,否则返回空流(第一次默认被忽略)。

1、empty
/*
  Returns an empty stream.
 */
+ (instancetype)empty
{
    return nil;
}

返回一个空的流对象。由子类定义行为。

 

2、bind:
/* Lazily binds a block to the values in the receiver.

  This should only be used if you need to terminate the bind early, or close over some state. -flattenMap: is more appropriate for all other cases.

  block - A block returning a RACStreamBindBlock. This block will be invoked
          each time the bound stream is re-evaluated. This block must not be nil or return nil.

  Returns a new stream which represents the combined result of all lazy applications of `block`.
 */
- (instancetype)bind:(RACStreamBindBlock (^)(void))block
{
    return nil;
}

只改变当前流对象的方法。

bind 函数的作用:

  1.  会订阅原始的信号。
  2.  任何时刻原始信号发送一个值,都会绑定的 block 转换一次。
  3.  一旦绑定的 block 转换了值变成信号,就立即订阅,并把值发给订阅者 subscriber
  4.  一旦绑定的 block 要终止绑定,原始的信号就 complete
  5.  当所有的信号都 complete,发送 completed 信号给订阅者 subscriber
  6.  如果中途信号出现了任何 error,都要把这个错误发送给 subscriber

 

3、return:
/* Lifts `value` into the stream monad.

  Returns a stream containing only the given value.
 */
+ (instancetype)return:(id)value
{
    return nil;
}

返回一个仅包含 value 数据的流对象。由子类定义行为。

 

4、concat:
/* Appends the values of `stream` to the values in the receiver.

   stream - A stream to concatenate. This must be an instance of the same concrete class as the receiver, and should not be `nil`.

   Returns a new stream representing the receiver followed by `stream`.
 */
- (instancetype)concat:(RACStream *)stream
{
    return nil;
}

在当前响应流已经完成后,紧接着注入新的响应流。抽象方法未实现,由子类定义行为。

 

5、zipWith:
/* Zips the values in the receiver with those of the given stream to create RACTuples.

   The first value of each stream will be combined, then the second value, and so forth, until at least one of the streams is exhausted.

   stream - The stream to zip with. This must be an instance of the same concrete class as the receiver, and should not be `nil`.

   Returns a new stream of RACTuples, representing the zipped values of the two streams.
 */
- (instancetype)zipWith:(RACStream *)stream
{
   return nil;
}

将不同的流进行打包合成一个流。抽象方法未实现,由子类定义行为。

 

6、setNameWithFormat:
/* Sets the name of the receiver to the given format string. 

   This is for debugging purposes only, and won't do anything unless the RAC_DEBUG_SIGNAL_NAMES environment variable is set.
  
   Returns the receiver, for easy method chaining.
 */
- (instancetype)setNameWithFormat:(NSString *)format, ... 
{
    if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

    NSCParameterAssert(format != nil);
 
    va_list args;
    va_start(args, format);

    NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
    va_end(args);

    self.name = str;
    return self;
}

将流的名称设置为给定的格式字符串,这仅用于调试目的,除非设置了 rac_debug_signal_names 环境变量,否则不会执行任何操作。返回流对象用于方法链的调用。

这里值得学习的是可变参数的使用,va_list 是在 C 语言中解决变参问题的一组宏。

va_list 的用法:

①、首先在函数里定义一个 va_list 型的变量,这个变量是指向参数的指针;

②、然后用 va_start 宏初始化变量刚定义的 va_list 变量,va_start 的第 2 个参数是可变参数的前一个参数;

③、最后用 va_end 宏结束可变参数的获取。

 

7、flattenMap:

要分析这个方法,先看看一个例子。

    // 1. 创建信号
    RACSignal * signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
       
        [subscriber sendNext:@"111"];
        
        return nil;
    }];
    
    // 2. 调用 flattenMap 
    RACSignal * flattenSignal = [signal flattenMap:^RACStream *(id value) {
        
        value = [NSString stringWithFormat:@"Flatten Map %@", value];
        return [RACReturnSignal return:value];
    }]; 

    // 3. 订阅信号。flattenSignal 为 RACDynamicSignal 类型
    [flattenSignal subscribeNext:^(id x) {
        NSLog(@"%@", x);
    }];


Flatten Map 111

从输出 Flatten Map 111 可以看到,订阅的 block 里输出的是 flattenMap: 处理过的数据。

/*
 Maps `block` across the values in the receiver and flattens the result.

 Note that operators applied _after_ -flattenMap: behave differently from operators _within_ -flattenMap:. See the Examples section below.

 This corresponds to the `SelectMany` method in Rx.

 block - A block which accepts the values in the receiver and returns a new instance of the receiver's class.          Returning `nil` from this block is equivalent to returning an empty signal.

 Examples

   [signal flattenMap:^(id x) {
       // Logs each time a returned signal completes.
       return [[RACSignal return:x] logCompleted];
   }];

   [[signal flattenMap:^(id x) {
        return [RACSignal return:x];
       }]
       // Logs only once, when all of the signals complete.
       logCompleted];

 Returns a new stream which represents the combined streams resulting from mapping `block`.
*/
- (instancetype)flattenMap:(RACStream * (^)(id value))block
{
    Class class = self.class;
    return [[self bind:^{
        return ^(id value, BOOL *stop) {

            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"%@", stream);

	    return stream;
	};
    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

从方法的源码可以看到 flattenMap 是在 bind 基础上封装的。

flattenMap 调用 bind 方法传入 blockRACSignalbind 方法内部会执行 block 的返回值 RACStreamBindBlock,它也是一个 block,最后会调用最内层代码。内层代码将 value 数据传给 flattenMap 参数的 block,这里进行了数据的处理并返回一个 RACStream 对象(通常是 RACReturnSignal 对象),RACStream 向上传递,在 bindblock 里被包装成 RACDynamicSignal 类(被添加到了 signals 数组中)。

4

总结:flattenMap 根据前一个信号的参数创建一个新的信号

 

8、flatten
/* Flattens a stream of streams.

   This corresponds to the `Merge` method in Rx.

   Returns a stream consisting of the combined streams obtained from the receiver.
*/
- (instancetype)flatten 
{
    __weak RACStream *stream __attribute__((unused)) = self; 

    return [[self flattenMap:^(id value) { 
         return value; 
    }] setNameWithFormat:@"[%@] -flatten", self.name]; 
}

flattenMap 基础上封装的改变方法,就是函数式编程中的聚合 Merge

要了解这个方法需要结合 RACSignal 中的 merge: 方法。

- (RACSignal *)merge:(RACSignal *)signal
{
    // 调用类方法,传入信号数组
    return [[RACSignal merge:@[ self, signal ]] setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
}

+ (RACSignal *)merge:(id<NSFastEnumeration>)signals
{
    NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];

    // 数组持有多个信号
    for (RACSignal *signal in signals) {
        [copiedSignals addObject:signal];
    }

    return [[[RACSignal createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {

        // 订阅者发送内容,内容为信号 self 和 signal
        for (RACSignal *signal in copiedSignals) {
	     [subscriber sendNext:signal];
	}

        [subscriber sendCompleted];
	return nil;
    }] 
    flatten] setNameWithFormat:@"+merge: %@", copiedSignals];
}

过程图:

5

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];
    RACSignal *signalB = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalB"];
        return nil;
    }];
    RACSignal * mergeSignal = [signalA merge:signalB];
    
    [mergeSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

signalA
signalB

当执行 merge 的时候创建了 CD 两个内部信号,这里的 mergeSignal 实际为 D 信号。当订阅 D 信号后,执行上图右侧的流程,一直会执行到 Amerge 方法里 createSignal:block,循环发送 signal 信号,最终的结果是执行各个信号的 sendNext: 方法。 

merge 看起来就是把多个信号(signalAsignalB)合并成一个信号 mergeSignal,只要其中任何一个信号发送数据,合并的信号 mergeSignal 都能接收到事件。

ReactiveCocoa 源码阅读之攻略 flatten

 

9、map:
/* Maps `block` across the values in the receiver. 

   This corresponds to the `Select` method in Rx. 

   Returns a new stream with the mapped values.
 */
- (instancetype)map:(id (^)(id value))block 
{
    NSCParameterAssert(block != nil);

    Class class = self.class;
	
    return [[self flattenMap:^(id value) {
	return [class return:block(value)];
    }] setNameWithFormat:@"[%@] -map:", self.name];
}

flattenMap 基础上封装的改变方法,就是函数式编程中的 Select。在 flattenMap 中的 block 中返回的值必须也是流对象,而 map 则不需要,它是将流中的对象执行 block 后,用流的 return 方法将值变成流对象。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];

    RACSignal *mergeSignal = [signalA map:^id(id value) {
        return @"aaa";
    }];

    [mergeSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

aaa

要理解 map 需要结合 return: 方法,return: 方法在 RACReturnSignal 类中实现。

+ (RACSignal *)return:(id)value
{
    RACReturnSignal *signal = [[self alloc] init];
    signal->_value = value;

    return signal;
}

删除了在 map 流程中执行不到的代码。由上可以看到:return 方法是保存了 value 值,然后返回一个 RACReturnSignal 对象。所以 map 方法可以理解成:

- (instancetype)map:(id (^)(id value))block
{
    return [self flattenMap:^(id value) {
	return [[RACReturnSignal alloc] init];  // 新建 return 信号,这里已经执行了 block(value),获取到了 block 的返回值
    }];
}

经由 flattenMap 向外返回的是 RACReturnSignal 类型对象。当 RACReturnSignal 对象被订阅时:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
{
    NSCParameterAssert(subscriber != nil);

    return [RACScheduler.subscriptionScheduler schedule:^{
	[subscriber sendNext:self.value];
	[subscriber sendCompleted];
    }];
}

将属性 value 向外传递,所以这样导致会替换原来信号发送的内容。

flattenMapmap 的理解:

map 方法根据原信号创建了一个新的信号,并且变换了信号的输出值,这两个信号具有明显的先后顺序关系。而 flattenMap 方法,直接生成了一个新的信号,这两个信号并没有先后顺序关系,属于同层次的平行关系。

FlatternMap中的 Block 返回信号;Map中的 Block 返回对象。
开发中,如果信号发出的值不是信号,映射一般使用 Map;如果信号发出的值是信号,映射一般使用 FlatternMap

ReactiveCocoa框架菜鸟入门(五)--信号的FlattenMapMapFlatternMapMap的区别

 

10、mapReplace:
/* Replaces each value in the receiver with the given object. 

   Returns a new stream which includes the given object once for each value in the receiver.
 */
- (instancetype)mapReplace:(id)object 
{
    return [[self map:^(id _) {
	return object;
    }] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
}

map 的基础上封装的改变方法,直接替换当前流中的对象,形成一个新的对象流。

    RACSignal * signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];

    RACSignal * mergeSignal = [signalA mapReplace:@"aaa"];
    
    [mergeSignal subscribeNext:^(id x) {
        NSLog(@"%@", x);
    }];

aaa

map: 的功能一样,替换流输出的内容,mapReplace: 采用直接传值的方式,map: 采用 block

 

11、combinePreviousWithStart:reduce:
/* Combines each previous and current value into one object.

   This method is similar to -scanWithStart:reduce:, but only ever operates on the previous and current values (instead of the whole stream), and does not pass the return value of `reduceBlock` into the next invocation of it. 
 
   start - The value passed into `reduceBlock` as `previous` for the first value.
   reduceBlock - The block that combines the previous value and the current value to create the reduced value. Cannot be nil.

   Examples 

      RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;

      // Contains 1, 3, 5, 7 
      RACSequence *sums = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) { 
          return @(previous.integerValue + next.integerValue);
      }];

   Returns a new stream consisting of the return values from each application of `reduceBlock`.
*/
- (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock 
{
    NSCParameterAssert(reduceBlock != NULL);
	
    return [[[self scanWithStart:RACTuplePack(start)
	                  reduce:^(RACTuple *previousTuple, id next) {
			
                id value = reduceBlock(previousTuple[0], next);
	 	return RACTuplePack(next, value);
	   }]
           map:^(RACTuple *tuple) {
		return tuple[1];
	   }]
	   setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];
}

通过官方的示例,数组的内容由 @[1, 2, 3, 4] 变成了 @[1, 3, 5, 7],可以猜测 combinePreviousWithStart:reduce: 方法是将旧数组的两个相邻的值通过 reduce:^{} 处理产生新的值。

6

RACSequencebind:passingThroughValuesFromSequence: 调用 bind: 传入的 block。实际上 block 的传递流程是:

7

最终会调用到用户处理的 

RACSequence *sums = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
                    
     return @(previous.integerValue + next.integerValue);      
}];

第一次时 previous = @0,就是第一个参数。在这里有对 start 进行 RACTuplePack 包装。

传入的数组为 @[ 1, 2, 3, 4 ]start = 2,那么调用顺序是:1 + 2(start) = 32 + 1(pre) = 33 + 2(pre) = 54 + 3(pre) = 7。归纳:该方法传入 start 数值,可以理解索引为 -1,然后相邻的数据相加。 

 

12、filter:
/* Filters out values in the receiver that don't pass the given test.

   This corresponds to the `Where` method in Rx. 

   Returns a new stream with only those values that passed.
 */
- (instancetype)filter:(BOOL (^)(id value))block
{
    NSCParameterAssert(block != nil);

    Class class = self.class;
	
    return [[self flattenMap:^ id (id value) {
	if (block(value)) {
	    return [class return:value];
	}
        else {
	    return class.empty;
	}
    }] setNameWithFormat:@"[%@] -filter:", self.name];
}

flattenMap: 基础上封装的改变方法,过滤掉当前流中不符合要求的对象,将之变为空流。

block(value) 返回 YES 时,使用 RACReturnSignal 对象保存 value 值,订阅时打印出来;如果返回 NO,则返回空流,订阅时也就打印不出来东西。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber 
{
    NSCParameterAssert(subscriber != nil);

    return [RACScheduler.subscriptionScheduler schedule:^{
	[subscriber sendCompleted];
    }];
}

RACEmptySignal 直接调用 sendCompleted 方法,不会 sendNext

 

13、ignore:
/* Filters out values in the receiver that equal (via -isEqual:) the provided value. 

   value - The value can be `nil`, in which case it ignores `nil` values. 

   Returns a new stream containing only the values which did not compare equal to `value`.
 */
- (instancetype)ignore:(id)value 
{
    return [[self filter:^ BOOL (id innerValue) {
	return innerValue != value && ![innerValue isEqual:value];
    }] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
}

filter 基础封装的改变方法,忽略和当前值一样的对象,将之变为空流。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];

    RACSignal * ignoreSignal = [signalA ignore:@"signalB"];  // 如果是 ignore:@"signalA" 则不会输出
    
    [ignoreSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

signalA

如果 sendNext: 的内容与 ignore: 的内容相同,filterblock 返回 NO,否则返回 YES。即达到了忽略值的功能。

 

14、reduceEach:
/* Unpacks each RACTuple in the receiver and maps the values to a new value. 

   reduceBlock - The block which reduces each RACTuple's values into one value. It must take as many arguments as the number of tuple elements to process. Each argument will be an object argument. The return value must be an object. This argument cannot be nil. 

   Returns a new stream of reduced tuple values.
 */
- (instancetype)reduceEach:(id (^)())reduceBlock 
{
    NSCParameterAssert(reduceBlock != nil);
 
    __weak RACStream *stream __attribute__((unused)) = self;

    return [[self map:^(RACTuple *t) {
	NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
	return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
    }] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

解包接收器中的每个 RACTuple,并将所有的值映射为一个新值。

    RACSignal * signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        RACTuple * tuple = RACTuplePack(@"signalA");
        [subscriber sendNext:tuple];  // 传递的数据需要是 RACTuple 类型,否则会触发断言
        return nil;
    }];

    RACSignal * reduceEachSignal = [signalA reduceEach:^id{
        return @"aaa";
    }];
    
    [reduceEachSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

aaa

 

15、startWith:
/* 
   Returns a stream consisting of `value`, followed by the values in the receiver. 
 */
- (instancetype)startWith:(id)value 
{
    return [[[self.class return:value]
	concat:self]
	setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
}

contact 基础上封装的多流之间的顺序方法,在当前流的值流出之前,加入一个初始值。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        RACTuple * tuple = RACTuplePack(@"signalA");
        [subscriber sendNext:tuple];
        return nil;
    }];

    RACSignal * startWithSignal = [signalA startWith:@"1"];
    
    [startWithSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

1
<RACTuple: 0x6000017eaf30> ( signalA )

在输出 tuple(signalA) 之前先输出 1,这段逻辑在 RACSignalconcat: 方法中。

- (RACSignal *)concat:(RACSignal *)signal {

        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
	    [subscriber sendNext:x];  // 输出 1
	} error:^(NSError *error) {
	    [subscriber sendError:error];
	} completed:^{
	    RACDisposable *concattedDisposable = [signal subscribe:subscriber];  // 输出 tuple(signalA)
            serialDisposable.disposable = concattedDisposable;
	}];
}

 

16、skip:
/* Skips the first `skipCount` values in the receiver. 

   Returns the receiver after skipping the first `skipCount` values. If `skipCount` is greater than the number of values in the stream, an empty stream is returned.
 */
- (instancetype)skip:(NSUInteger)skipCount 
{
    Class class = self.class;
	
    return [[self bind:^{
	__block NSUInteger skipped = 0;

	return ^(id value, BOOL *stop) {
	    if (skipped >= skipCount) return [class return:value];

	    skipped++;
	    return class.empty;
	};
    }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

bind 基础上封装的改变方法,忽略当前流前 n 次的对象值,将之变为空流,不输出。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        [subscriber sendNext:@"signalB"];
        [subscriber sendNext:@"signalC"];
        [subscriber sendNext:@"signalD"];

        return nil;
    }];

    RACSignal * skipSignal = [signalA skip:2];
    
    [skipSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

signalC
signalD

跳过了第一、二次的输出。

 

17、take:
/*
   Returns a stream of the first `count` values in the receiver. If `count` is greater than or equal to the number of values in the stream, a stream equivalent to the receiver is returned. 
 */
- (instancetype)take:(NSUInteger)count 
{	
    Class class = self.class;
	
    if (count == 0) return class.empty;

    return [[self bind:^{
	__block NSUInteger taken = 0;

	return ^ id (id value, BOOL *stop) {
	    if (taken < count) {
		++taken;
		if (taken == count) *stop = YES;
	            return [class return:value];
		}
                else {
		    return nil;
		}
	    };
	}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

bind 基础上封装的改变方法,只取当前流中的前 n 次对象值,之后将流变为空(不是空流)。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        [subscriber sendNext:@"signalB"];
        [subscriber sendNext:@"signalC"];
        [subscriber sendNext:@"signalD"];

        return nil;
    }];

    RACSignal *mergeSignal = [signalA take:2];
    
    [mergeSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

signalA
signalB

skip: 相对,只取第一、二条数据。

 

18、join:block:
/* Combines a list of streams using the logic of the given block.

   streams - The streams to combine. 
   block - An operator that combines two streams and returns a new one. The returned stream should contain 2-tuples of the streams' combined values. 

   Returns a combined stream.
 */
+ (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block 
{
    RACStream *current = nil;

    // Creates streams of successively larger tuples by combining the input streams one-by-one.
    for (RACStream *stream in streams) {
	// For the first stream, just wrap its values in a RACTuple. That way, if only one stream is given, the result is still a stream of tuples.
	if (current == nil) {
	    current = [stream map:^(id x) {
		return RACTuplePack(x);  // 生成 RACReturnSignal 对象,value = RACTuplePack(x)
	    }];

	    continue;
	}

	current = block(current, stream);
    }

    if (current == nil) return [self empty];  // 如果为 nil,返回空流

    return [current map:^(RACTuple *xs) {
	
        /* Right now, each value is contained in its own tuple, sorta like:
	
	   (((1), 2), 3)
	
	   We need to unwrap all the layers and create a tuple out of the result.
         */
	NSMutableArray *values = [[NSMutableArray alloc] init];

	while (xs != nil) {
	    [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
	    xs = (xs.count > 1 ? xs.first : nil);
	}

	return [RACTuple tupleWithObjectsFromArray:values];
    }];
}

使用给定 block 的逻辑组合流列表。

  1. for 循环部分:如果信号数组里只有 1 个信号,那么通过 map,最终获取的结果被放进一个 tuple 里,比如 [value1];如果数组里信号多于 1 个,那么第一个信号和第二信号就要做 zipWith(此时 block 就是 zipWith)操作,参考上面zipWith,得到的返回结果是 [[value1], value2];如果还有第 3 个信号,那么将信号 1 和信号 2 zipWith 的结果与信号 3 继续 zipWith,得到的结果就是 [[[value1],value2], value3]
  2. return 部分:for 循环得到的信号最后 sendNext 的值是一个 [[[value1], value2], value3] 之类的 tuple,如同注释里的“(((1), 2), 3)”一样,需要转换成 [value1, value2, value3]

 

19、zip:
/* Zips the values in the given streams to create RACTuples. 


   The first value of each stream will be combined, then the second value, and so forth, until at least one of the streams is exhausted. 

   streams - The streams to combine. These must all be instances of the same concrete class implementing the protocol. If this collection is empty, the returned stream will be empty. 

   Returns a new stream containing RACTuples of the zipped values from the streams.
 */
+ (instancetype)zip:(id<NSFastEnumeration>)streams 
{
    return [[self join:streams block:^(RACStream *left, RACStream *right) {
	return [left zipWith:right];
    }] setNameWithFormat:@"+zip: %@", streams];
}

打包多流,将多个流中的值包装成一个 RACTuple 对象。

    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalA"];
        return nil;
    }];
    
    RACSignal *signalB = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalB"];
        return nil;
    }];
    
    RACSignal *signalC = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"signalC"];
        return nil;
    }];
    
    RACSequence * sequence = @[signalA, signalB, signalC].rac_sequence;

    RACSignal * zipSignal = [RACSignal zip:sequence];
    
    [zipSignal subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];

<RACTuple: 0x600002ae00d0> (
    signalA,
    signalB,
    signalC
)

 

 

 

 

3. RACStream子类策略

RACStream是RACSignal和RACSequence的父类,但是,RACSignal和RACSequence都自己实现了一套bind,zipWith和contât方法,所以在不同的子类中,RACStream中定义的各种操作对应到各种子类,就会有不同的涵义。 
---------------------
作者:chenyin10011991
来源:CSDN
原文:https://blog.csdn.net/chenyin10011991/article/details/51971388
版权声明:本文为博主原创文章,转载请附上博文链接!

You may also like...