在pyspark数据框中遍历两列的同时向新列添加值

我有一个带有列的pyspark数据框(除了更多列): 每个月有多个ID。每个ID的活动状态由数量列确定。如果数量> 0,则活动= 1,否则为0。

+-----------------------------+---
|id|amount|  dates   | active |
+-----------------------------+---
| X|     0|2019-05-01|    0   |
| X|   120|2019-06-01|    1   |      
| Y|    60|2019-06-01|    1   |
| X|     0|2019-07-01|    0   |
| Y|     0|2019-07-01|    0   |
| Z|    50|2019-06-01|    1   |
| Y|     0|2019-07-01|    0   |
+-----------------------------+---

我要计算和添加的新列是p3mactive。 它是根据过去三个月的活跃状态计算的。 例如:对于id = x,日期= 2019-08-01,p3mactive = 1,因为X在2019-06-01中处于活动状态。 如果之前的几个月不存在,则p3m active =0。如果只有1或2个月,则p3m active可以简单地计算为max(active(month-1),active(month-2))。基本上是基于现有的列。

+-----------------------------+-----------+
|id|amount|  dates   | active | p3mactive |
+-----------------------------+-----------+
| X|     0|2019-05-01|    0   |     0     |
| X|   120|2019-06-01|    1   |     0     |      
| Y|    60|2019-06-01|    1   |     0     |
| X|     0|2019-07-01|    0   |     1     |
| Y|     0|2019-07-01|    0   |     1     |
| Z|    50|2019-06-01|    1   |     0     |
| Y|     0|2019-07-01|    0   |     1     |
+-----------------------------+-----------+

所以基本上:

  1. X的05的有效值为0,并且在此之前没有几个月,因此p3mactive为0。
  2. Y在06中变为活动状态,因此p3mactive = 07中为1,而p3mactive在06中仍为0。
  3. Z仅具有06的数据,因此06中的p3mactive为0

,依此类推。让我知道是否对流程有任何疑问。

我想在pyspark中使用更好的数据框操作和函数来实现这一点。 我通常可以轻松地想到如何使用pandas或python来执行此操作,但是我是新手,无法想到一种方法,可以在每个给定的月份中循环遍历id,然后将前三个月的活动状态选择为最大值(m1,m2,m3)函数,如果上个月不存在,则保留边缘条件。任何帮助将不胜感激。

wpz731 回答:在pyspark数据框中遍历两列的同时向新列添加值

您可以通过when函数使用lagWindow来做到这一点:

from pyspark.sql.window import Window
from pyspark.sql.functions import when,col,lag

w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive",when(
    (lag(df.active,1).over(w) == 1)| 
    (lag(df.active,2).over(w) == 1) | 
    (lag(df.active,3).over(w) == 1),1).otherwise(0))

您不能循环遍历pyspark数据帧,但是可以使用Window跨越它们。您可以使用when应用条件,也可以使用lag查看先前的行,并使用lead查看将来的行。如果x之前的行不存在,则条件的计算结果为false,您将得到一个0,如用例所述。

我希望这会有所帮助。

本文链接:https://www.f2er.com/3119523.html

大家都在问