我正在使用Monix反应流。
我有一组文件名f: Observable[File]
,我想将其转换为打开的资源in: Observable[InputStream]
。
然后我有一个不同的文件位置p: Observable[Int]
流,并且我想从位置InputStream
的{{1}}读取以获得一个p
。
现在,当我为同一文件获得多个out: Observable[Byte]
值时,我只想为每个InputStream
打开一次f
。当然,我想在新文件p
到达或取消(或错误)时正确关闭流。
我将如何实现?也许有一些使用cats-effect f
的解决方案?