为了处理RDD操作上的Spark异常,我可以在附加的exceptions
列中使用以下方法:
val df: DataFrame = ...
val rddWithExcep = df.rdd.map { row: Row =>
val memberIdStr = row.getas[String]("member_id")
val memberIdInt = Try(memberIdStr.toInt) match {
case Success(integer) => List(integer,null)
case Failure(ex) => List(null,ex.toString)
}
Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}
val castWithExceptionSchema = StructType(df.schema.fields ++ Array(StructField("member_id_int",Integertype,true),StructField("exceptions",StringType,true)))
val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep,castWithExceptionSchema)
castExcepDf.printSchema()
castExcepDf.show()
是否可以在Spark SQL上处理此类异常?例如,当前在发生任何错误的情况下,Spark SQL只会返回null
值并隐藏错误。
例如,将0除以null
值,而不是错误。
我认为-这在Spark SQL中是一个非常严重的问题,因为它可以简单地生成您根本不会注意到的意外/错误数据。
是否有可能重写此行为,并让Spark因适当的详细异常而失败?