我正在尝试执行看似简单的任务,但现在无法使其正常工作。我想使用云编写器从SQL数据库收集数据并将其保存在GCS中。我遇到权限问题
这是我的DAG:
public void execute(Runnable command) {
try {
e.execute(command);
} finally { reachabilityFence(this); }
}
我创建了一个具有以下角色的特定服务帐户: Cloud SQL管理员 作曲工作者 存储对象创建者
创建环境时,我指定了该帐户,然后将上述DAG上传到相应的存储桶中。
我收到此错误:
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
from airflow import models
import datetime
export_body = {
"exportContext": {
"kind": "sql#exportContext","fileType": "csv","uri": "gs://mybucket/export_sql.csv","csvExportOptions": {
"selectQuery": "select count(*) as number from some_table"
}
}
}
yesterday = datetime.datetime.combine(
datetime.datetime.today(),datetime.datetime.min.time())
start_date = yesterday
JOB_NAME = "job_name"
default_args = {
'start_date': start_date,}
with models.DAG(JOB_NAME,schedule_interval="@hourly",default_args=default_args) as dag:
sql_export_task = CloudSqlInstanceExportOperator(body=export_body,project_id="project_id",instance='instance',task_id='sql_export_task')
sql_export_task
我认为Storage Object Creator角色应该授予我权限。 我应该在服务帐户中添加其他角色吗?哪一个? 任何有关如何进行的建议或解决方案将不胜感激。谢谢!
编辑:我添加了存储管理员角色,这消除了此错误。 但是,尽管我的DAG似乎无法正常工作。
气流接口发送混合信号:任务没有状态:
但这是成功的吗?
我检查了我希望创建的csv文件是否丢失。
任何有关如何进行的建议或解决方案将不胜感激。谢谢!