CancelledError when computing a Dask DataFrame

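I have a container running the scheduler and several worker containers, deployed on different machines. I am trying to run the following code (a minimal example of my problem):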

import os
import numpy as np
import dask.distributed as dds
import dask.dataframe as dd
import pandas as pd

client = dds.Client('ADDRESS_HERE') 

df = pd.DataFrame({
    'column': ['val1','val2','val1','val3'] * 30
})

df_dask = dd.from_pandas(df,npartitions=2)

df_unique = df_dask[['column']].dropna().drop_duplicates()

df_unique.compute()

However, the final .compute() call raises the following exception:

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-6-9ab9b046d151> in <module>
----> 1 df_unique.compute()

/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(self,**kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self,traverse=False,**kwargs)
    166         return result
    167 

/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(*args,**kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk,keys,**kwargs)
    437     return repack([f(r,*a) for r,(f,a) in zip(results,postcomputes)])
    438 

/usr/local/lib/python3.6/dist-packages/distributed/client.py in get(self,dsk,restrictions,loose_restrictions,resources,sync,asynchronous,direct,retries,priority,fifo_timeout,actors,**kwargs)
   2543                     should_rejoin = False
   2544             try:
-> 2545                 results = self.gather(packed,asynchronous=asynchronous,direct=direct)
   2546             finally:
   2547                 for f in futures.values():

/usr/local/lib/python3.6/dist-packages/distributed/client.py in gather(self,futures,errors,asynchronous)
   1843                 direct=direct,
   1844                 local_worker=local_worker,
-> 1845                 asynchronous=asynchronous,
   1846             )
   1847 

/usr/local/lib/python3.6/dist-packages/distributed/client.py in sync(self,func,callback_timeout,*args,**kwargs)
    760         else:
    761             return sync(
--> 762                 self.loop,callback_timeout=callback_timeout,**kwargs
    763             )
    764 

/usr/local/lib/python3.6/dist-packages/distributed/utils.py in sync(loop,**kwargs)
    331     if error[0]:
    332         typ,exc,tb = error[0]
--> 333         raise exc.with_traceback(tb)
    334     else:
    335         return result[0]

/usr/local/lib/python3.6/dist-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout),future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

/usr/local/lib/python3.6/dist-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

CancelledError: 
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 1290, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/distributed/utils.py", line 1019, in _reconnect
    await self._close()
  File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 1290, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError

Any ideas what the root cause of this problem might be?

UPD. It seems this is related to a slow deployment, but I still don't know the root cause.
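Not a fix, but one way to narrow this down: the log line "Failed to reconnect to scheduler" suggests the client lost its connection to the scheduler, so it may be worth checking whether the scheduler's TCP port is reachable from the machine running the client. The sketch below uses only the standard library. The function name `scheduler_reachable` and the host name in the usage comment are hypothetical; 8786 is the default Dask scheduler port, and your deployment may use a different one.

```python
import socket


def scheduler_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only tests raw network reachability. It does not verify that a
    Dask scheduler is actually listening on that port or that it is healthy.
    """
    try:
        # create_connection resolves the host name and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example usage (hypothetical host name; replace with your scheduler address):
# print(scheduler_reachable("scheduler-host", 8786))
```

If this returns False from the client machine, or only intermittently returns True, the problem is probably in the container networking or deployment rather than in the DataFrame code itself.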
