Python多重处理如何在不使用.join()方法的情况下更新管理器列表中的复杂对象

大约2个月前,我开始使用Python编程,而在过去的2周中,我一直在努力解决此问题。 我知道有很多与此类似的线程,但是我找不到适合我情况的解决方案。

我需要一个主进程,它是与Telegram交互的一个进程,另一个进程是buffer,它了解从主进程接收到的复杂对象并对其进行更新。

我想以一种更简单,更流畅的方式进行操作。

由于未使用join()方法进行多次处理,因此当前对象尚未更新。

然后我尝试使用多线程代替,但是它给我带来了与Pyrogram的兼容性问题,这是我用来与Telegram交互的框架。

我再次写了项目的“复杂性”,以便重现我所遇到的相同错误,并从每个人那里获得并提供最佳帮助。

a.py

class A():
    def __init__(self,length = -1,height = -1):
        self.length = length
        self.height = height

b.py

from a import A
class B(A):
    def __init__(self,height = -1,width = -1):
        super().__init__(length = -1,height = -1)
        self.length = length
        self.height = height
        self.width = width

    def setHeight(self,value):
        self.height = value

c.py

class C():
    def __init__(self,a,x = 0,y = 0):
        self.a = a
        self.x = x
        self.y = y

    def func1(self):
        if self.x < 7:
            self.x = 7

d.py

from c import C
class D(C):
    def __init__(self,y = 0,z = 0):
        super().__init__(a,y = 0)
        self.a = a
        self.x = x
        self.y = y
        self.z = z

    def func2(self):
        self.func1()

main.py

from b import B
from d import D
from  multiprocessing import Process,Manager
from buffer import buffer

if __name__ == "__main__":

    manager = Manager()
    lizt = manager.list()

    buffer = Process(target = buffer,args = (lizt,)) #passing the list as a parameter
    buffer.start()
    #can't invoke buffer.join() here because I need the below code to keep running while the buffer process takes a few minutes to end an instance passed in the list
    #hence I can't wait the join() function to update the objects inside the buffer but i need objects updated in order to pop them out from the list

    import datetime as dt
    t = dt.datetime.now()

    #library of kind of multithreading (pool of 4 processes),uses asyncio lib
    #this while was put to reproduce the same error I am getting

    while True:
        if t + dt.timedelta(seconds = 10) < dt.datetime.now():
            lizt.append(D(B(5,5,5)))
            t = dt.datetime.now()


"""
#This is the code which looks like the one in my project

#main.py
from pyrogram import Client #library of kind of multithreading (pool of 4 processes),uses asyncio lib
from b import B
from d import D
from  multiprocessing import Process,Manager
from buffer import buffer

if __name__ == "__main__":

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account",api_id,api_hash)

    manager = Manager()
    lizt = manager.list()

    buffer = Process(target = buffer,)) #passing the list as a parameter
    buffer.start()
    #can't invoke buffer.join() here because I need the below code to run at the same time as the buffer process
    #hence I can't wait the join() function to update the objects inside the buffer

@app.on_message()
def my_handler(client,message):
    lizt.append(complex_object_conatining_message)
"""

buffer.py

def buffer(buffer):
    print("buffer was defined")
    while True:
        if len(buffer) > 0:
            print(buffer[0].x) #prints 0
            buffer[0].func2() #this changes the class attribute locally in the class instance but not in here
            print(buffer[0].x) #prints 0,but I'd like it to be 7

            print(buffer[0].a.height) #prints 5
            buffer[0].a.setHeight(10) #and this has the same behaviour
            print(buffer[0].a.height) #prints 5 but I'd like it to be 10

            buffer.pop(0)

这是关于我遇到的问题的完整代码。 从字面上看,每个建议都是受欢迎的,希望是建设性的,在此先感谢您!

skycom11 回答:Python多重处理如何在不使用.join()方法的情况下更新管理器列表中的复杂对象

最后,我不得不改变解决此问题的方式,就像框架一样,使用asyncio。

此解决方案可提供我一直在寻找的所有内容:

复杂对象更新

-避免多处理问题(尤其是join()的问题)

它也是:

-lightweight:在我有2个python进程之前1)大约40K 2)大约75K

此实际过程约为30K(而且更快,更干净)

这是解决方案,我希望它对像我一样的其他人有用:

跳过部分类,因为此解决方案绝对可以更新复杂的对象

main.py

from pyrogram import Client
import asyncio
import time

def cancel_tasks():
    #get all task in current loop
    tasks = asyncio.Task.all_tasks()
    for t in tasks:
        t.cancel()

try:
    buffer = []
    firstWorker(buffer) #this one is the old buffer.py file and function
    #the missing loop and loop method are explained in the next piece of code
except KeyboardInterrupt:
    print("")
finally:
    print("Closing Loop")
    cancel_tasks()

firstWorker.py

import asyncio
def firstWorker(buffer):
    print("First Worker Executed")

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account",api_id,api_hash)

    @app.on_message()
    async def my_handler(client,message):
        print("Message Arrived")
        buffer.append(complex_object_conatining_message)
        await asyncio.sleep(1)

    app.run(secondWorker(buffer)) #here is the trick: I changed the 
                                  #method run() of the Client class 
                                  #inside the Pyrogram framework 
                                  #since it was a loop itself. 
                                  #In this way I added another task 
                                  #to the existing loop in orther to 
                                  #let run both of them together.

我的secondWorker.py

import asyncio
async def secondWorker(buffer):
    while True:
        if len(buffer) > 0:
            print(buffer.pop(0))

        await asyncio.sleep(1)

了解此代码中使用的asyncio的资源可以在这里找到:

Asyncio simple tutorial

Python Asyncio Official Documentation

This tutorial about how to fix classical Asyncio errors

本文链接:https://www.f2er.com/3043919.html

大家都在问