如何使用python异步运行2个云函数?

我有3个用python编写的GCP云功能,即CF1,CF2,CF3。 CF1将检查某些条件,因此应并行执行CF2和CF3。

我尝试过

if condition is true:
    requests.get("url of CF2")
    print("CF2 executed successfully")
    requests.get("url of CF3")
    print("CF3 executed successfully")

CF1代码:

import requests

static_query = "select * from `myproject.mydataset.mytable`"
    try:

        # Executing query and loading data into temporary table.
        client = bigquery.Client()
        job_config = bigquery.QueryJobConfig()
        dest_dataset = client.dataset(temporary_dataset,temporary_project)
        dest_table = dest_dataset.table(temporary_table)
        job_config.destination = dest_table
        job_config.create_disposition = 'CREATE_IF_NEEDED'
        job_config.write_disposition = 'WRITE_TRUNCATE'
        query_job = client.query(static_query,location=bq_location,job_config=job_config)
        query_job.result()
        table = client.get_table(dest_table)
        expiration = (datetime.now() + timedelta(minutes=expiration_time))
        table.expires = expiration
        table = client.update_table(table,["expires"])
        logging.info("Query result loaded into temporary table: {}".format(temporary_table))

        # Check row count of resultant query from temporary table.
        count_query = "select count(*) size from `{}.{}.{}`".format(temporary_project,temporary_dataset,temporary_table)
        job = client.query(count_query)
        results = job.result()
        count = 0
        for row in results:
            count = row.size

        # If row count of query result is empty log error message on stack-driver.
        if count == 0:
            logging.error("Query executed with empty result set.")

        # If row count of query result has records then trigger below two cloud functions (this should be parallel execution).
        else:
            # Trigger CF2 cloud function.
            requests.get("{}".format(cf2_endpoint))
            logging.info("CF2 executed successfully.")

            # Trigger CF3 cloud function.
            requests.get("{}".format(cf3_endpoint))
            logging.info("CF3 executed successfully.")
    except RuntimeError:
        logging.error("Exception occurred {}".format(error_log_client.report_exception()))

在这里,我要异步执行CF2和CF3。任何建议和解决方案,谢谢。

beijiao_1608 回答:如何使用python异步运行2个云函数?

执行异步调用的最佳服务是PubSub。为此,您必须:

  • 创建PubSub主题
  • 使用先前创建的主题上的PubSub事件上的触发器部署CF2
  • 使用先前创建的主题上的PubSub事件上的触发器部署CF3
  • 功能CF1创建带有或不带有参数的PubSub消息并将其发布
  • 功能CF2和CF3与PubSub中发布的消息并行触发。他们从中提取参数并进行处理

如果CF2和CF3已经存在并由HTTP调用触发,则可以在PubSub主题上设置HTTP推送订阅。

如果CF2或CF3失败,则消息将重新发送到该功能,直到有效的确认(2XX HTTP响应)或消息TTL(默认为7天)为止。

顺便说一句,您不耦合,可伸缩,并行,重试错误的CF。

,

如果您需要执行异步请求,并且您正在使用Python,则可以尝试使用aiohttpasyncio库,这里有一个示例here。 另外,您可以检查Cloud Pub/SubCloud Cloud Tasks

本文链接:https://www.f2er.com/3167633.html

大家都在问