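How to store a struct type using the Spark Cassandra Connector

I have the following JSON structure, which contains employee details along with their addresses: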

[
{"id" : 1000,"name" : "dev","age" : 30,"address" : {"city":"noida","state":"UP","pincode":"201201"}},
{"id" : 1001,"name" : "ravi","age" : 36,"address" : {"pincode":"201501"}}
]

I have this table in Cassandra:

create table sparkdb.employee (id bigint,name text,age int,city text,state text,pincode text,primary key(id));

My question is: how do I store the address, which is a nested StructType in the JSON above, in the Cassandra employee table?

Here is my code snippet:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("CassandraconnectorIntegration")
  .master("local[*]")
  .getOrCreate()

// Read the multi-line JSON array from the classpath
val empDF = spark.read
  .option("multiline",true)
  .json(getClass.getResource("/sparksql/employee.json").getPath)

empDF.printSchema()

import spark.implicits._
val empDS = empDF.as[Employee]

// Write to Cassandra; this fails as written because the table has no "address" column
empDS.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .option("confirm.truncate","true") // this option is required when using Overwrite mode
  .option("spark.cassandra.connection.host","127.0.0.1")
  .option("spark.cassandra.connection.port","9042")
  .option("keyspace","sparkdb")
  .option("table","employee")
  .save()

// Address is the nested struct inside Employee
case class Address(city: String,state: String,pincode: String)
case class Employee(id: Long,name: String,age: Long,address: Address)

Note: one way I know of is to first select the columns with aliases and then insert that DataFrame, that is:

empDS.createOrReplaceTempView("employee")
val empDF_out = spark.sql("select id,name,age,address.city city,address.state state,address.pincode pincode from employee")
empDF_out.write.format() .... ... .... 

But this does not seem good to me: if I have many columns, I have to select each of them individually first.
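One way to avoid listing every column by hand is to build the select list from the DataFrame schema. The following is only a minimal sketch, not a connector feature: the helper name `flattenTopLevel` and the object `FlattenStructs` are hypothetical, and it assumes that the child field names (city, state, pincode) match the Cassandra column names and do not collide with other top-level columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object FlattenStructs {

  // Hypothetical helper: expands every top-level struct column into its
  // child fields and keeps all other columns unchanged.
  // Assumes child names are unique across the resulting DataFrame.
  def flattenTopLevel(df: DataFrame): DataFrame = {
    val columns = df.schema.fields.flatMap { field =>
      field.dataType match {
        case struct: StructType =>
          // e.g. address.city becomes a top-level column named "city"
          struct.fieldNames.map(child => col(s"${field.name}.$child").as(child))
        case _ =>
          Array(col(field.name))
      }
    }
    df.select(columns: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FlattenStructs")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Inline sample record taken from the question, so the sketch runs without a file
    val empDF = spark.read.json(Seq(
      """{"id":1000,"name":"dev","age":30,"address":{"city":"noida","state":"UP","pincode":"201201"}}"""
    ).toDS())

    // The flattened DataFrame has only top-level columns and can be
    // written with the same .write.format("org.apache.spark.sql.cassandra") call
    flattenTopLevel(empDF).printSchema()

    spark.stop()
  }
}
```

When there is only one struct column, `empDF.select("id", "name", "age", "address.*")` does the same job with Spark's built-in star expansion; the schema-driven helper is only worth it when there are many struct columns or the column list changes often.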
