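I am trying to create an AWS EMR cluster from a Lambda function. Every time I test the function, it starts building the cluster, but the cluster then fails with the following error: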
Exception in thread "main" java.lang.RuntimeException: Local file does not exist.
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.fetchFile(ScriptRunner.java:30)
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.main(ScriptRunner.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
My Lambda code is as follows:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import boto3
import datetime
def lambda_handler(event, context):
    print('Creating EMR')
    connection = boto3.client('emr', region_name='us-east-1')
    print(event)
    cluster_id = connection.run_job_flow(
        Name='MyTest',
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        LogUri='s3://some-bucket/logs',
        ReleaseLabel='emr-5.21.0',
        Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
        Instances={
            'InstanceGroups': [{
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 1,
            }, {
                'Name': 'Slave nodes',
                'Market': 'SPOT',
                'InstanceRole': 'CORE',
                'InstanceCount': 2,
            }],
            'KeepJobFlowAliveWhenNoSteps': True,
            'Ec2KeyName': 'kvp-name',
            'Ec2SubnetId': 'subnet-dag17490',
            'EmrManagedMasterSecurityGroup': 'sg-xxxxxx',
            'EmrManagedSlaveSecurityGroup': 'sg-xxxxxx',
        },
        Configurations=[{
            "Classification": "spark-env",
            "Properties": {},
            "Configurations": [{
                "Classification": "export",
                "Properties": {"PYSPARK_PYTHON": "python36", "PYSPARK_DRIVER_PYTHON": "python36"},
            }],
        }],
        Steps=[{
            'Name': 'mystep',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': ["/home/hadoop/spark/bin/spark-submit", "s3://bucket-name/wordcount.py"],
            },
        }],
    )
    return 'Started cluster {}'.format(cluster_id)
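As you can see, I am still learning the steps involved in creating a cluster, so I may be missing something here. I am using Python 3. The goal is to have wordcount.py executed, but I get this "file does not exist" error instead. I'm confused: which file is it referring to?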
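One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: ScriptRunner.fetchFile suggests that script-runner.jar treats its first argument as the script to fetch, so the missing file may be /home/hadoop/spark/bin/spark-submit rather than wordcount.py. A minimal sketch of an alternative step definition is shown here. It uses command-runner.jar, which is available on EMR 4.x and later releases; the helper name build_spark_step and the choice of --deploy-mode cluster are illustrative assumptions, not part of the original code.

```python
# Hypothetical helper (not in the original code): builds a step dict for
# run_job_flow that invokes spark-submit through command-runner.jar.
def build_spark_step(script_uri, name='mystep'):
    return {
        'Name': name,
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            # command-runner.jar runs its arguments as a command on the cluster,
            # so spark-submit is resolved from the PATH instead of a fixed path.
            'Jar': 'command-runner.jar',
            # Assumption: cluster deploy mode, so the script can be read from S3.
            'Args': ['spark-submit', '--deploy-mode', 'cluster', script_uri],
        },
    }


step = build_spark_step('s3://bucket-name/wordcount.py')
print(step['HadoopJarStep']['Args'])
```

The resulting dict could be passed as the single element of Steps=[...] in the run_job_flow call in place of the script-runner step.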
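Also, this Lambda function is triggered on PUT of a file in the bucket, but strangely, it seems to fire when I delete a file rather than when I upload one through the console. Thanks in advance.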
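To check which S3 event actually fires the function, one option is to log the event name carried in each notification record. The sketch below is illustrative only: describe_s3_event is a hypothetical helper, and the sample event is hand-written to mimic the shape of an S3 notification (a Records list whose entries carry eventName and s3.object.key).

```python
def describe_s3_event(event):
    """Return (eventName, object key) pairs for each record in an S3 notification."""
    return [
        (record['eventName'], record['s3']['object']['key'])
        for record in event.get('Records', [])
    ]


# Hand-written sample shaped like an S3 notification; the values are illustrative.
sample_event = {
    'Records': [
        {
            'eventName': 'ObjectRemoved:Delete',
            's3': {'object': {'key': 'wordcount.py'}},
        },
    ]
}
print(describe_s3_event(sample_event))
```

Calling describe_s3_event(event) at the top of lambda_handler and printing the result would show in the CloudWatch logs whether the invocation came from an ObjectCreated or an ObjectRemoved event.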