使用pymongo更新Mongodb的过程变得非常缓慢

我创建此文件是为了将股票价格数据上传到Mongodb。它会在csv文件中为所有股票和交易日的时间创建一堆每小时文档。然后,它使用CSV文件中的数据更新这些文档。因此,每个文件最终将在单独的文件中按小时分别包含每个股票名称的1min,5min,30min和60min数据(时间序列数据)。

代码bulk_write一次写入5000个更新。第一次更新5000需要0.7秒,而最后一次更新(1天数据约650,000个更新)需要80秒。总共需要90分钟才能加载650k更新。但是,如果我使用不同的日期数据,则第一次写入可能会花费50秒,而每次写入都会变慢。我正在使用Mongodb社区4.2。我只是在进行个别更新后才使用批量写入,但这似乎没什么区别。

谁能告诉我为什么初始写入后它变得如此缓慢,以及为什么处理目录中排在后面(而非顶部)的CSV文件时,第一批写入也会变得如此缓慢?我在开始时检查了 pymongo.has_c(),确认C扩展已启用。我也试过以 mongod --nojournal 方式运行。我是python和mongodb的新手。我在底部包括了一个小的输出示例。感谢您的协助

https://www.screencast.com/t/fEiR2pxpTgHG https://www.screencast.com/t/y5RSmx2hISE

import csv
import logging
import shutil
import time
from datetime import datetime

import pymongo
from pymongo import MongoClient, UpdateOne



def startmongo():
    """Connect to the local MongoDB instance and bind the target collection.

    Side effects: sets the module-level global ``coll`` (used by
    importmongo/updatePriceInfo) and ensures a compound index on
    (ticker, date) exists.  Prints whether pymongo's C extensions are
    available (pure-Python BSON encoding is much slower).
    """
    client = MongoClient('localhost', 27017)
    db = client.test
    global coll
    coll = db.test1
    # Every update in updatePriceInfo filters on {"ticker", "date"}.
    # Without this index each of the ~650k upserts is a full collection
    # scan, which is why writes get slower as documents accumulate.
    # create_index is idempotent, so calling it on every start is safe.
    coll.create_index([("ticker", pymongo.ASCENDING),
                       ("date", pymongo.ASCENDING)])
    z = pymongo.has_c()
    print(z)

def importmongo(filename):
    """Create the empty per-hour skeleton documents for every ticker in *filename*.

    Reads the CSV once to collect the distinct tickers, then inserts one
    document per (ticker, hour) for trading hours 9..16 on the file's date.
    If the first and last tickers already have documents for that date the
    load is assumed complete and the function returns early.

    Note: original line 54 had a syntax error (truncated projection dict);
    reconstructed here as the same projection used for the first ticker.
    """
    with open('D:/msjho/Downloads/FP/Intraday NYSE Sept/' + filename, 'r') as file:
        data = csv.reader(file, delimiter=',')

        row1 = next(data)  # first data row: [ticker, timestamp, o, h, l, c, v]
        print(row1)
        # The source files use two different timestamp formats; try both.
        try:
            date = datetime.strptime(row1[1], '%d/%m/%Y %H:%M')
        except ValueError:
            date = datetime.strptime(row1[1], '%d-%b-%Y %H:%M')

        date = date.replace(minute=0)

        # Distinct tickers in first-seen order.  dict.fromkeys gives O(1)
        # membership checks; the original list scan was O(n) per CSV row.
        tickers = list(dict.fromkeys(row[0] for row in data))
        no_stocks = len(tickers)

        # Cheap "already imported" probe: if documents for the first and
        # last tickers exist on this date, skip the whole load.
        x = coll.find_one({"date": date, "ticker": tickers[0]},
                          {"ticker": 1, "_id": 0})
        y = coll.find_one({"date": date, "ticker": tickers[-1]},
                          {"ticker": 1, "_id": 0})

        if x == {"ticker": tickers[0]} and y == {"ticker": tickers[-1]}:
            print(tickers[0])
            return ('its already there')

        print('Loading up initial documents')
        logging.info('number of companies in ' + filename + ' ' + str(no_stocks))

        # Build all hourly skeleton documents and insert them in one round
        # trip instead of one insert_one() call per document.
        docs = [{"ticker": ticker, "date": date.replace(hour=hour)}
                for ticker in tickers
                for hour in range(9, 17)]
        coll.insert_many(docs, ordered=False)
        no_doc = len(tickers)  # one counter increment per ticker, as before

        logging.info('number of companies inserted to Mdb ' + filename + ' ' + str(no_doc))

def _filetype_from_name(filename):
    """Map a filename suffix (…_1.csv, …_5.csv, …_30.csv, …_60.csv) to its bar size.

    Raises ValueError for unrecognized names; the original code left
    ``filetype`` unbound in that case and crashed later with NameError.
    """
    if filename[-5] == '1':
        return '1min'
    if filename[-5] == '5':
        return '5min'
    if filename[-5] == '0':
        if filename[-6] == '3':
            return '30min'
        if filename[-6] == '6':
            return '60min'
    raise ValueError('cannot determine bar size from filename: ' + filename)


def _parse_row_timestamp(text):
    """Parse a CSV timestamp, accepting both formats used in the source files."""
    try:
        return datetime.strptime(text, '%d/%m/%Y %H:%M')
    except ValueError:
        return datetime.strptime(text, '%d-%b-%Y %H:%M')


def updatePriceInfo(filename):
    """Upsert OHLC price data from *filename* into the hourly documents.

    Each CSV row [ticker, timestamp, open, high, low, close, volume] is
    written into its hourly document under the key "<bar-size>.<minute>"
    as {"p": [o, h, l, c], "v": volume, "d": direction}.  Updates are
    flushed with bulk_write() in batches of 5000.

    Note: the original line 74 was a syntax error (unterminated open()
    call and no csv.reader); reconstructed to mirror importmongo().
    """
    filetype = _filetype_from_name(filename)

    with open('D:/msjho/Downloads/FP/Intraday NYSE Sept/' + filename, 'r') as file:
        data = csv.reader(file, delimiter=',')

        r = 0
        start = time.time()
        operations = []

        for row in data:
            # Flush a full batch of 5000 pending updates.
            if len(operations) == 5000:
                coll.bulk_write(operations, ordered=False)
                end = time.time()
                r += 1
                print('ops is now' + str(r) + ' ' + str(end - start))
                operations = []

            date = _parse_row_timestamp(row[1])
            mins = date.minute
            placeset = filetype + '.' + str(mins)

            # Zero the minutes so the filter matches the hourly skeleton doc.
            updatedate = date.replace(minute=0)

            # Compare close vs open numerically; the CSV fields are strings
            # and a lexicographic comparison (e.g. "9" > "10") is wrong.
            open_p = float(row[2])
            close_p = float(row[5])
            if close_p > open_p:
                direction = 1  # ticker rose in time period
            elif close_p < open_p:
                direction = 0  # ticker fell in time period
            else:
                direction = 2  # ticker finished equal

            operations.append(UpdateOne(
                {"ticker": row[0], "date": updatedate},
                {"$set": {placeset: {"p": [row[2], row[3], row[4], row[5]],
                                     "v": row[6],
                                     "d": direction}}},
                upsert=True))

        # Flush the final partial batch.
        if operations:
            coll.bulk_write(operations, ordered=False)
        print('finished')

if __name__ == '__main__': 

    # Driver: connect to MongoDB, create the skeleton hourly documents for
    # one day's file, then fill them in with the per-minute price data.
    # Logging to file is currently disabled; uncomment to re-enable.
    #logging.basicConfig(filename="D:\msjho\Downloads\Logs\importstockdata.log",level=logging.INFO)
    filename = 'NYSE_20190903_1.csv'
    startmongo()
    importmongo(filename)
    updatePriceInfo(filename)
mystiquec 回答:使用pymongo更新Mongodb的过程变得非常缓慢

如果您没有任何索引,那么我建议在 (ticker, date) 上创建一个复合索引:每次更新的查询条件都是 {"ticker", "date"},没有索引时每个更新都要做一次全集合扫描,所以集合越大写入越慢。在 mongo shell 中:

db.coll.createIndex( { "ticker": 1, "date": 1 } )

或在 pymongo 中: coll.create_index([("ticker", 1), ("date", 1)])
本文链接:https://www.f2er.com/3109703.html

大家都在问