pyspark读取pickle文件内容并存储到hive

前端之家收集整理的这篇文章主要介绍了pyspark读取pickle文件内容并存储到hive前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

在平常工作中,难免要和大数据打交道,而有时需要读取本地文件然后存储到Hive中,本文接下来将具体讲解。

过程:

  • 使用pickle模块读取.plk文件

  • 将读取到的内容转为RDD;

  • 将RDD转为DataFrame之后存储到Hive仓库中;

1、使用pickle保存和读取pickle文件

  1. import pickle
  2. data = ""
  3. path = "xxx.plj"
  4. #保存为pickle
  5. pickle.dump(data,open(path,'wb'))
  6. 读取pickle
  7. data2 = pickle.load(open(path,1)">rb'))

使用python3读取python2保存的pickle文件时,会报错:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)

解决方法

  1. data2 = pickle.load(open(path,1)">',encoding=latin1'))

使用python2读取python3保存的pickle文件时,会报错:

unsupported pickle protocol:3

解决方法

  1. pickle
  2. path = xxx.plk"
  3. path2 = xxx2.plk
  4. data = pickle.load(open(path,1)">保存为python2pickle
  5. pickle.dump(data,open(path2,1)">'),protocol=2)
  6. 读取pickle
  7. data2 = pickle.load(open(path2,1)">'))

2、读取pickle的内容并转为RDD

  1. from pyspark.sql SparkSession
  2. Row
  3. pickle
  4. spark = SparkSession \
  5. .builder \
  6. .appName(Python Spark sql basic example) \
  7. .config(spark.some.config.option",1)">some-value) \
  8. .getOrCreate()
  9. with open(picle_path,) as fp:
  10. data = pickle.load(fp)
  11. 这里可根据data的类型进行相应的操作
  12.  
  13. 假设data是一个一维数组:[1,2,3,4,5],读取数据并转为rdd
  14. pickleRdd = spark.parallelize(data)

3、将rdd转为dataframe并存入到Hive中

  1. 定义列名
  2. column = Row(col转为dataframe
  3. pickleDf =pickleRdd.map(lambda x:column(x))
  4. 存储到Hive中,会新建数据库hive_database,新建表:hive_table,以覆盖的形式添加partitionBy用于指定分区字段
  5. pickleDf..write.saveAsTable(hive_database.hvie_tableoverwritesql的方式

  6. data = [
  7.     (1,1)">3145),(1,1)">41465256263281349137)
  8. ]
  9. df = spark.createDataFrame(data,[idtest_idcamera_id])
  10.  
  11.  method onedefault是默认数据库的名字,write_test 是要写到default中数据表的名字
  12. df.registerTempTable(test_hive)
  13. sqlContext.sql(create table default.write_test select * from test_hive")
  14. 或者:

  15.  df 转为临时表/临时视图
  16. df.createOrReplaceTempView(df_tmp_view spark.sql 插入hive
  17. spark.sql(insert overwrite table 
  18.                     XXXXX   表名
  19.                    partition(分区名称=分区值)    多个分区按照逗号分开
  20.                    select 
  21.                    XXXXX   字段名称,跟hive字段顺序对应,不包含分区字段
  22.                    from df_tmp_view""")
  23. 2)以saveAsTable的形式

  24.  "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
  25. #  mode("append")是在原有表的基础上进行添加数据
  26. df.write.format(hive").mode(").saveAsTable(default.write_test')
  27. 以下是通过rdd创建dataframe的几种方法

  28. (1)通过键值对

  29. d = [{name': Aliceage': 1}]
  30. output = spark.createDataFrame(d).collect()
  31. print(output)
  32.  [Row(age=1,name='Alice')]
  33. (2)通过rdd

  34. a = [()]
  35. rdd = sc.parallelize(a)
  36. output = spark.createDataFrame(rdd).collect()
  37. (output)
  38. output = spark.createDataFrame(rdd,1)">]).collect()
  39.  [Row(_1='Alice',_2=1)] [Row(name='Alice',age=1)]
  40. (3)通过rdd和Row

  41.  Row
  42. a = [( sc.parallelize(a)
  43. Person = Row()
  44. person = rdd.map(lambda r: Person(*r))
  45. output = spark.createDataFrame(person).collect()
  46. from pyspark.sql.types import *
  47. a = [( sc.parallelize(a)
  48. schema = StructType(
  49.     [
  50.         StructField(,StringType(),True),StructField( spark.createDataFrame(rdd,schema).collect()
  51. df = spark.createDataFrame(rdd,1)">])
  52. print(df)   DataFrame[name: string,age: bigint]
  53. print(type(df.toPandas()))   <class 'pandas.core.frame.DataFrame'>
  54.  传入pandas DataFrame
  55. output = spark.createDataFrame(df.toPandas()).collect()

    猜你在找的Hadoop相关文章