I created this file to upload stock price data to MongoDB. For every ticker and every trading day in the CSV file it first creates a set of hourly documents, and then updates those documents with the data from the CSV. Each ticker therefore ends up with one document per hour holding its 1min, 5min, 30min and 60min (time-series) data.
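To make that concrete, here is a rough sketch of the shape a single hourly document should end up with once the updates have run (the ticker, prices and volumes are invented; the field names come from the code below):

from datetime import datetime

# Hypothetical shape of one hourly document after the update pass;
# ticker, prices and volumes are made up for illustration. Values are
# strings because csv.reader yields strings.
example_doc = {
    "ticker": "IBM",
    "date": datetime(2019, 9, 3, 9, 0),  # minute zeroed, one document per hour
    "1min": {
        "0": {"p": ["134.2", "134.5", "134.1", "134.4"], "v": "1200", "d": 1},
        "1": {"p": ["134.4", "134.6", "134.3", "134.3"], "v": "800", "d": 0},
        # ...one sub-key per minute of the hour
    },
    "5min": {
        "0": {"p": ["134.2", "134.9", "134.0", "134.7"], "v": "5300", "d": 1},
    },
    "60min": {
        "0": {"p": ["134.2", "135.1", "133.8", "134.9"], "v": "61000", "d": 1},
    },
}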
The code uses bulk_write to write 5,000 updates at a time. The first batch of 5,000 takes 0.7 seconds, while the last one takes 80 seconds (one day of data is about 650,000 updates), and loading all 650k updates takes about 90 minutes in total. With data from a different day, though, the first write can take 50 seconds, and every write gets slower from there. I'm running MongoDB Community 4.2. I only moved to bulk writes after trying individual updates, but it doesn't seem to make much difference.
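For scale, the rough arithmetic on those timings:

# 650,000 updates in batches of 5,000 is 130 bulk_write calls. Assuming
# the per-batch time grows roughly linearly from 0.7 s to 80 s, the
# average is ~40 s per batch, which matches the ~90 minutes I'm seeing.
rows, batch_size = 650_000, 5_000
batches = rows // batch_size
avg_batch_secs = (0.7 + 80) / 2
print(batches, batches * avg_batch_secs / 60)  # 130 batches, ~87 minutes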
Can anyone tell me why it gets so slow after the initial writes, and why CSV files that aren't at the top of the directory seem to be this slow from the very first batch? I checked that pymongo.has_c() returns True at startup, and I've tried running mongod with --nojournal. I'm new to Python and MongoDB. I've included a small sample of the output at the bottom. Thanks for any help.
https://www.screencast.com/t/fEiR2pxpTgHG https://www.screencast.com/t/y5RSmx2hISE
from pymongo import MongoClient, UpdateOne
import pymongo
import csv
from datetime import datetime
import logging
import shutil
import time


def startmongo():
    client = MongoClient('localhost', 27017)
    db = client.test
    global coll
    coll = db.test1
    z = pymongo.has_c()  # confirm pymongo's C extensions are installed
    print(z)
def importmongo(filename):
    with open('D:/msjho/Downloads/FP/Intraday NYSE Sept/' + filename, 'r') as file:
        data = csv.reader(file, delimiter=',')
        row1 = next(data)  # access first row of data
        print(row1)
        # use the correct date format
        try:
            date = datetime.strptime(row1[1], '%d/%m/%Y %H:%M')
        except ValueError:
            date = datetime.strptime(row1[1], '%d-%b-%Y %H:%M')
        date = date.replace(minute=0)
        tickers = []
        for row in data:
            if row[0] not in tickers:
                tickers.append(row[0])
        no_stocks = len(tickers)
        # ensure the first and last tickers have already been stored in the DB
        x = coll.find_one({"date": date, "ticker": tickers[0]}, {"ticker": 1, "_id": 0})
        y = coll.find_one({"date": date, "ticker": tickers[-1]}, {"ticker": 1, "_id": 0})
        if x == {"ticker": tickers[0]} and y == {"ticker": tickers[-1]}:
            print(tickers[0])
            return "it's already there"
        print('Loading up initial documents')
        logging.info('number of companies in ' + filename + ' ' + str(no_stocks))
        no_doc = 0
        for ticker in tickers:
            for hours in range(9, 17):  # one document per trading hour, 09:00-16:00
                date1 = date.replace(hour=hours)
                coll.insert_one({"ticker": ticker, "date": date1})
                no_doc += 1
        logging.info('number of companies inserted to Mdb ' + filename + ' ' + str(no_doc))
def updatePriceInfo(filename):
    with open('D:/msjho/Downloads/FP/Intraday NYSE Sept/' + filename, 'r') as file:
        data = csv.reader(file, delimiter=',')
        # work out whether it is a 1min, 5min, 30min or 60min file
        if filename[-5] == '1':
            filetype = '1min'
        elif filename[-5] == '5':
            filetype = '5min'
        elif filename[-5] == '0':
            if filename[-6] == '3':
                filetype = '30min'
            elif filename[-6] == '6':
                filetype = '60min'
        r = 0
        start = time.time()
        operations = []
        for row in data:
            # write 5000 updates at a time
            if len(operations) == 5000:
                coll.bulk_write(operations, ordered=False)
                end = time.time()
                r += 1
                # prints cumulative seconds elapsed since the start of the file
                print('ops is now ' + str(r) + ' ' + str(end - start))
                operations = []
            # use the correct date format
            try:
                date = datetime.strptime(row[1], '%d/%m/%Y %H:%M')
            except ValueError:
                date = datetime.strptime(row[1], '%d-%b-%Y %H:%M')
            mins = date.minute
            placeset = filetype + '.' + str(mins)
            updatedate = date.replace(minute=0)  # zero the minute so the query matches the hourly document
            # note: csv.reader yields strings, so these compare lexically
            if row[5] > row[2]:
                direction = 1  # ticker rose over the period
            elif row[5] < row[2]:
                direction = 0  # ticker fell over the period
            else:
                direction = 2  # ticker finished unchanged
            operations.append(UpdateOne(
                {"ticker": row[0], "date": updatedate},
                {"$set": {placeset: {"p": [row[2], row[3], row[4], row[5]],
                                     "v": row[6],
                                     "d": direction}}},
                upsert=True))
        # flush the final partial batch
        if len(operations) > 0:
            coll.bulk_write(operations, ordered=False)
    print('finished')
if __name__ == '__main__':
    # logging.basicConfig(filename=r"D:\msjho\Downloads\Logs\importstockdata.log", level=logging.INFO)
    filename = 'NYSE_20190903_1.csv'
    startmongo()
    importmongo(filename)
    updatePriceInfo(filename)