Scattering data to Dask cluster workers: unknown address scheme 'gateway'

Question

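I am following the code from the accepted answer to this SO question (the chunking block, then the scatter part), but I get a strange error when trying to scatter a pandas.DataFrame to the workers.

If it matters, I am using a Jupyter notebook.

I am not sure what this error means. It is quite cryptic, so any help would be appreciated.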

from dask_gateway import Gateway
import dask.dataframe as dd
import dask

gateway = Gateway()
options = gateway.cluster_options()
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(10)
client = cluster.get_client()

X_train = ... # build pandas.DataFrame

x = dd.from_pandas(X_train, npartitions=10)
x = x.persist(get=dask.threaded.get)  # chunk locally
futures = client.scatter(dict(x.dask))  # scatter chunks
x.dask = x

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/tmp/ipykernel_567/3586545525.py in <module>
      1 x = dd.from_pandas(X_train, npartitions=10)
      2 x = x.persist(get=dask.threaded.get)  # chunk locally
----> 3 futures = client.scatter(dict(x.dask))  # scatter chunks
      4 x.dask = x

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, timeout, asynchronous)
   2182         else:
   2183             local_worker = None
-> 2184         return self.sync(
   2185             self._scatter,
   2186             data,

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    866             return future
    867         else:
--> 868             return sync(
    869                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    870             )

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330     if error[0]:
    331         typ, exc, tb = error[0]
--> 332         raise exc.with_traceback(tb)
    333     else:
    334         return result[0]

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in f()
    313             if callback_timeout is not None:
    314                 future = asyncio.wait_for(future, callback_timeout)
--> 315             result[0] = yield future
    316         except Exception:
    317             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
   2004             isinstance(k, (bytes, str)) for k in data
   2005         ):
-> 2006             d = await self._scatter(keymap(stringify, data), workers, broadcast)
   2007             return {k: d[stringify(k)] for k in data}
   2008 

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _scatter(self, data, workers, broadcast, direct, local_worker, timeout, hash)
   2073                 )
   2074             else:
-> 2075                 await self.scheduler.scatter(
   2076                     data=data2,
   2077                     workers=workers,

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    893             name, comm.name = comm.name, "ConnectionPool." + key
    894             try:
--> 895                 result = await send_recv(comm=comm, op=key, **kwargs)
    896             finally:
    897                 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    686         if comm.deserialize:
    687             typ, exc, tb = clean_exception(**response)
--> 688             raise exc.with_traceback(tb)
    689         else:
    690             raise Exception(response["exception_text"])

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in handle_comm()
    528                             result = asyncio.ensure_future(result)
    529                             self._ongoing_coroutines.add(result)
--> 530                             result = await result
    531                     except (CommClosedError, CancelledError):
    532                         if self.status in (Status.running, Status.paused):

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/scheduler.py in scatter()
   5795         assert isinstance(data, dict)
   5796 
-> 5797         keys, who_has, nbytes = await scatter_to_workers(
   5798             nthreads, data, rpc=self.rpc, report=False
   5799         )

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils_comm.py in scatter_to_workers()
    143     rpcs = {addr: rpc(addr) for addr in d}
    144     try:
--> 145         out = await All(
    146             [
    147                 rpcs[address].update_data(

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in All()
    214     while not tasks.done():
    215         try:
--> 216             result = await tasks.next()
    217         except Exception:
    218 

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv_from_rpc()
    893             name, comm.name = comm.name, "ConnectionPool." + key
    894             try:
--> 895                 result = await send_recv(comm=comm, op=key, **kwargs)
    896             finally:
    897                 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/core.py in send_recv()
    688             raise exc.with_traceback(tb)
    689         else:
--> 690             raise Exception(response["exception_text"])
    691     return response
    692 

Exception: ValueError("unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx', 'ws', 'wss'])")

Recommended answer

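dd.from_pandas() does this internally, so you no longer need to do it by hand. You can use the Dask DataFrame interface directly on x, and the computations should run on your cluster automatically. :)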

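The answer you've linked is 5 years old and is now outdated, because Dask has matured a lot since then. For example, x.dask now refers to the "high-level graph" (a recently added feature) rather than the low-level graph. Dask Gateway uses its own URL scheme, which I guess cannot connect properly with this old Dask syntax.

Also note that mixing schedulers (as done in that answer) is no longer recommended.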


10-29 03:20