我在不同的机器上部署了一个运行调度器(scheduler)的容器和若干个工作节点(worker)容器。然后,我尝试运行下面的代码(只是能复现我的问题的最小示例):
import os
import numpy as np
import dask.distributed as dds
import dask.dataframe as dd
import pandas as pd
# Minimal reproduction: load a small pandas frame into Dask and deduplicate
# one column through the distributed scheduler.

# Connect to the scheduler (the address is a placeholder).
client = dds.Client('ADDRESS_HERE')

# 4 values repeated 30 times -> 120 rows containing only 3 distinct values.
df = pd.DataFrame({'column': ['val1', 'val2', 'val1', 'val3'] * 30})

# Split the pandas frame into two Dask partitions.
df_dask = dd.from_pandas(df, npartitions=2)

# Select the single column, drop missing values, then drop duplicate rows.
df_unique = (
    df_dask[['column']]
    .dropna()
    .drop_duplicates()
)

# Run the computation; this is the call reported to raise CancelledError.
df_unique.compute()
但是最后的 .compute() 方法引发了以下异常:
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<ipython-input-6-9ab9b046d151> in <module>
----> 1 df_unique.compute()
/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(self,**kwargs)
163 dask.base.compute
164 """
--> 165 (result,) = compute(self,traverse=False,**kwargs)
166 return result
167
/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(*args,**kwargs)
434 keys = [x.__dask_keys__() for x in collections]
435 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436 results = schedule(dsk,keys,**kwargs)
437 return repack([f(r,*a) for r,(f,a) in zip(results,postcomputes)])
438
/usr/local/lib/python3.6/dist-packages/distributed/client.py in get(self,dsk,restrictions,loose_restrictions,resources,sync,asynchronous,direct,retries,priority,fifo_timeout,actors,**kwargs)
2543 should_rejoin = False
2544 try:
-> 2545 results = self.gather(packed,asynchronous=asynchronous,direct=direct)
2546 finally:
2547 for f in futures.values():
/usr/local/lib/python3.6/dist-packages/distributed/client.py in gather(self,futures,errors,asynchronous)
1843 direct=direct,
1844 local_worker=local_worker,
-> 1845 asynchronous=asynchronous,
1846 )
1847
/usr/local/lib/python3.6/dist-packages/distributed/client.py in sync(self,func,callback_timeout,*args,**kwargs)
760 else:
761 return sync(
--> 762 self.loop,callback_timeout=callback_timeout,**kwargs
763 )
764
/usr/local/lib/python3.6/dist-packages/distributed/utils.py in sync(loop,**kwargs)
331 if error[0]:
332 typ,exc,tb = error[0]
--> 333 raise exc.with_traceback(tb)
334 else:
335 return result[0]
/usr/local/lib/python3.6/dist-packages/distributed/utils.py in f()
315 if callback_timeout is not None:
316 future = gen.with_timeout(timedelta(seconds=callback_timeout),future)
--> 317 result[0] = yield future
318 except Exception as exc:
319 error[0] = sys.exc_info()
/usr/local/lib/python3.6/dist-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
CancelledError:
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds,closing client
distributed.utils - ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/distributed/utils.py",line 662,in log_errors
yield
File "/usr/local/lib/python3.6/dist-packages/distributed/client.py",line 1290,in _close
await gen.with_timeout(timedelta(seconds=2),list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/distributed/utils.py",line 1019,in _reconnect
await self._close()
File "/usr/local/lib/python3.6/dist-packages/distributed/client.py",line 1290,in _close
await gen.with_timeout(timedelta(seconds=2),list(coroutines))
concurrent.futures._base.CancelledError
有人知道这个问题的根源可能是什么吗?
更新(UPD):这似乎与部署速度过慢有关,但我仍然不知道这个问题的根本原因。