Good morning,
I'm sure this is a simple question, but I'm new to Google Cloud and Python 3, so please bear with me.
I want to pull data from SQL Server using pyodbc, which is straightforward. I now want to publish that information to a Pub/Sub topic, and that is where I'm running into problems.
For some background, this is part two of a four-part process. Ultimately, the desired workflow will look like this:
SQL Server -> Google Cloud Pub/Sub -> Dataflow -> BigQuery
I made use of the Google documentation code for publisher.py, obtained via git clone https://github.com/GooglecloudPlatform/python-docs-samples.git
It originally looked like this:
def publish_messages(project_id, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        # When you publish a message, the client returns a future.
        future = publisher.publish(topic_path, data=data)
        print(future.result())

    print('Published messages.')
    # [END pubsub_quickstart_publisher]
    # [END pubsub_publish]
I have since created a number of modified versions, none of which does what I'm trying to accomplish. The latest iteration is:
import pyodbc
import os
import sys

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "H:\\venv\\SQLBDtoBigQuery\\key.json"

sqlSelect = "SELECT * FROM EMP"
connectionString = f"DRIVER={{SQL Server}};server={server};database={database};uid={username};pwd={password}"

conn = pyodbc.connect(connectionString)
cursor = conn.cursor()
cursor.execute(sqlSelect)

columns = [column[0] for column in cursor.description]
print(columns)

results = []
for row in cursor.fetchall():
    results.append(dict(zip(columns, row)))

def publish_messages(project_id, topic_name, inbound_data):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id, topic_name)

    for row in inbound_data:
        # Data must be a bytestring
        str_data = row
        data = str_data.encode('utf-8')
        # When you publish a message, the client returns a future.
        future = publisher.publish(topic_path, attribute=data)

    print('Published messages.')
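For context, what I was aiming for in that loop is roughly this (a sketch only; `rows_to_messages` is a name I made up, and I haven't run it against the real schema):

```python
import json

def rows_to_messages(columns, rows):
    """Turn DB rows into JSON-encoded bytestrings, since Pub/Sub
    message data must be bytes, not a pyodbc Row object."""
    messages = []
    for row in rows:
        record = dict(zip(columns, row))           # column name -> value
        data = json.dumps(record).encode('utf-8')  # bytes, as Pub/Sub requires
        messages.append(data)
    return messages
```

Each element of the returned list would then be passed as `publisher.publish(topic_path, data=message)`.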
I have since developed a version that successfully writes to my Pub/Sub topic, but the records never make it into the BigQuery table. A further problem is that it treats every database column as a string, when in reality the columns are a mix of numbers, strings, and datetimes. Below is the code I currently have.
import pyodbc
import os
import sys

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "H:\\venv\\SQLBDtoBigQuery\\key.json"

sqlSelect = "SELECT top 2 EMP_SK FROM EMP"
connectionString = f"DRIVER={{SQL Server}};server={server};database={database};uid={username};pwd={password}"

def GetRowsFromDb(connectionString, sqlSelect):
    ret = []
    columns = []
    with pyodbc.connect(connectionString) as conn:
        with conn.cursor() as cursor:
            cursor.execute(sqlSelect)
            columns = [column[0] for column in cursor.description]
            ret = list(cursor.fetchall())
    return ret, columns

def publish_messages(project_id, topic_name, inbound_data, columns):
    """Publishes multiple messages to a Pub/Sub topic."""
    # [START pubsub_quickstart_publisher]
    # [START pubsub_publish]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id, topic_name)

    namesToValues = dict()
    for row in inbound_data:
        for i in range(len(row)):
            namesToValues[columns[i]] = str(row[i])
        #namesToValues = dict(zip(columns, row))
        # Data must be a bytestring
        str_data = "Something"
        data = str_data.encode('utf-8')
        # When you publish a message, the client returns a future.
        future = publisher.publish(topic_path, data=data, **namesToValues)

    print('Published messages.')

dbRows, columns = GetRowsFromDb(connectionString, sqlSelect)
publish_messages('lucky-trail-260923', 'aspectdb-0001', dbRows, columns)
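To illustrate the type problem: because everything goes through str(), numbers and datetimes all arrive as strings. What I think I need instead is something like the sketch below (the `json_default` and `encode_row` names are hypothetical and not tested against the real table), which keeps numbers as numbers and renders dates as ISO-8601 strings:

```python
import json
import datetime
import decimal

def json_default(value):
    """Fallback serializer for values json can't handle natively."""
    if isinstance(value, (datetime.date, datetime.datetime)):
        return value.isoformat()       # dates/datetimes -> ISO-8601 strings
    if isinstance(value, decimal.Decimal):
        return float(value)            # SQL Server numerics come back as Decimal
    raise TypeError(f"Unserializable type: {type(value)}")

def encode_row(columns, row):
    """JSON-encode one row as bytes, preserving numeric types."""
    record = dict(zip(columns, row))
    return json.dumps(record, default=json_default).encode('utf-8')
```

The resulting bytes would go in the message body (`data=...`) rather than into string-only message attributes.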
Any help with this would be greatly appreciated.
Warm regards, Scott