如何使用RxJava2从无限Stream <Integer>创建Observable <Integer>?

我正在尝试将RxJava从1升级到2。在我的旧代码中,我有如下方法:

private Observable<Integer> reversRange(int from,int to) {
    Stream<Integer> intStream = Stream.iterate(to,p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

但是现在在RxJava 2中,我不能使用from。这等价于什么代码? 我在this question中发现它是fromIterable,但我不知道如何在Stream中使用它。

或者其他示例,这不仅应该用于范围,而且应该用于任何无限流。

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0,p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}
qpqp78952 回答:如何使用RxJava2从无限Stream <Integer>创建Observable <Integer>?

如果您只有Stream中的Integer,则只需执行以下操作:

Observable.fromIterable(IntStream.rangeClosed(from,to)
            .boxed()
            .collect(Collectors.toList()));  

rangedClosed的参数包括在内。

您可以使用另一种通用方法,该方法与您尝试使用的方法更接近:

Observable.fromIterable(Stream.iterate(from,integer -> integer + 1)
                .filter(integer -> integer < (to+1))
                .limit((long(to+1)-long(from))
                .collect(Collectors.toList()));  

EDIT1

如果要无限流。 Java Stream的generateiterate都产生无限流。在我的示例中,使用iterate(您可以使用generate将其替换为Supplier,并在其中创建自定义对象创建代码),并摆脱所有limit这样的终端操作符。
然后,如果需要RxJava2支持背压,则将它们包装到ObservableFlowable中,然后包装到Observable中。

像这样:

Observable.just(Stream.generate(() -> // Object creation logic here));  

Observable.just(Flowable.just(Stream.generate(() -> // Object creation logic here)));  

请记住,如果执行此操作,则代码将无限期地创建对象,并且程序将一直运行直到内存用尽。
我想您有某种正在向您发送数据的服务,您需要进行一些转换并将该数据作为流发送到其他地方。我建议您以Future的形式获取数据,然后将其包装到Flowable中,然后将数据流式传输到要发送的位置。
喜欢:

Flowable.fromFuture(senderService.getDataAsCompletableFuture);  

然后指定背压策略。

EDIT2
您可以使用Observable.generate()来完成。
喜欢:

Observable.generate(() -> from,(value,emitter) -> {
        emitter.onNext(value);
        return value + 1; 
    });
,

使用generate()函数:

这是kotlin代码(扩展功能),但是您只需要稍微更改lambda。这适用于任何流。

fun <T> Stream<T>.toFlowable(): Flowable<T> {
  return Flowable.generate(
    Callable { iterator() },BiConsumer { ite,emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

如果愿意,您也可以使用Observable,但我不知道为什么。

fun <T> Stream<T>.toObservable(): Observable<T> {
  return Observable.generate(
    Callable { iterator() },emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

我认为在Java中将类似于:

public <T> Observable<T> streamToObservable(Stream<T> stream) {
  return Observable.generate(
    () -> stream.iterator(),(ite,emitter) -> {
      if (ite.hasNext()) {
        emitter.onNext(ite.next());
      } else {
        emitter.onComplete();
      }
    }
  );
}

,因此您的代码将变为:

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0,p -> p + 1);
    return streamToObservable(intStream);
}
,

如何使用 RxJava2 从无限流创建Observable?

赞:

    public static <T> Observable<T> streamToObservable(Stream<T> stream) {
        return Observable.fromIterable(stream::iterator);
    }

用法:



    public static void main(String[] args) {
        //Some infinite stream
        Stream<Integer> stream = Stream.iterate(1,(i)->i+1);

        //create an observable from the stream
        Observable<Integer> obs = streamToObservable(stream);
        
        //subscribe to the observable
        obs.subscribe((v) -> {
            System.out.println("Value from observable:"+v);
        });
        
    }

输出:

可观察值1的值
可观察2的值
可观察到的3的价值
可观察4的值
.........

This answer解释了如何将Stream转换为Iterable


RxJava3

//Some infinite stream
Stream<Integer> stream = Stream.iterate(1,(i)->i+1);

Observable<Integer> obs = Observable.fromStream(stream);
本文链接:https://www.f2er.com/3150168.html

大家都在问