Dask with ProxyStore
Last updated 14 December 2023
This guide discusses using ProxyStore with Dask.distributed. Using ProxyStore to pass intermediate values between function invocations can yield considerable speedups in distributed applications.
Note
This guide assumes familiarity with Dask.distributed and ProxyStore.
Getting Started
Dask.distributed is a library for futures-based distributed computing.
To pass proxy objects created with ProxyStore to
Dask.distributed's Client.submit() or
Client.map() functions, we need to adjust
how we schedule our functions.
Consider a trivial script, example.py, in which we submit sum() on a list
of numbers to the Dask.distributed scheduler, but pass the input list as
a proxy p by calling client.submit(sum, p).
Running this script yields the following error.
$ python example.py
Traceback (most recent call last):
  File "/home/jgpaul/workspace/proxystore-extensions/example.py", line 15, in <module>
    y = client.submit(sum, p)
        ^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgpaul/workspace/proxystore-extensions/venv/lib/python3.11/site-packages/distributed/client.py", line 1943, in submit
    key = funcname(func) + "-" + tokenize(func, kwargs, *args)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgpaul/workspace/proxystore-extensions/venv/lib/python3.11/site-packages/dask/base.py", line 964, in tokenize
    hasher = _md5(str(tuple(map(normalize_token, args))).encode())
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgpaul/workspace/proxystore-extensions/venv/lib/python3.11/site-packages/dask/utils.py", line 641, in __call__
    meth = self.dispatch(type(arg))
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgpaul/workspace/proxystore-extensions/venv/lib/python3.11/site-packages/dask/utils.py", line 626, in dispatch
    toplevel, _, _ = cls2.__module__.partition(".")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'property' object has no attribute 'partition'
This error is particularly confusing at first glance. By default,
Dask.distributed assumes that submitted functions are pure.
This assumption means that the result of a pure function can be cached and
returned if the same function is invoked again with the same inputs. Function
invocations are tracked using a hash of the function, positional args, and keyword args.
Dask has a mechanism for normalizing arbitrary types to string tokens which
can be used to construct this hash. However, Dask's dispatch mechanism for
the token normalization inspects the __module__ attribute of the object's
type. This is a problem with Proxy because Proxy
implements __module__ as a property which gets the __module__ from the
proxy's target object. Thus, accessing __module__ is only valid on a
Proxy instance and not on the type itself, where it returns the property object rather than a string.
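The failure can be reproduced without Dask or ProxyStore. The following is a minimal sketch, not ProxyStore's actual implementation: MiniProxy is a hypothetical stand-in class that defines __module__ as a property, as described above, and the final call mirrors the failing line in dask/utils.py from the traceback.

```python
class MiniProxy:
    """Hypothetical stand-in for a proxy type (not ProxyStore's Proxy)."""

    def __init__(self, target):
        self._target = target

    @property
    def __module__(self):
        # Forward the lookup to the type of the wrapped target object.
        return type(self._target).__module__


proxy = MiniProxy([1, 2, 3])

# On an instance, the property resolves to the target's module name.
instance_module = proxy.__module__

# On the type, the attribute is the property object itself, not a str.
type_module = type(proxy).__module__

try:
    # Same operation as the failing line in the traceback.
    type_module.partition(".")
    error_message = None
except AttributeError as e:
    error_message = str(e)

print(instance_module)
print(error_message)
```

The instance lookup succeeds, while the lookup on the type yields an object with no string methods, which is why the dispatch code fails.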
There are many moving parts and much nuance to why this bug appears, but
the good news is that this error can be avoided by passing either a custom
key or pure=False to Client.submit().
The bad news is that we lose the ability to avoid redundant computations
(unless you are careful about reusing the same key).
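To see why both workarounds sidestep the error, and why they cost us deduplication, consider this simplified model of key selection. It is only a sketch: make_key is a hypothetical helper, not part of Dask's API, and it assumes that an impure submission receives a random identifier instead of a hash of its inputs. Dask's real tokenization is considerably more involved.

```python
import hashlib
import uuid


def make_key(func, args, kwargs, pure=True, key=None):
    """Hypothetical model of how a task key might be chosen."""
    if key is not None:
        # A caller-supplied key bypasses hashing of the inputs entirely.
        return key
    if pure:
        # Deterministic: derived from the function name and its inputs,
        # so identical invocations map to the same key.
        payload = repr((func.__name__, args, sorted(kwargs.items())))
        digest = hashlib.md5(payload.encode()).hexdigest()
        return func.__name__ + "-" + digest
    # Impure: a random suffix, so the inputs are never inspected.
    return func.__name__ + "-" + str(uuid.uuid4())


pure_a = make_key(sum, ([1, 2, 3],), {})
pure_b = make_key(sum, ([1, 2, 3],), {})
impure_a = make_key(sum, ([1, 2, 3],), {}, pure=False)
impure_b = make_key(sum, ([1, 2, 3],), {}, pure=False)
custom = make_key(sum, ([1, 2, 3],), {}, key="sum-of-inputs")

print(pure_a == pure_b)
print(impure_a == impure_b)
print(custom)
```

In this model, the two pure invocations share a key and could share a cached result, whereas each impure invocation gets a fresh key. Reusing the same custom key by hand is the only way to recover deduplication once hashing is bypassed.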
Better ProxyStore Integration
Creating proxies to transfer function inputs or results as you see fit
may be sufficient, but a custom
Client is provided which
automates the process of proxying large function inputs or results.
Warning
The custom Dask Client
is an experimental feature and the API may change in the future. If you
encounter unexpected behavior, please
open a bug report.
Using this custom client is as easy as changing your import and passing two extra arguments to the constructor.
- Change the import of Client from dask.distributed to proxystore.ex.plugins.distributed.
- Pass your Store and threshold object size. Serialized objects larger than the threshold size in bytes will be serialized using the store you provide and passed by proxy.
The custom Client behaves
exactly as a normal Dask client when ps_store is None. But when
ProxyStore is configured, function args, kwargs, and results
passed to or returned from Client.submit() and
Client.map() will be scanned and proxied as
necessary based on their size.
When invoking a function, you can alter this behavior by passing
proxy_args=False and/or proxy_result=False to disable proxying for that
specific function submission to the scheduler.
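The size-based decision described above can be illustrated with a small standalone sketch. This is not the custom Client's implementation: maybe_proxy, prepare_args, and FakeProxy are hypothetical names, a plain dict stands in for the store, and pickle is assumed as the serializer purely for illustration.

```python
import pickle
from dataclasses import dataclass


@dataclass
class FakeProxy:
    """Hypothetical placeholder standing in for a real proxy."""

    key: int


def maybe_proxy(obj, store, threshold):
    """Return obj unchanged, or a placeholder if it serializes large."""
    if store is None or threshold is None:
        # No store configured: behave like a normal client.
        return obj
    data = pickle.dumps(obj)
    if len(data) <= threshold:
        # Small objects are passed by value as usual.
        return obj
    # Large objects are put in the store and replaced by a placeholder.
    key = len(store)
    store[key] = data
    return FakeProxy(key)


def prepare_args(args, store, threshold, proxy_args=True):
    """Scan positional args, proxying large ones unless disabled."""
    if not proxy_args:
        return tuple(args)
    return tuple(maybe_proxy(a, store, threshold) for a in args)


store = {}
small = 1
large = list(range(10_000))

proxied = prepare_args((small, large), store, threshold=1000)
untouched = prepare_args((small, large), store, threshold=1000, proxy_args=False)

print(proxied[0], type(proxied[1]).__name__)
print(untouched[1] is large)
```

Only the large list is replaced by a placeholder. Passing proxy_args=False leaves every argument as-is, mirroring the per-submission opt-out described above.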
Warning
There are some edge cases to be aware of when using the automatic
proxying. Please read the documentation for
Client.submit() and
Client.map() for
the most up-to-date details.