因此,我决定不使用 DataFlow 和 Apache Beam,因为我的文件没有那么大,只是在 crontab 中安排了定时任务。这是我创建的用于处理的类:
import os
import shutil
import pandas as pd
import uuid
from google.cloud import storage
class DataTransformation:
    """Helper that downloads CSV blobs from a GCS bucket, parses them into
    pandas DataFrames shaped for BigQuery, and archives the processed files.

    Workflow: download_blob() -> process_files() -> move_processed_files(),
    with upload_blob() used to push the run log back to the bucket.
    """

    def __init__(self, schema, bucket_name, credentials):
        """Store the csv schema and open a client on the target bucket.

        Args:
            schema: dict describing one csv family (file_name, fields,
                numeric_fields, sep, decimal, thousands).
            bucket_name: name of the GCS bucket holding the input files.
            credentials: google.auth credentials for the storage client.
        """
        self.schema = schema
        self.files = []       # local paths of downloaded csv files
        self.blob_files = []  # corresponding GCS blob objects
        # NOTE(review): project id "stewardship" is hard-coded here while the
        # caller configures PROJECT_ID separately — confirm this is intended.
        self.client = storage.Client(credentials=credentials, project="stewardship")
        self.bucket = self.client.get_bucket(bucket_name)

    def parse_method(self, csv_file):
        """Translate one csv file into a pandas DataFrame and archive it.

        Args:
            csv_file: path to the local csv to parse.

        Returns:
            A pandas DataFrame with columns renamed to schema['fields'] and
            schema['numeric_fields'] coerced to numeric dtypes.

        Raises:
            ValueError: if the csv does not match the expected schema
                (wrong column count or non-numeric values).
        """
        df = pd.read_csv(
            csv_file,
            skiprows=[1],  # presumably a subtitle/units row after the header — TODO confirm
            sep=self.schema.get('sep'),
            decimal=self.schema.get('decimal'),
            thousands=self.schema.get('thousands'),
        )
        df.columns = self.schema.get('fields')
        for col in self.schema.get('numeric_fields'):
            df[col] = pd.to_numeric(df[col])
        # BUG FIX: the original used splitext(...)[0] as the destination and
        # silently dropped the ".csv" extension when archiving; keep the
        # full basename so the processed copy stays recognizable.
        shutil.move(csv_file,
                    os.path.join("./temp/processed", os.path.basename(csv_file)))
        return df

    def process_files(self):
        """Parse every downloaded file and concatenate into one DataFrame.

        Returns:
            A pandas DataFrame with all rows from all files, or an empty
            placeholder frame when nothing was downloaded.
        """
        frames = [self.parse_method(f) for f in self.files]
        if frames:
            return pd.concat(frames)
        # Preserve the historical placeholder shape for the "no files" case.
        return pd.DataFrame([], columns=['a'])

    def download_blob(self):
        """Download every matching blob under the "input" prefix.

        Returns:
            True if at least one blob was downloaded, False otherwise.
        """
        # Robustness: make sure the local landing directory exists.
        os.makedirs("./temp/input", exist_ok=True)
        for blob_file in self.bucket.list_blobs(prefix="input"):
            if self.schema.get("file_name") not in blob_file.name:
                continue
            # uuid suffix avoids clobbering when several blobs share a name.
            unique_filename = "{0}_{1}".format(self.schema.get("file_name"), uuid.uuid4())
            destination_file = os.path.join("./temp/input", unique_filename + ".csv")
            blob_file.download_to_filename(destination_file)
            self.files.append(destination_file)
            self.blob_files.append(blob_file)
        return len(self.blob_files) > 0

    def upload_blob(self, destination_blob_name):
        """Upload the local file of the same basename to the bucket.

        Args:
            destination_blob_name: blob name in the bucket; the local source
                file is its basename in the current working directory.
        """
        blob = self.bucket.blob(destination_blob_name)
        # Simplified: splitext(...)[0] + splitext(...)[1] was just a
        # roundabout way of spelling the basename.
        blob.upload_from_filename(os.path.basename(destination_blob_name))

    def move_processed_files(self):
        """Rename processed blobs into the bucket's "processed/" prefix.

        Returns:
            The list of original blob names that were moved.
        """
        for blob_file in self.blob_files:
            self.bucket.rename_blob(blob_file, "processed/" + blob_file.name)
        return [b.name for b in self.blob_files]
然后,在主脚本中我使用 pandas_gbq 处理所有内容:
import logging

from google.oauth2 import service_account
# BUG FIX: pandas_gbq was imported from pandas' private test suite
# (pandas.tests.io.test_gbq), which is internal and removed in newer pandas
# releases; import the real pandas-gbq package directly instead.
import pandas_gbq

from src.data_transformation import DataTransformation
from src.schemas import schema_files_csv

# Service-account key file and target project/bucket configuration.
KEY_JSON = 'KEY.json'
PROJECT_ID = "<PROJECT_NAME>"
SUFFIX_TABLE_NAME = "<TABLE_SUFFIX>"
BUCKET_NAME = "BUCKET_NAME"
def run():
    """Entry point: for each configured schema, download matching CSV blobs,
    build a DataFrame and replace the corresponding BigQuery table.

    Schema validation errors (ValueError) are logged and the loop continues
    with the next table; other exceptions propagate.
    """
    credentials = service_account.Credentials.from_service_account_file(KEY_JSON)
    # DataTransformation holds the download/parse/archive logic for one schema.
    for table, schema in schema_files_csv.items():
        target_table = table.replace(SUFFIX_TABLE_NAME, "")
        try:
            logging.info("Processing schema for {}".format(schema.get("file_name")))
            ingestion = DataTransformation(schema, BUCKET_NAME, credentials)
            if not ingestion.download_blob():
                logging.info(" 0 files to process")
                continue
            logging.info("Downloaded files: {}".format(",".join(ingestion.files) or "0 files"))
            frame = ingestion.process_files()
            logging.info("Dataframe created with some {} lines".format(str(frame.shape)))
            if not frame.empty:
                pandas_gbq.context.project = PROJECT_ID
                pandas_gbq.context.credentials = credentials
                # if_exists="replace" drops and recreates the table each run.
                pandas_gbq.to_gbq(frame, target_table, if_exists="replace")
                logging.info("Table {} was loaded on Big Query".format(target_table))
            moved = ingestion.move_processed_files()
            logging.info("Moving files {} to processed folder".format(",".join(moved)))
            # Push the run log back to the bucket for inspection.
            ingestion.upload_blob("info.log")
        except ValueError as err:
            logging.error("csv schema expected are wrong,please ask to Andre Araujo update the schema. "
                          "Error: {}".format(str(err)))
if __name__ == "__main__":
    # Write the log to info.log, the same file later uploaded to the bucket.
    logging.basicConfig(level=logging.INFO, filename='info.log')
    run()
要像这样在dict / JSON中使用模式,请执行以下操作:
{
  "<PROJECT>.<DATASET>.<TABLE_NAME>": {
    "file_name": "<NAME_OF_FILE>",
    "fields": [
      "Project", "Assignment_Name", "Request_Id", "Resource_Grade",
      "Resource_Role", "Record_ID", "Assignment_ID", "Resource_Request_Account_Id"
    ],
    "numeric_fields": [],
    "sep": ";",
    "decimal": ".",
    "thousands": ","
  }
}
(其余模式条目的结构与上面相同。)
本文链接:https://www.f2er.com/2533660.html