我有一个PySpark数据框(例如df
),它表示具有分类和数值属性的时间序列数据。每十分钟收集一次数据。以下是数据框的外观示例:
+------+-----+-------------------+-----+
| name| type| timestamp|score|
+------+-----+-------------------+-----+
| name1|type1|2012-01-10 00:00:00| 11|
| name1|type1|2012-01-10 00:00:10| 14|
| name1|type1|2012-01-10 00:00:20| 2|
| name1|type1|2012-01-10 00:00:30| 3|
| name1|type1|2012-01-10 00:00:40| 55|
| name1|type1|2012-01-10 00:00:50| 10|
| name5|type1|2012-01-10 00:01:00| 5|
| name2|type2|2012-01-10 00:01:10| 8|
| name5|type1|2012-01-10 00:01:20| 1|
|name10|type1|2012-01-10 00:01:30| 12|
|name11|type3|2012-01-10 00:01:40| null|
+------+-----+-------------------+-----+
对于给定的名称和类型,我想通过将分位数(例如0.8
)作为我的汇总方法来对数据进行分组并每周汇总此数据框中的数字属性。假设原始数据帧每十分钟采样一次时间值,那么理想情况下,我应该期望每周每个名称的得分为1008。但是,有时我缺少得分的数据(或null
值)。如果 f对于任何星期的任何给定名称,有效数据点的数量(score
的非空值或缺失值)小于某个数字(例如504),我想忽略该名称我的汇总数据框中给定的一周如何在PySpark中做到这一点?
以下是我目前正在做的事情。
from pyspark.sql import Window
import pyspark.sql.functions as F
agg_expr = F.expr("percentile_approx(score,0.8)")
df = df.groupBy(
"name","type",F.window("timestamp","1 week")
.getField("start")
.alias("aggregate_ts"),).agg(agg_expr.alias("score"))