我有一个数据框my_df
,其中包含4列:
+----------------+---------------+--------+---------+
| user_id| domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
| josh| wanadoo.fr| 1| 15|
| josh| random.it| 0| 12|
| samantha| wanadoo.fr| 1| 16|
| bob| eidsiva.net| 1| 5|
| bob| media.net| 0| 1|
| dylan| vodafone.it| 1| 448|
| dylan| somesite.net| 0| 20|
| dylan| yolosite.net| 0| 49|
| dylan| random.it| 0| 3|
| don| vodafone.it| 1| 39|
| don| popsugar.com| 0| 10|
| don| fabio.com| 1| 49|
+----------------+---------------+--------+---------+
这是我计划要做的-
查找所有
user_id
,其中domain
的最大频率isp_flag=0
的频率小于domain
的最大频率isp_flag=1
的25%
因此,在上面的示例中,我的output_df
看起来像-
+----------------+---------------+--------+---------+
| user_id| domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
| bob| eidsiva.net| 1| 5|
| bob| media.net| 0| 1|
| dylan| vodafone.it| 1| 448|
| dylan| yolosite.net| 0| 49|
| don| fabio.com| 1| 49|
| don| popsugar.com| 0| 10|
+----------------+---------------+--------+---------+
我认为我需要使用窗口函数来执行此操作,因此我尝试了以下操作,首先针对每个isp_flag=0
-分别找到isp_flag=1
和user_id
的最大频域-
>>> win_1 = Window().partitionBy("user_id","domain","isp_flag").orderBy((col("frequency").desc()))
>>> final_df = my_df.select("*",rank().over(win_1).alias("rank")).filter(col("rank")==1)
>>> final_df.show(5) # this just gives me the original dataframe back
我在这里做错了什么?我如何到达上面打印的最后一个output_df
?