我想问您-专家-有关使用大型数据集对使用PySpark(当然是用Python编写)的功能进行单元测试的最佳方法。
理想世界:
理想情况下,应使测试的输入和输出/预期数据与测试非常接近。几乎在每个教程和博客文章中都对它进行了描述,例如here
def test_amount_spent_udf(spark_session: SparkSession) -> None:
input_df = spark_session.createDataFrame([
Row(customer_name="Geoffrey",date="2016-04-22",category="Foo",product_name="Bar",quantity=1,price=2.00),])
expected_result = 2.00
result = amount_spent_udf(data=input_df)
assert result.collect()[0].amount_spent == expected_result
在这种情况下,添加例如更多行用于检查不同情况。每个人都很高兴,如果需要,单元测试非常容易维护和扩展。
真实世界:
当金额基于多于quantity
或price
的列时,就会出现问题。
当我们遇到这样的情况时,情况如何:
def test_amount_spent_udf(spark_session: SparkSession) -> None:
input_df = spark_session.createDataFrame([
Row(
customer_name="Geoffrey",price=2.00,used=True,method="online",shipping_price=4.00,global_discount=0.05,personal_discount=0.15,),])
expected_result = 5.60
result = amount_spent_udf(data=input_df)
assert result.collect()[0].amount_spent == expected_result
如您所见,结果有所不同,因为必须使用global_discount
,personal_discount
甚至是shipping_price
之类的其他字段来计算金额。
在这里,添加新行比上一个示例难一些。尤其是由于每行的符号数限制(120)和必须以某种方式显示的更多列。
问题:
在我们的项目中,我们遇到的情况甚至更加复杂。我们具有将包含30列的DataFrame转换为具有45列的DataFrame的功能,或基于其他15个元素计算一个值的功能(仅是示例)。
为这些功能准备测试数据不是问题,但是当我们想要将所有数据放入测试(甚至放在单独的模块中)时,这是一个非常丑陋且费时的代码。
暂时简化数据的临时解决方案是将它们放入元组列表:
INPUT_DATA = [
("Geoffrey","2016-04-22","Foo","Bar",1,2.00,True,"online",4.00,0.05,0.15),]
当数据顺序与模式列的顺序相同时,可以以这种方式保留数据,但是当列数等于30?时,此解决方案是不够的
将数据放入例如CSV文件也是不可能的,因为不可能将这些文件导入Databricks平台并在其中运行测试。
注意:在Databricks平台上运行单元测试对我们很重要,因为这是实际代码在其中运行的环境。
我想征询您的意见,也许有人遇到了同样的问题,并以我在此未描述甚至我不认为的方式解决了这个问题