如何在PySpark的RDD中找到每个唯一密钥的最短日期?

我有一个格式为[(ID,Date),(ID,Date)...]的元组列表,日期格式为datetime。作为RDD的示例,我正在使用:

[('1',datetime.date(2012,1,01)),('2',('3',('4',('5',('1',datetime.date(2011,datetime.date(2013,datetime.date(2015,datetime.date(2010,datetime.date(2018,01))]

我需要收集ID和与每个ID相关联的最短日期。大概这是一个reduceByKey动作,但是我无法整理出相关的功能。我猜想我只是在使事情复杂化,但是在识别适当的lambda(如果reduceByKey在这种情况下不是最有效的方法)方面,将不胜感激。

我已经搜索了StackOverflow并找到了类似的答案hereherehere,但是同样,我无法成功修改这些答案以适合我的特定情况。通常,datetime格式似乎会使事情复杂化(datetime格式本身是由于我解析xml的方式所致,因此,如果有帮助,我可以回过头将其解析为字符串)。

我尝试了以下操作,但均收到错误消息:

.reduceByKey(min)-IndexError:元组索引超出范围

reduceByKey(lambda x,y: (x,min(y)))-IndexError:元组索引超出范围(如果datetime转换为字符串,或者如果日期时间格式转换为下面的错误)

.reduceByKey(lambda x,y: (x[0],min(y)))-TypeError:“ datetime.date”对象不可下标

我希望最终结果如下:

[('1',01))]
ydp1016 回答:如何在PySpark的RDD中找到每个唯一密钥的最短日期?

我知道了。有几个问题。对于初学者,这是适用的语法。首先(当然创建了SparkSession之后),我将RDD转换为具有以下内容的数据框:

df = spark.createDataFrame(df,['col1','col2'])

然后执行groupBy和聚合功能。您会在其他SO答案中看到这些内容,但是我认为我将其发布在这里,因为它是在我的特定情况下提出的。

from pyspark.sql import functions as F
df= df.groupBy('col1').agg(F.min('col2'))

为了将数据恢复为RDD格式,我使用了

result = df.rdd.map(lambda x: (x[0],x[4]))

在这种特殊情况下,我还将数据帧的第0列和第4列的元素映射回分配给result的元组。

在此过程中,我还发现了一些有趣的内容,可能对其他人有帮助:

  1. 我的数据框中有一些我不知道的Null值 持续造成的情况,主要是“ NoneType无法下标” 错误。虽然这很合理,但我花了一些时间才弄清楚 NoneType所在的位置。
  2. 我的某些XML被错误地解析,因此它返回的是(None)的元组,而不是上面的数据格式所要求的(None,None)的元组。

这些更正使我能够.show()数据帧(而不仅仅是.printSchema().groupBy及其相关对象从来都不是问题。

本文链接:https://www.f2er.com/3114248.html

大家都在问