如何在Dataframe上处理类似Spark Map的作业,其中每行输出取决于当前行和上一个输出?

任何人都可以帮助解决以下问题吗?

形式问题 在给定类型为A的数据帧的情况下,如何处理Spark作业,需要输出类型为B的数据帧,并保持基数。地图功能不是简单的ai => bi,而是(ai,bi-1) => bi。起始值b0是已知的,没有问题。

实际案例 我有一个在交易所交易订单的数据框,列出了插入新订单,删除订单,更新价格和数量等的人员。在这种情况下,可以将订单簿(更多信息here)定义为当时每个活动订单的有序集合。这样的集合显然由与时间t相关的新变化(一些新订单进来,一些旧订单出去)以及所有先前的变化组成。 在上面的术语中,A是一个订单事件,B是在给定其先前状态的情况下从该事件产生的书。

我目前拥有的东西 大约一年前,我遇到了这个问题,并且用一个令人讨厌但又有效的技巧解决了这个问题。这是我拥有的代码的简化版本(实际上,根据书和行ID,我会输出一些统计信息):

    val orderEvents: RDD[OrderEvent] = dfSorted.rdd.repartition(1)
    val orderBookacc = new Lobaccumulator(new ConsistentLob(dateValue(sparkSession)))
    sparkSession.sparkContext.register(orderBookacc,s"lastLob_$strippedID")
    orderEvents.zipWithIndex().map { case (event,_) =>
      orderBookacc.add(event)
      orderBookacc.value
    }

之所以可行,是因为我利用累加器并强制逻辑按顺序执行,而无需通过将所有数据拟合到一个分区来合并部分结果。

最近的限制 上面的解决方案除了非常笨拙之外,还无法扩展。到目前为止,这并不是一个问题,该作业的数据大约为1万行,但是最近我们有了支持新数据流的要求,这些数据流可以在单个数据帧中最多容纳4000万行。在这种情况下,单分区模型似乎是不可接受的,特别是因为群集无法为我们每个节点提供这么多的内存(在编写本文时,每个进程3g)

技术规格,我认为它不是很相关,但是为了清楚起见,我正在Cloudera Spark集群下工作,应用程序提交了驱动程序模式(Windows驱动程序)。 Spark版本是2.4.4,我正在使用Scala 2.11.2

hxhospital 回答:如何在Dataframe上处理类似Spark Map的作业,其中每行输出取决于当前行和上一个输出?

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3159739.html

大家都在问