我正在使用kryo序列化程序,并且声明了一个扩展Serializable的类,它具有一些属性和方法:
JOINs
我以这种方式创建GROUP BYs
:
class MySerializable extends Serializable {
var arrStr: Array[String] = _
// .... more attributes
def getarrName(index: Byte): String = arrStr(index)
// .... more methods
}
然后,问题是当我尝试使用以下代码时,出现空指针异常:
SparkSession
但是在 val conf = new SparkConf()
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MySerializable]))
val spark = SparkSession
.builder
.master(masterName)
.appName(name)
.config(conf)
.config("spark.ui.showConsoleProgress","false")
.config("spark.default.parallelism",numPart)
.config("spark.hadoop.validateOutputSpecs","false") //If set to true,validates the output specification (e.g. checking if the output directory already exists) used in saveAsHadoopFile and other variants.
.getOrCreate
调用之外,val myDF = oldDF.map {
case Row(index: Double) => {
Row(concat(delimiter + objectMySerializable.getarrName(index.toByte)))
}
}(schema)
效果很好。因此,我做了以下更改解决了我的问题:
map
它有效!但是我想了解为什么我不能使用第一种方法,并且如果我理解错误的序列化概念,因为我认为这是一种以有效方式跨所有节点发送数据的最佳方法,但是我尝试这样做没有用。