我目前正在研究apache spark。我当前在本地计算机上的单节点配置上运行。
作为实践,我获取一个表示事件流的JSON文件,并使用将其加载到API中
spark.read.json()
然后,要查询它们,我使用
spark.sql('some query')
我的目标是在这些事件之间找到某些联系。
现在,我知道其他名为map()
和filter()
的函数,以此类推,它们可以以不同的方式实现相同的逻辑。
这些功能是处理相同数据的简单替代方法,还是内幕之间存在更深的差异?
我目前正在研究apache spark。我当前在本地计算机上的单节点配置上运行。
作为实践,我获取一个表示事件流的JSON文件,并使用将其加载到API中
spark.read.json()
然后,要查询它们,我使用
spark.sql('some query')
我的目标是在这些事件之间找到某些联系。
现在,我知道其他名为map()
和filter()
的函数,以此类推,它们可以以不同的方式实现相同的逻辑。
这些功能是处理相同数据的简单替代方法,还是内幕之间存在更深的差异?
使用spark.sql()
和诸如DataFrame
api之类的sql(诸如.select()
,.where()
,.join()
之类的方法)在性能上基本上是相同的和优化观点。这些转换将通过催化剂优化器来确定处理基础数据集的最有效方法。同样,spark具有几个native函数,可以通过任何一种方法使用,并通过催化引擎进行优化以使其表现出色。
我个人非常倾向于避免 spark.sql()
函数,这仅仅是因为在编译时而不是在运行时捕获打字错误和其他错误变得容易得多。例如:
`spark.sql("select name from emps wehre salary > 10000")`
可以毫无问题地进行编译,但是会在运行时失败,同时:
`emps.wehre($"salary" > 10000).select("name")`
将在编译时失败,从而节省了很多麻烦。
当然,有一些功能只能作为sql表达式使用(很少,tbh),在这些情况下,您可以使用.expr()
方法。这样一来,您仍然可以查询查询子句中的最多个。
话虽如此,使用.map()
进行转换应该保留给高度专业化的任务,这些任务无法使用普通的类似sql的方法来表达。 .map()
lambda尚未通过催化剂优化,因此效果也不佳。此外,对.map()
使用DataFrame
函数要求您将每个行作为Row
对象与之交互,这可能有些笨拙,乏味且容易出现运行时错误。>
如果必须.map()
,请使用强类型的Dataset
来完成,因为使用lambda函数中的值要容易得多。转换为Dataset
就像在其上调用.as[T]
一样简单。
最后,这个问题中没有提到的中间立场是利用UDF进行自定义工作。 UDF非常适合将功能应用于行中的一个或多个值。它们也没有进行 优化,但是它们避免了.map()
带来的一些其他开销。
一般的经验法则:如果(几乎)整个行都需要输入 作为转换的输入,请使用.map()
。如果该行内只需要1-3个值,那么UDF就足够了。