在AWS动态数据框架中添加列 步骤1 步骤2 步骤3

我对AWS Glue非常陌生。我正在做一个小项目,要问的是从S3存储桶中读取文件,将其转置并将其加载到mysql表中。 S3存储桶中的源数据如下所示:

费用,数据,分钟,名称,sms,id,类别 5,1000,200,产品1,500,p1,服务

目标表结构为 产品ID,参数,值

我希望目标表具有以下值

p1,费用5

P1,数据,1000

我能够用ID和Value加载目标表。但是我无法填充参数列。此列不在输入数据中,我想根据我要填充的列值来填充字符串。

这是我用于成本的代码。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)

## @type: DataSource
## @args: [database = "mainclouddb",table_name = "s3product",transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb",transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost","long","value","int"),("id","string","product_id","string")],transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0,mappings = [("cost",transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id","parameter","value"],transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1,paths = ["product_id",transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG",database = "mainclouddb",table_name = "mysqlmaincloud_product_parameter_mapping",transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2,choice = "MATCH_CATALOG",transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols",transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3,choice = "make_cols",transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb",transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4,transformation_ctx = "datasink5")

job.commit()

有人可以帮助我将此新列添加到我的数据框中,以便使其在表格中可用吗?

谢谢

qwertyu1212121 回答:在AWS动态数据框架中添加列 步骤1 步骤2 步骤3

对于较小的datsframe,您可以执行以下操作

  1. 将动态框架转换为火花数据框架
  2. 添加一列
  3. 转换回动态框架

步骤1

    "cmake.cacheInit": null,

步骤2

datasource0 = datasource0.toDF()

步骤3

from pyspark.sql.functions import udf
getNewValues = udf(lambda val: val+1) # you can do what you need to do here instead of val+1

datasource0 = datasource0.withColumn('New_Col_Name',getNewValues(col('some_existing_col'))

问题在于,当您拥有大型数据集时,toDF()的操作非常昂贵!

本文链接:https://www.f2er.com/3121618.html

大家都在问