API Docs

class dask_yarn.YarnCluster(environment=None, n_workers=None, worker_vcores=None, worker_memory=None, worker_restarts=None, worker_env=None, scheduler_vcores=None, scheduler_memory=None, deploy_mode=None, name=None, queue=None, tags=None, skein_client=None)

Start a Dask cluster on YARN.

You can define default values for each of these parameters in Dask’s yarn.yaml configuration file. See http://docs.dask.org/en/latest/configuration.html for more information.
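
Defaults can also be set programmatically through dask.config. A minimal sketch follows; the dotted 'yarn.*' key paths are assumptions inferred from the parameter names below, so consult the yarn.yaml shipped with your dask-yarn installation for the authoritative schema.

>>> import dask
>>> dask.config.set({'yarn.environment': 'my-env.tar.gz',
...                  'yarn.worker.vcores': 2,
...                  'yarn.worker.memory': '4 GiB'})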

Parameters:
environment : str, optional

Path to an archived Python environment (either tar.gz or zip).

n_workers : int, optional

The number of workers to initially start.

worker_vcores : int, optional

The number of virtual cores to allocate per worker.

worker_memory : str, optional

The amount of memory to allocate per worker. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

worker_restarts : int, optional

The maximum number of worker restarts to allow before failing the application. Default is unlimited.

worker_env : dict, optional

A mapping of environment variables to their values. These will be set in the worker containers before starting the dask workers.

scheduler_vcores : int, optional

The number of virtual cores to allocate per scheduler.

scheduler_memory : str, optional

The amount of memory to allocate to the scheduler. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

deploy_mode : {‘remote’, ‘local’}, optional

The deploy mode to use. If 'remote', the scheduler will be deployed in a YARN container. If 'local', the scheduler will run locally, which can be useful for debugging. Default is 'remote'.

name : str, optional

The application name.

queue : str, optional

The queue to deploy to.

tags : sequence, optional

A set of strings to use as tags for this application.

skein_client : skein.Client, optional

The skein.Client to use. If not provided, one will be started.

Examples

>>> cluster = YarnCluster(environment='my-env.tar.gz', ...)
>>> cluster.scale(10)
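
A fuller construction, followed by connecting a client, is sketched below (resource values are illustrative):

>>> from dask.distributed import Client
>>> cluster = YarnCluster(environment='my-env.tar.gz',
...                       worker_vcores=2,
...                       worker_memory='4 GiB',
...                       n_workers=4)
>>> client = Client(cluster)
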
close(**kwargs)

Close this cluster. An alias for shutdown.

See also

shutdown
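
Since close is an alias for shutdown, a typical teardown is simply:

>>> cluster.close()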

dashboard_link

Link to the dask dashboard. None if the dashboard isn’t running.
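
For instance, after starting a cluster you might print the dashboard address:

>>> print(cluster.dashboard_link)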

classmethod from_application_id(app_id, skein_client=None)

Connect to an existing YarnCluster with a given application id.

Parameters:
app_id : str

The existing cluster’s application id.

skein_client : skein.Client

The skein.Client to use. If not provided, one will be started.

Returns:
YarnCluster
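
Examples

A reconnection sketch (the application id shown is illustrative):

>>> cluster = YarnCluster.from_application_id('application_1526134340424_0012')
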
classmethod from_current()

Connect to an existing YarnCluster from inside the cluster.

Returns:
YarnCluster
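
Examples

From code already running inside a container of this cluster:

>>> cluster = YarnCluster.from_current()
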
classmethod from_specification(spec, skein_client=None)

Start a dask cluster from a skein specification.

Parameters:
spec : skein.ApplicationSpec, dict, or filename

The application specification to use. Must define at least one service: 'dask.worker'. If no 'dask.scheduler' service is defined, a scheduler will be started locally.

skein_client : skein.Client, optional

The skein.Client to use. If not provided, one will be started.
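
Examples

Since spec may be a filename, one simple sketch loads a specification from disk (the filename is illustrative):

>>> cluster = YarnCluster.from_specification('spec.yaml')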

scale(n)

Scale cluster to n workers.

Parameters:
n : int

Target number of workers.

Examples

>>> cluster.scale(10)  # scale cluster to ten workers

scale_down(workers)

Retire the selected workers.

Parameters:
workers : list

List of addresses of workers to close.
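
Examples

One way to choose workers is by address, assuming a connected dask.distributed Client named client (a sketch):

>>> addresses = list(client.scheduler_info()['workers'])
>>> cluster.scale_down(addresses[:2])  # retire two workers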

scale_up(n, workers=None)

Ensure there are at least n dask workers available for this cluster.

No-op if n is less than the current number of workers.

Examples

>>> cluster.scale_up(20)  # ask for twenty workers

shutdown(status='SUCCEEDED', diagnostics=None)

Shut down the application.

Parameters:
status : {‘SUCCEEDED’, ‘FAILED’, ‘KILLED’}, optional

The YARN application exit status.

diagnostics : str, optional

The application exit message, usually used for diagnosing failures. Can be seen in the YARN Web UI for completed applications under “diagnostics”. If not provided, a default will be used.
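
Examples

Marking the application as failed with a custom message (the values are illustrative):

>>> cluster.shutdown(status='FAILED', diagnostics='scheduler lost')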

workers()

Return a list of all currently running worker containers.
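
Examples

A quick check of how many worker containers are live:

>>> containers = cluster.workers()
>>> len(containers)  # number of running worker containers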