In day-to-day work you inevitably deal with big data, and sometimes you need to read a local file and then store it in Hive. This article walks through the process.
Process:
1. Saving and reading pickle files with pickle
- import pickle
- data = ""
- path = "xxx.pkl"
- # save as a pickle file
- pickle.dump(data, open(path, 'wb'))
- # read the pickle file
- data2 = pickle.load(open(path, 'rb'))
When Python 3 reads a pickle file saved by Python 2, it raises:
UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)
Solution:
- data2 = pickle.load(open(path, 'rb'), encoding='latin1')
When Python 2 reads a pickle file saved by Python 3, it raises:
unsupported pickle protocol: 3
Solution (re-save the data from Python 3 using protocol 2):
- import pickle
- path = "xxx.pkl"
- path2 = "xxx2.pkl"
- data = pickle.load(open(path, 'rb'))
- # save as a Python 2-readable pickle
- pickle.dump(data, open(path2, 'wb'), protocol=2)
- # read the pickle file
- data2 = pickle.load(open(path2, 'rb'))
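For reference, a minimal sketch that combines the two fixes above: save with protocol=2 so Python 2 can still read the file, and load with encoding='latin1' so Python 3 can read files written by Python 2. The file name and payload are illustrative, not from the original.

```python
import pickle

path = "compat_example.pkl"   # illustrative file name
data = {"message": "hello"}   # illustrative payload

# protocol=2 keeps the file readable from Python 2;
# use pickle.HIGHEST_PROTOCOL when only Python 3 readers are involved.
with open(path, "wb") as fp:
    pickle.dump(data, fp, protocol=2)

# encoding='latin1' (Python 3 only) decodes byte strings written by Python 2.
with open(path, "rb") as fp:
    data2 = pickle.load(fp, encoding="latin1")

print(data2)
```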
2. Reading the pickle contents and converting them to an RDD
- from pyspark.sql import SparkSession
- from pyspark.sql import Row
- import pickle
- spark = SparkSession \
- .builder \
- .appName("Python Spark SQL basic example") \
- .config("spark.some.config.option", "some-value") \
- .getOrCreate()
- pickle_path = "xxx.pkl"  # path to the pickle file
- with open(pickle_path, 'rb') as fp:
-     data = pickle.load(fp)
- # what happens next depends on the type of data
- # suppose data is a one-dimensional array such as [1, 2, 3, 4, 5]; turn it into an RDD
- pickleRdd = spark.sparkContext.parallelize(data)
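If the pickled object is a list of records rather than a flat array, each record can be mapped to a Row before building a DataFrame. A minimal sketch, reusing the spark session above; the field names user_id and score are hypothetical, chosen only for illustration:

```python
from pyspark.sql import Row

# Hypothetical records standing in for the unpickled data.
records = [(1, 0.5), (2, 0.8)]
record_rdd = spark.sparkContext.parallelize(records)

# Map each tuple to a Row with named fields, then build a DataFrame.
row_rdd = record_rdd.map(lambda r: Row(user_id=r[0], score=r[1]))
df = spark.createDataFrame(row_rdd)
df.show()
```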
3. Converting the RDD to a DataFrame and saving it to Hive
- # define the column name
- column = Row("col")
- # convert to a DataFrame
- pickleDf = spark.createDataFrame(pickleRdd.map(lambda x: column(x)))
- # save to Hive: the table hive_table is created inside the database hive_database (the database itself
- # must already exist); mode='overwrite' replaces any existing data, and partitionBy can be used to
- # specify the partition column(s)
- pickleDf.write.saveAsTable('hive_database.hive_table', mode='overwrite')
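Note that saveAsTable and INSERT OVERWRITE only target the Hive metastore when the SparkSession was created with Hive support enabled; the builder shown in step 2 does not include this. A minimal sketch of a Hive-enabled session (the app name is illustrative):

```python
from pyspark.sql import SparkSession

# enableHiveSupport() makes saveAsTable / INSERT OVERWRITE write to the
# Hive metastore instead of Spark's default catalog.
spark = SparkSession \
    .builder \
    .appName("write-to-hive-example") \
    .enableHiveSupport() \
    .getOrCreate()
```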
There are two ways to do the write itself, shown below with a small example DataFrame.
(1) Using spark.sql
data = [(1, "3", "145"), (1, "4", "146"), (2, "5", "256"), (2, "6", "263")]
df = spark.createDataFrame(data, ['id', 'test_id', 'camera_id'])
# 'default' is the name of the default database; write_test is the table inside it that we write to
# register df as a temporary table/view
df.registerTempTable('test_hive')
# or, with the newer API:
df.createOrReplaceTempView('df_tmp_view')
# insert into Hive through spark.sql; XXXXX stands for the table name and the column list,
# the columns must follow the Hive column order and must not include partition columns,
# and multiple partition specs are separated by commas
spark.sql("""insert overwrite table XXXXX partition(partition_name=partition_value)
select XXXXX from df_tmp_view""")
(2) Using saveAsTable
# mode("overwrite") rewrites the table: if it exists its data is replaced, otherwise the table is created
# mode("append") adds rows on top of the existing table
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
The following are several ways to create a DataFrame:
(1) From key-value pairs (a list of dicts)
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
(2) From an RDD
a = [('Alice', 1)]
sc = spark.sparkContext
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# column names can also be supplied:
output = spark.createDataFrame(rdd, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]
(3) From an RDD and Row
from pyspark.sql import Row
a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
(4) From an RDD with an explicit schema
from pyspark.sql.types import *
a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)
# DataFrame[name: string, age: bigint]
(5) From a pandas DataFrame
print(type(df.toPandas()))
# <class 'pandas.core.frame.DataFrame'>
# a pandas DataFrame can be passed to createDataFrame directly
output = spark.createDataFrame(df.toPandas()).collect()
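The same works for a pandas DataFrame built from scratch rather than round-tripped through toPandas(). A minimal sketch (the column values are illustrative):

```python
import pandas as pd

# Build a small pandas DataFrame and convert it to a Spark DataFrame.
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [1, 2]})
sdf = spark.createDataFrame(pdf)
sdf.show()
```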