有些表具有重复的行。我正在尝试减少重复项并保留最新的my_date
(如果有的话)
具有相同my_date
的行,无论使用哪一个都行
val dataFrame = readCsv()
.dropDuplicates("my_id","my_date")
.withColumn("my_date_int",$"my_date".cast("bigint"))
import org.apache.spark.sql.functions.{min,max,grouping}
val aggregated = dataFrame
.groupBy(dataFrame("my_id").alias("g_my_id"))
.agg(max(dataFrame("my_date_int")).alias("g_my_date_int"))
val output = dataFrame.join(aggregated,dataFrame("my_id") === aggregated("g_my_id") && dataFrame("my_date_int") === aggregated("g_my_date_int"))
.drop("g_my_id","g_my_date_int")
但是在这段代码之后,我抢到distinct my_id
时得到的钱比源表中少3000
。可能是什么原因?
如何调试这种情况?