API Docs

class dask_yarn.YarnCluster(environment=None, n_workers=None, worker_vcores=None, worker_memory=None, worker_restarts=None, worker_env=None, worker_class=None, worker_options=None, worker_gpus=None, scheduler_vcores=None, scheduler_gpus=None, scheduler_memory=None, deploy_mode=None, name=None, queue=None, tags=None, user=None, host=None, port=None, dashboard_address=None, skein_client=None, asynchronous=False, loop=None)

Start a Dask cluster on YARN.

You can define default values for these parameters in Dask’s yarn.yaml configuration file. See http://docs.dask.org/en/latest/configuration.html for more information.

Parameters:
environment : str, optional

The Python environment to use. Can be one of the following:

  • A path to an archived Python environment
  • A path to a conda environment, specified as conda:///…
  • A path to a virtual environment, specified as venv:///…
  • A path to a Python executable, specified as python:///…

If the environment is not an archive, the path must be valid on all nodes in the cluster.
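The four forms above are distinguished by their prefix. The sketch below shows one way to classify an `environment` string using only the standard library. `parse_environment` is a hypothetical helper for illustration, not part of dask-yarn, and treating every string without a recognized prefix as an archive path is an assumption of this sketch.

```python
from __future__ import annotations

from urllib.parse import urlsplit

# Prefixes recognized by this sketch; anything else is treated as an archive path.
_KNOWN_SCHEMES = {"conda", "venv", "python"}


def parse_environment(environment: str) -> tuple[str, str]:
    """Hypothetical helper: split an environment string into (kind, path)."""
    parts = urlsplit(environment)
    if parts.scheme in _KNOWN_SCHEMES:
        # 'conda:///opt/envs/myenv' -> scheme 'conda', path '/opt/envs/myenv'
        return parts.scheme, parts.path
    # No recognized prefix: assume an archived environment and keep the string as-is.
    return "archive", environment


print(parse_environment("my-env.tar.gz"))            # ('archive', 'my-env.tar.gz')
print(parse_environment("conda:///opt/envs/myenv"))  # ('conda', '/opt/envs/myenv')
```

The triple slash matters: everything after `conda://` is parsed as the (empty) host, so the path component starts at the third slash.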

n_workers : int, optional

The number of workers to initially start.

worker_vcores : int, optional

The number of virtual cores to allocate per worker.

worker_memory : str, optional

The amount of memory to allocate per worker. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.
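To make the rounding rule concrete, the sketch below converts a memory string to a whole number of MiB, rounding up. `memory_to_mib` is a hypothetical helper, and it assumes only the binary suffixes B, KiB, MiB, GiB and TiB; the parser dask-yarn actually uses may accept more forms. The same rule applies to `scheduler_memory`.

```python
import math
import re
from fractions import Fraction

# Binary unit multipliers in bytes (assumption: only these suffixes are handled).
_UNITS = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40}
_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*$")


def memory_to_mib(value: str) -> int:
    """Hypothetical helper: convert e.g. '2 GiB' to whole MiB, rounding up."""
    match = _PATTERN.match(value)
    if match is None:
        raise ValueError(f"Could not parse memory value: {value!r}")
    amount, unit = match.groups()
    if unit not in _UNITS:
        raise ValueError(f"Unknown memory unit: {unit!r}")
    # Fraction keeps the arithmetic exact, so ceil() only rounds real remainders.
    n_bytes = Fraction(amount) * _UNITS[unit]
    return math.ceil(n_bytes / _UNITS["MiB"])


print(memory_to_mib("2 GiB"))     # 2048
print(memory_to_mib("4096 MiB"))  # 4096
print(memory_to_mib("1500 KiB"))  # 2 (about 1.46 MiB, rounded up)
```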

worker_restarts : int, optional

The maximum number of worker restarts to allow before failing the application. Default is unlimited.

worker_env : dict, optional

A mapping of environment variables to their values. These will be set in the worker containers before starting the dask workers.

worker_gpus : int, optional

The number of GPUs to allocate per worker.

scheduler_vcores : int, optional

The number of virtual cores to allocate to the scheduler.

scheduler_gpus : int, optional

The number of GPUs to allocate to the scheduler.

scheduler_memory : str, optional

The amount of memory to allocate to the scheduler. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

deploy_mode : {‘remote’, ‘local’}, optional

The deploy mode to use. If 'remote', the scheduler will be deployed in a YARN container. If 'local', the scheduler will run locally, which can be useful for debugging. Default is 'remote'.

name : str, optional

The application name.

queue : str, optional

The queue to deploy to.

tags : sequence, optional

A set of strings to use as tags for this application.

user : str, optional

The user to submit the application on behalf of. Default is the current user. Submitting as a different user requires the appropriate permissions; see the YARN documentation for more information.

host : str, optional

Host address on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to '0.0.0.0'.

port : int, optional

The port on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to 0 for a random port.

dashboard_address : str, optional

Address on which the dashboard server will listen. Only used if deploy_mode='local'. Defaults to ':0' for a random port.
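Since `host`, `port` and `dashboard_address` only matter when the scheduler runs locally, the sketch below resolves them using the defaults stated above. `local_scheduler_options` is a hypothetical helper; returning an empty mapping in remote mode is an illustrative choice, not a description of dask-yarn's internals.

```python
from typing import Optional

# Defaults taken from the parameter descriptions above.
_LOCAL_DEFAULTS = {"host": "0.0.0.0", "port": 0, "dashboard_address": ":0"}


def local_scheduler_options(
    deploy_mode: str = "remote",
    host: Optional[str] = None,
    port: Optional[int] = None,
    dashboard_address: Optional[str] = None,
) -> dict:
    """Hypothetical helper: resolve scheduler listen options for a deploy mode."""
    if deploy_mode not in ("remote", "local"):
        raise ValueError(f"deploy_mode must be 'remote' or 'local', got {deploy_mode!r}")
    if deploy_mode == "remote":
        # The scheduler runs in a YARN container, so these options are ignored.
        return {}
    given = {"host": host, "port": port, "dashboard_address": dashboard_address}
    # Fall back to the documented default only for values left as None (0 is kept).
    return {key: _LOCAL_DEFAULTS[key] if val is None else val for key, val in given.items()}


print(local_scheduler_options("local", port=8786))
# {'host': '0.0.0.0', 'port': 8786, 'dashboard_address': ':0'}
```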

skein_client : skein.Client, optional

The skein.Client to use. If not provided, one will be started.

asynchronous : bool, optional

If true, starts the cluster in asynchronous mode, where it can be used in other async code.

loop : IOLoop, optional

The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.

Examples

>>> cluster = YarnCluster(environment='my-env.tar.gz', ...)
>>> cluster.scale(10)

adapt(minimum=0, maximum=inf, interval='1s', wait_count=3, target_duration='5s', **kwargs)

Turn on adaptivity.

This scales Dask clusters automatically based on scheduler activity.
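As a rough mental model, adaptive scaling estimates how many workers would finish the outstanding work within `target_duration`, then clamps that number to the `minimum` and `maximum` bounds described below. The sketch is a simplified model under that assumption: `adaptive_target` and its `total_work_seconds` input are hypothetical, and distributed's real scheduler logic may weigh additional signals (such as memory) that are omitted here.

```python
import math


def adaptive_target(total_work_seconds: float, target_duration: float,
                    minimum: int = 0, maximum: float = math.inf) -> int:
    """Hypothetical model: workers needed to finish the work within target_duration."""
    if target_duration <= 0:
        raise ValueError("target_duration must be positive")
    # A shorter target_duration asks for more workers, i.e. a more aggressive scale-up.
    wanted = math.ceil(total_work_seconds / target_duration)
    # Clamp to the configured bounds; maximum may be math.inf (the default).
    return int(min(max(wanted, minimum), maximum))


print(adaptive_target(60, 5))              # 12
print(adaptive_target(60, 5, maximum=10))  # 10
print(adaptive_target(0, 5, minimum=2))    # 2
```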

Parameters:
minimum : int, optional

Minimum number of workers. Defaults to 0.

maximum : int, optional

Maximum number of workers. Defaults to inf.

interval : timedelta or str, optional

Time between worker add/remove recommendations.

wait_count : int, optional

Number of consecutive times that a worker should be suggested for removal before we remove it.

target_duration : timedelta or str, optional

Amount of time we want a computation to take. This affects how aggressively we scale up.

**kwargs :

Additional parameters to pass to distributed.Scheduler.workers_to_close.
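Recommendations are produced every `interval`, and `wait_count` damps oscillation: a worker is retired only after it has been suggested for removal that many times in a row. A minimal sketch of that bookkeeping, assuming each check yields a plain set of worker addresses; `RemovalTracker` is hypothetical and is not the class distributed uses.

```python
from collections import defaultdict


class RemovalTracker:
    """Hypothetical sketch of wait_count: retire a worker only after it has been
    recommended for removal on `wait_count` consecutive checks."""

    def __init__(self, wait_count: int = 3):
        self.wait_count = wait_count
        self._streaks = defaultdict(int)  # worker address -> consecutive suggestions

    def update(self, suggested: set) -> set:
        """Record one round of suggestions and return the workers to remove now."""
        # A worker not suggested this round loses its streak.
        for worker in list(self._streaks):
            if worker not in suggested:
                del self._streaks[worker]
        to_remove = set()
        for worker in suggested:
            self._streaks[worker] += 1
            if self._streaks[worker] >= self.wait_count:
                to_remove.add(worker)
                del self._streaks[worker]
        return to_remove


tracker = RemovalTracker(wait_count=3)
print(tracker.update({"w1", "w2"}))  # set()
print(tracker.update({"w1"}))        # set()  (w2's streak is reset)
print(tracker.update({"w1", "w2"}))  # {'w1'}  (third consecutive suggestion)
```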

Examples

>>> cluster.adapt(minimum=0, maximum=10)

close(**kwargs)

Close this cluster. An alias for shutdown.

See also

shutdown

Link to the Dask dashboard, or None if the dashboard isn’t running.

classmethod from_application_id(app_id, skein_client=None, asynchronous=False, loop=None)

Connect to an existing YarnCluster with a given application id.

Parameters:
app_id : str

The existing cluster’s application id.

skein_client : skein.Client, optional

The skein.Client to use. If not provided, one will be started.

asynchronous : bool, optional

If true, starts the cluster in asynchronous mode, where it can be used in other async code.

loop : IOLoop, optional

The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.

Returns:
YarnCluster

classmethod from_current(asynchronous=False, loop=None)

Connect to an existing YarnCluster from inside the cluster.

Parameters:
asynchronous : bool, optional

If true, starts the cluster in asynchronous mode, where it can be used in other async code.

loop : IOLoop, optional

The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.

Returns:
YarnCluster

classmethod from_specification(spec, skein_client=None, asynchronous=False, loop=None)

Start a Dask cluster from a skein specification.

Parameters:
spec : skein.ApplicationSpec, dict, or filename

The application specification to use. Must define at least one service: 'dask.worker'. If no 'dask.scheduler' service is defined, a scheduler will be started locally.

skein_client : skein.Client, optional

The skein.Client to use. If not provided, one will be started.

asynchronous : bool, optional

If true, starts the cluster in asynchronous mode, where it can be used in other async code.

loop : IOLoop, optional

The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
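The `spec` rule above can be checked before submitting. The sketch below inspects a dict-form specification and reports where the scheduler would run. `scheduler_location` is a hypothetical helper; it assumes the dict has a top-level "services" mapping, and the per-service details (resources, files, script) are deliberately left empty.

```python
def scheduler_location(spec: dict) -> str:
    """Hypothetical check of a dict-form skein spec: where will the scheduler run?"""
    services = spec.get("services", {})
    if "dask.worker" not in services:
        raise ValueError("The specification must define a 'dask.worker' service")
    # Without a 'dask.scheduler' service, the scheduler is started locally.
    return "remote" if "dask.scheduler" in services else "local"


spec = {
    "name": "dask",
    "services": {
        "dask.worker": {},  # service details (resources, files, script) omitted
    },
}
print(scheduler_location(spec))  # local
```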

logs(scheduler=True, workers=True)

Return logs for the scheduler and/or workers.

Parameters:
scheduler : boolean, optional

Whether or not to collect logs for the scheduler.

workers : boolean or iterable, optional

Either a boolean or an iterable of worker addresses to select. If True (the default), logs are collected for all workers; if False, for none.

Returns:
logs : dict

A dictionary of name -> logs.
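The three accepted forms of `workers` can be illustrated with a small selection function. `select_workers` is hypothetical and only mirrors the documented semantics; silently skipping addresses that are not currently running is a design choice of this sketch, not documented dask-yarn behavior.

```python
from collections.abc import Iterable


def select_workers(all_workers, workers=True):
    """Hypothetical helper mirroring the `workers` argument of logs():
    True -> every worker, False -> none, iterable -> only the listed addresses."""
    if workers is True:
        return list(all_workers)
    if workers is False:
        return []
    # A bare string is iterable but almost certainly a mistake, so reject it.
    if not isinstance(workers, Iterable) or isinstance(workers, str):
        raise TypeError("workers must be a bool or an iterable of worker addresses")
    wanted = set(workers)
    # Keep the cluster's ordering and skip addresses that are not running.
    return [w for w in all_workers if w in wanted]


running = ["tcp://10.0.0.5:40001", "tcp://10.0.0.6:40002"]
print(select_workers(running, False))                     # []
print(select_workers(running, ["tcp://10.0.0.6:40002"]))  # ['tcp://10.0.0.6:40002']
```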

scale(n)

Scale cluster to n workers.

Parameters:
n : int

Target number of workers.

Examples

>>> cluster.scale(10)  # scale cluster to ten workers

shutdown(status='SUCCEEDED', diagnostics=None)

Shut down the application.

Parameters:
status : {‘SUCCEEDED’, ‘FAILED’, ‘KILLED’}, optional

The YARN application exit status.

diagnostics : str, optional

The application exit message, usually used for diagnosing failures. Can be seen in the YARN Web UI for completed applications under “diagnostics”. If not provided, a default will be used.
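A common pattern is to report a failed run to YARN with status 'FAILED' and a diagnostics message, so the cause shows up in the Web UI. The sketch uses a hypothetical `RecordingCluster` stand-in that only records the arguments passed to `shutdown`, so it runs without YARN; with a real YarnCluster the same `shutdown(status=..., diagnostics=...)` call would be made on the cluster object. `run_job` is also a hypothetical name.

```python
class RecordingCluster:
    """Hypothetical stand-in for YarnCluster that records shutdown() arguments."""

    def __init__(self):
        self.calls = []

    def shutdown(self, status="SUCCEEDED", diagnostics=None):
        self.calls.append((status, diagnostics))


def run_job(cluster, job):
    """Run `job`, then shut the application down with a status matching the outcome."""
    try:
        job()
    except Exception as exc:
        # Surface the error under "diagnostics" for the completed application.
        cluster.shutdown(status="FAILED", diagnostics=f"Job failed: {exc!r}")
        raise
    cluster.shutdown()  # status defaults to 'SUCCEEDED'


def failing_job():
    raise RuntimeError("boom")


cluster = RecordingCluster()
try:
    run_job(cluster, failing_job)
except RuntimeError:
    pass
print(cluster.calls)  # [('FAILED', "Job failed: RuntimeError('boom')")]
```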

workers()

A list of all currently running worker containers.