在Databricks平台上使用大型数据集进行PySpark单元测试

我想问您-专家-有关使用大型数据集对使用PySpark(当然是用Python编写)的功能进行单元测试的最佳方法。

理想世界:

理想情况下,应使测试的输入和输出/预期数据与测试非常接近。几乎在每个教程和博客文章中都对它进行了描述,例如here

def test_amount_spent_udf(spark_session: SparkSession) -> None:
    input_df = spark_session.createDataFrame([
        Row(customer_name="Geoffrey",date="2016-04-22",category="Foo",product_name="Bar",quantity=1,price=2.00),])
    expected_result = 2.00

    result = amount_spent_udf(data=input_df)

    assert result.collect()[0].amount_spent == expected_result

在这种情况下,添加例如更多行用于检查不同情况。每个人都很高兴,如果需要,单元测试非常容易维护和扩展。

真实世界:

当金额基于多于quantityprice的列时,就会出现问题。
当我们遇到这样的情况时,情况如何:

def test_amount_spent_udf(spark_session: SparkSession) -> None:
    input_df = spark_session.createDataFrame([
        Row(
            customer_name="Geoffrey",price=2.00,used=True,method="online",shipping_price=4.00,global_discount=0.05,personal_discount=0.15,),])
    expected_result = 5.60

    result = amount_spent_udf(data=input_df)

    assert result.collect()[0].amount_spent == expected_result

如您所见,结果有所不同,因为必须使用global_discountpersonal_discount甚至是shipping_price之类的其他字段来计算金额。

在这里,添加新行比上一个示例难一些。尤其是由于每行的符号数限制(120)和必须以某种方式显示的更多列。

问题:

在我们的项目中,我们遇到的情况甚至更加复杂。我们具有将包含30列的DataFrame转换为具有45列的DataFrame的功能,或基于其他15个元素计算一个值的功能(仅是示例)。

为这些功能准备测试数据不是问题,但是当我们想要将所有数据放入测试(甚至放在单独的模块中)时,这是一个非常丑陋且费时的代码。

暂时简化数据的临时解决方案是将它们放入元组列表:

INPUT_DATA = [
    ("Geoffrey","2016-04-22","Foo","Bar",1,2.00,True,"online",4.00,0.05,0.15),]

当数据顺序与模式列的顺序相同时,可以以这种方式保留数据,但是当列数等于30?时,此解决方案是不够的

将数据放入例如CSV文件也是不可能的,因为不可能将这些文件导入Databricks平台并在其中运行测试。

注意:在Databricks平台上运行单元测试对我们很重要,因为这是实际代码在其中运行的环境。

我想征询您的意见,也许有人遇到了同样的问题,并以我在此未描述甚至我不认为的方式解决了这个问题

zchyqwerty 回答:在Databricks平台上使用大型数据集进行PySpark单元测试

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3110873.html

大家都在问