任何人都可以帮助解决以下问题吗?
形式问题
在给定类型为A
的数据帧的情况下,如何处理Spark作业,需要输出类型为B
的数据帧,并保持基数。地图功能不是简单的ai => bi
,而是(ai,bi-1) => bi
。起始值b0
是已知的,没有问题。
实际案例
我有一个在交易所交易订单的数据框,列出了插入新订单,删除订单,更新价格和数量等的人员。在这种情况下,可以将订单簿(更多信息here)定义为当时每个活动订单的有序集合。这样的集合显然由与时间t
相关的新变化(一些新订单进来,一些旧订单出去)以及所有先前的变化组成。
在上面的术语中,A
是一个订单事件,B
是在给定其先前状态的情况下从该事件产生的书。
我目前拥有的东西 大约一年前,我遇到了这个问题,并且用一个令人讨厌但又有效的技巧解决了这个问题。这是我拥有的代码的简化版本(实际上,根据书和行ID,我会输出一些统计信息):
val orderEvents: RDD[OrderEvent] = dfSorted.rdd.repartition(1)
val orderBookacc = new Lobaccumulator(new ConsistentLob(dateValue(sparkSession)))
sparkSession.sparkContext.register(orderBookacc,s"lastLob_$strippedID")
orderEvents.zipWithIndex().map { case (event,_) =>
orderBookacc.add(event)
orderBookacc.value
}
之所以可行,是因为我利用累加器并强制逻辑按顺序执行,而无需通过将所有数据拟合到一个分区来合并部分结果。
最近的限制 上面的解决方案除了非常笨拙之外,还无法扩展。到目前为止,这并不是一个问题,该作业的数据大约为1万行,但是最近我们有了支持新数据流的要求,这些数据流可以在单个数据帧中最多容纳4000万行。在这种情况下,单分区模型似乎是不可接受的,特别是因为群集无法为我们每个节点提供这么多的内存(在编写本文时,每个进程3g)
技术规格,我认为它不是很相关,但是为了清楚起见,我正在Cloudera Spark集群下工作,应用程序提交了驱动程序模式(Windows驱动程序)。 Spark版本是2.4.4,我正在使用Scala 2.11.2