Profile Pyspark结构化流应用程序

我想测量执行时间并从应用程序本身分析Pyspark结构化流。例如,我想使用以下命令执行Python脚本

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import time   

if __name__ == "__main__":

    sc = SparkSession.builder.master('spark://localhost:7077').getOrCreate()
    df = sc.readStream.schema(pq_schema).parquet('../data/parquet')
    df.createOrReplaceTempView("vw_table")
    exec_query = sc.sql("""
            select sum(field_1),count(field_2),field_3 from vw_table group by field_3
    """)
    result_q = df_st.writeStream.outputMode("complete").format("console").start()
    result_q.awaitTermination()  

然后,我将文件复制到文件夹并检查每个文件时间。是否可以通过Python脚本来做到这一点?

qqxin000 回答:Profile Pyspark结构化流应用程序

出于测试目的,您可以减少每个触发器的文件数量:

df = sc.readStream.schema(pq_schema)\
     .option("maxFilesPerTrigger",1)\
     .parquet('../data/parquet')

然后在Spark UI中找到每个批次的工期。 或者,您可以在每个浴池中添加一个时间戳记(用于本地测试)。不建议使用此代码,它并不总是能正常工作。这是一个快速的技巧。

from datetime import datetime

startTime= datetime.now()
batchId = 1

def batch_duration(df,epoch_id):
        global startTime,batchId
        print("proccess time for batch {} = {}".format(batchId,datetime.now() - startTime))
        startTime= datetime.now()
        batchId+=1

result_q = df_st.writeStream\
                      .outputMode("complete")\
                      .foreachBatch(batch_duration)\
                      .start()\
                      .awaitTermination()
本文链接:https://www.f2er.com/3135187.html

大家都在问