I am trying to get the result of a Celery task that is not complicated but contains a chain and a group.
To make things easier, I would like to be able to call get_leaf
to retrieve the result of the last subtask, but I cannot get the information I need from this function.
Here is a small example to explain what I am trying to do.
My tasks.py
file:
from celery import Celery, chain, group, signature

celery_app = Celery('test', backend="redis://localhost", broker="redis://localhost")

@celery_app.task
def task_1(string):
    return list(string)

@celery_app.task
def task_2(char):
    return char + "1"

@celery_app.task
def group_task(char_list):
    return group(task_2.signature(args=[char]) for char in char_list)()

@celery_app.task
def concat_task(string_list):
    return "".join(string_list)

@celery_app.task
def full_task(char_list):
    return chain(group(task_2.signature(args=[char]) for char in char_list), signature(concat_task))()
I start the worker with the following command: celery -A tasks worker --loglevel=info
, and here is the resulting log:
$ celery -A tasks worker --loglevel=info
-------------- celery@datalab-mxnet-test.novalocal v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-3.10.0-862.14.4.el7.x86_64-x86_64-with-centos-7.6.1810-Core 2019-11-15 09:09:05
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: test:0x7fb7048d14e0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.concat_task
. tasks.full_task
. tasks.group_task
. tasks.task_1
. tasks.task_2
[2019-11-15 09:09:05,934: INFO/MainProcess] Connected to redis://localhost:6379//
[2019-11-15 09:09:05,950: INFO/MainProcess] mingle: searching for neighbors
[2019-11-15 09:09:06,980: INFO/MainProcess] mingle: all alone
[2019-11-15 09:09:07,003: INFO/MainProcess] celery@datalab-mxnet-test.novalocal ready.
[2019-11-15 09:09:08,028: INFO/MainProcess] Events of group {task} enabled by remote.
[2019-11-15 09:19:19,376: INFO/MainProcess] Received task: tasks.full_task[5d36ffad-ec11-4e3e-b9e0-e30f09656fb1]
[2019-11-15 09:19:19,387: INFO/MainProcess] Received task: tasks.task_2[d8aced04-f8f8-432b-ab97-fa9f226a080b]
[2019-11-15 09:19:19,393: INFO/MainProcess] Received task: tasks.task_2[99ea4414-6d74-44e8-888c-12dd7c52bbdb]
[2019-11-15 09:19:19,395: INFO/ForkPoolWorker-2] Task tasks.task_2[d8aced04-f8f8-432b-ab97-fa9f226a080b] succeeded in 0.0011995714157819748s: 'l1'
[2019-11-15 09:19:19,396: INFO/MainProcess] Received task: tasks.task_2[26846b2a-69aa-49a0-bbd1-3ed1715c99ae]
[2019-11-15 09:19:19,396: INFO/ForkPoolWorker-16] Task tasks.full_task[5d36ffad-ec11-4e3e-b9e0-e30f09656fb1] succeeded in 0.01657240092754364s: <AsyncResult: 33cc8dda-e83e-4374-9498-4fdb206b99ba>
[2019-11-15 09:19:19,398: INFO/ForkPoolWorker-3] Task tasks.task_2[99ea4414-6d74-44e8-888c-12dd7c52bbdb] succeeded in 0.003855053335428238s: 'a1'
[2019-11-15 09:19:19,401: INFO/MainProcess] Received task: tasks.task_2[13aae2c5-77b2-467f-9489-27463fa4bb75]
[2019-11-15 09:19:19,404: INFO/ForkPoolWorker-6] Task tasks.task_2[13aae2c5-77b2-467f-9489-27463fa4bb75] succeeded in 0.0013813097029924393s: 'a1'
[2019-11-15 09:19:19,406: INFO/ForkPoolWorker-5] Task tasks.task_2[26846b2a-69aa-49a0-bbd1-3ed1715c99ae] succeeded in 0.0035116691142320633s: 'l1'
[2019-11-15 09:19:19,407: INFO/MainProcess] Received task: tasks.task_2[5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50]
[2019-11-15 09:19:19,414: INFO/MainProcess] Received task: tasks.task_2[9f1f646a-20c5-48e3-941b-2010a4bc8eb1]
[2019-11-15 09:19:19,417: INFO/ForkPoolWorker-8] Task tasks.task_2[5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50] succeeded in 0.0018390323966741562s: 'l1'
[2019-11-15 09:19:19,444: INFO/ForkPoolWorker-9] Task tasks.task_2[9f1f646a-20c5-48e3-941b-2010a4bc8eb1] succeeded in 0.028518572449684143s: 'a1'
[2019-11-15 09:19:19,445: INFO/MainProcess] Received task: tasks.concat_task[33cc8dda-e83e-4374-9498-4fdb206b99ba]
[2019-11-15 09:19:19,453: INFO/ForkPoolWorker-11] Task tasks.concat_task[33cc8dda-e83e-4374-9498-4fdb206b99ba] succeeded in 0.005582597106695175s: 'l1a1a1l1l1a1'
Here is what I am doing to try to get the result:
In [1]: from celery import Celery,group
In [2]: from celery.result import AsyncResult
In [3]: celery_app = Celery('test',backend='redis://localhost',broker='redis://localhost')
In [4]: task = celery_app.send_task("tasks.full_task",args=["lalala"])
In [5]: task_info = AsyncResult(task.id)
In [6]: task_info.get()
Out[6]:
[['33cc8dda-e83e-4374-9498-4fdb206b99ba',[['dafdb3c6-1d03-4ad1-8755-3952481eeab0',None],[[['d8aced04-f8f8-432b-ab97-fa9f226a080b',[['99ea4414-6d74-44e8-888c-12dd7c52bbdb',[['26846b2a-69aa-49a0-bbd1-3ed1715c99ae',[['13aae2c5-77b2-467f-9489-27463fa4bb75',[['5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50',[['9f1f646a-20c5-48e3-941b-2010a4bc8eb1',None]]]],None]
In [7]: task_info.get_leaf()
Out[7]: 'a1'
In [8]: task_info.graph
Out[8]:
5d36ffad-ec11-4e3e-b9e0-e30f09656fb1(7)
dafdb3c6-1d03-4ad1-8755-3952481eeab0(6)
d8aced04-f8f8-432b-ab97-fa9f226a080b(0)
99ea4414-6d74-44e8-888c-12dd7c52bbdb(0)
26846b2a-69aa-49a0-bbd1-3ed1715c99ae(0)
13aae2c5-77b2-467f-9489-27463fa4bb75(0)
5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50(0)
9f1f646a-20c5-48e3-941b-2010a4bc8eb1(0)
dafdb3c6-1d03-4ad1-8755-3952481eeab0(6)
d8aced04-f8f8-432b-ab97-fa9f226a080b(0)
99ea4414-6d74-44e8-888c-12dd7c52bbdb(0)
26846b2a-69aa-49a0-bbd1-3ed1715c99ae(0)
13aae2c5-77b2-467f-9489-27463fa4bb75(0)
5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50(0)
9f1f646a-20c5-48e3-941b-2010a4bc8eb1(0)
d8aced04-f8f8-432b-ab97-fa9f226a080b(0)
99ea4414-6d74-44e8-888c-12dd7c52bbdb(0)
26846b2a-69aa-49a0-bbd1-3ed1715c99ae(0)
13aae2c5-77b2-467f-9489-27463fa4bb75(0)
5bf5ee32-ef13-4d7d-9fec-a10ddbe95c50(0)
9f1f646a-20c5-48e3-941b-2010a4bc8eb1(0)
I do not understand why 33cc8dda-e83e-4374-9498-4fdb206b99ba,
the concat_task id returned by get(),
is not in the graph.
What I expected was to be able to use the get_leaf
function to obtain the result of this concat subtask, i.e. the 'l1a1a1l1l1a1' visible in the logs.
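To make my expectation concrete, this is roughly the traversal I assumed get_leaf would perform (a sketch with a hypothetical helper of my own, not Celery's actual implementation):

```python
def get_deepest(result):
    """Follow each result's last child until a node with no children remains.

    `result` is assumed to expose a `children` attribute the way
    celery.result.AsyncResult does (a list of child results, or None).
    """
    while getattr(result, "children", None):
        result = result.children[-1]
    return result
```

I expected that applied to my full_task result this would land on the concat_task node and that calling get() on it would return 'l1a1a1l1l1a1'.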
If someone could tell me what I am doing wrong, that would be very helpful.
Here are the Python package versions I am using:
$ pip freeze
amqp==2.5.1
Babel==2.7.0
backcall==0.1.0
billiard==3.6.1.0
celery==4.3.0
certifi==2019.9.11
chardet==3.0.4
decorator==4.4.0
flower==0.9.3
idna==2.8
importlib-metadata==0.23
ipython==7.8.0
ipython-genutils==0.2.0
jedi==0.15.1
kombu==4.6.5
more-itertools==7.2.0
parso==0.5.1
pdf2image==1.10.0
pexpect==4.7.0
pickleshare==0.7.5
Pillow==6.2.0
prompt-toolkit==2.0.10
ptyprocess==0.6.0
Pygments==2.4.2
pytesseract==0.3.0
pytz==2019.3
redis==3.3.11
requests==2.22.0
six==1.12.0
tornado==5.1.1
traitlets==4.3.3
urllib3==1.25.6
vine==1.3.0
wcwidth==0.1.7
zipp==0.6.0