由于权限错误,数据流无法将数据写入BigQuery

早上好,

我确定这是一个简单的问题,但是我对Google Cloud和Python 3还是陌生的,所以请保持谦虚。

我希望使用pyodbc从SQL Server提取数据,这很简单。我现在想将该信息发布到PubSub主题,这是我面临的一些问题。

对于某些背景知识,这将是4部分过程的第二部分。最终,所需的工作流程将如下所示:

SQL Server-> Google Cloud PubSub->数据流->大查询

我利用了通过

获得的Google文档代码Publisher.py
git clone https://github.com/GooglecloudPlatform/python-docs-samples.git

最初看起来是这样的:

def publish_messages(project_id,topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id,topic_name)

    for n in range(1,10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        # When you publish a message,the client returns a future.
        future = publisher.publish(topic_path,data=data)
        print(future.result())

    print('Published messages.')
    # [END pubsub_quickstart_publisher]
    # [END pubsub_publish]

随后,我创建了许多修改后的版本,这些修改后的版本都无法执行我希望实现的任务。最新的迭代是:

import pyodbc
import os
import sys

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "H:\\venv\\SQLBDtoBigQuery\\key.json"

sqlSelect = "SELECT * FROM EMP"
connectionString = f"DRIVER={{SQL Server}};server={server};database={database};uid={username};pwd={password}"

conn = pyodbc.connect(connectionString)
cursor = conn.cursor()

cursor = conn.cursor().execute(sqlSelect)
columns = [column[0] for column in cursor.description]
print(columns)

results = []

for row in cursor.fetchall():
    results.append(dict(zip(columns,row)))

def publish_messages(project_id,topic_name,inbound_data):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id,topic_name)

    for row in inbound_data:        
        # Data must be a bytestring
        str_data=row
        data = str_data.encode('utf-8')
        # When you publish a message,attribute=data)

    print('Published messages.')

随后,我开发了一种方法,该方法可以成功写入我的pubsub主题,但是记录并未写入到Big Query表中。另一个问题是,假设所有数据库列实际上都是数字,字符串和日期时间的混合,则假定它们都是字符串。以下是我现在拥有的代码。

import pyodbc
import os
import sys

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "H:\\venv\\SQLBDtoBigQuery\\key.json"

sqlSelect = "SELECT top 2 EMP_SK FROM EMP"
connectionString = f"DRIVER={{SQL Server}};server={server};database={database};uid={username};pwd={password}"


def GetRowsFromDb(connectionString,sqlSelect):
   ret = []
   columns = []
   with pyodbc.connect(connectionString) as conn:
       with conn.cursor() as cursor:           
           cursor.execute(sqlSelect)
           columns = [column[0] for column in cursor.description]
           ret = list(cursor.fetchall())
   return ret,columns


def publish_messages(project_id,inbound_data,columns):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id,topic_name)

    namesToValues = dict()
    for row in inbound_data:
        for i in range(len(row)):
            namesToValues[columns[i]] = str(row[i])
        #namesToValues = dict(zip(columns,row))
        # Data must be a bytestring
        str_data="Something"
        data = str_data.encode('utf-8')
        # When you publish a message,data=data,**namesToValues)

    print('Published messages.')


dbRows,columns = GetRowsFromDb(connectionString,sqlSelect)

publish_messages('lucky-trail-260923','aspectdb-0001',dbRows,columns)

任何有关此问题的帮助将不胜感激。

热烈问候Scott

kukududushen 回答:由于权限错误,数据流无法将数据写入BigQuery

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2937749.html

大家都在问