The Ray API

Starting Ray

There are two main ways in which Ray can be used. First, you can start all of the relevant Ray processes and shut them all down within the scope of a single script. Second, you can connect to and use an existing Ray cluster.

Starting and stopping a cluster within a script

One use case is to start all of the relevant Ray processes when you call ray.init and shut them down when the script exits. These processes include local and global schedulers, an object store and an object manager, a Redis server, and more.

Note: this approach is limited to a single machine.

This can be done as follows.

ray.init()

If there are GPUs available on the machine, you should specify this with the num_gpus argument. Similarly, you can also specify the number of CPUs with num_cpus.

ray.init(num_cpus=20, num_gpus=2)

By default, Ray uses psutil.cpu_count() to determine the number of CPUs and assumes that the number of GPUs is zero.

Instead of thinking about the number of “worker” processes on each node, we prefer to think in terms of the quantities of CPU and GPU resources on each node and to provide the illusion of an infinite pool of workers. Tasks will be assigned to workers based on the availability of resources so as to avoid contention and not based on the number of available worker processes.
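One way to picture this resource-based view is the toy bookkeeping model sketched below. It is not Ray's scheduler: the NodeResources class and its method names are hypothetical, and the default of one CPU per task is an assumption made for illustration. The sketch only shows that a task is admitted when enough CPUs and GPUs are free, regardless of how many worker processes exist.

```python
class NodeResources:
    """Toy model of one node's CPU and GPU capacity (hypothetical, not Ray code)."""

    def __init__(self, num_cpus, num_gpus=0):
        self.free = {"cpu": num_cpus, "gpu": num_gpus}

    def try_acquire(self, num_cpus=1, num_gpus=0):
        """Reserve resources for a task; return False if too few are free."""
        if self.free["cpu"] < num_cpus or self.free["gpu"] < num_gpus:
            return False
        self.free["cpu"] -= num_cpus
        self.free["gpu"] -= num_gpus
        return True

    def release(self, num_cpus=1, num_gpus=0):
        """Return a finished task's resources to the pool."""
        self.free["cpu"] += num_cpus
        self.free["gpu"] += num_gpus


# A node configured like ray.init(num_cpus=20, num_gpus=2).
node = NodeResources(num_cpus=20, num_gpus=2)
# Three tasks that each need one GPU: the third cannot start until one finishes.
started = [node.try_acquire(num_cpus=1, num_gpus=1) for _ in range(3)]
print(started)
```

In this model the third task is held back by the GPU count, not by a shortage of workers, which is the kind of contention the resource-based view is meant to avoid.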

Connecting to an existing cluster

Once a Ray cluster has been started, the only thing you need in order to connect to it is the address of the cluster’s Redis server. In this case, your script will not start up or shut down any processes. The cluster and all of its processes may be shared between multiple scripts and multiple users. You can connect with a command like the following.

ray.init(redis_address="12.345.67.89:6379")

In this case, you cannot specify num_cpus or num_gpus in ray.init because that information is passed into the cluster when the cluster is started, not when your script is started.

View the instructions for how to start a Ray cluster on multiple nodes.

ray.init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=0, redirect_output=False, num_cpus=None, num_gpus=None, num_redis_shards=None)

Either connect to an existing Ray cluster or start one and connect to it.

This method handles two cases. Either a Ray cluster already exists and we just attach this driver to it, or we start all of the processes associated with a Ray cluster and attach to the newly started cluster.

Parameters:
  • node_ip_address (str) – The IP address of the node that we are on.
  • redis_address (str) – The address of the Redis server to connect to. If this address is not provided, then this command will start Redis, a global scheduler, a local scheduler, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits.
  • object_id_seed (int) – Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs.
  • num_workers (int) – The number of workers to start. This is only provided if redis_address is not provided.
  • driver_mode (int) – The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, or ray.SILENT_MODE.
  • redirect_output (bool) – True if stdout and stderr for all the processes should be redirected to files and False otherwise.
  • num_cpus (int) – Number of CPUs the user wishes all local schedulers to be configured with.
  • num_gpus (int) – Number of GPUs the user wishes all local schedulers to be configured with.
  • num_redis_shards (int) – The number of Redis shards to start in addition to the primary Redis shard.
Returns:

Address information about the started processes.

Raises:

Exception – An exception is raised if an inappropriate combination of arguments is passed in.
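As a rough illustration of what an inappropriate combination can look like, the sketch below encodes only the rule stated in this section: num_workers, num_cpus, and num_gpus belong to cluster startup and should not be passed together with redis_address. The function check_init_args and its return strings are hypothetical and are not taken from Ray's source; the real checks may differ.

```python
def check_init_args(redis_address=None, num_workers=None,
                    num_cpus=None, num_gpus=None):
    """Hypothetical check mirroring the documented rule; not Ray's own code."""
    if redis_address is None:
        # No address given: a new cluster is started, so sizing options are fine.
        return "start a new cluster"
    given = {"num_workers": num_workers, "num_cpus": num_cpus, "num_gpus": num_gpus}
    bad = sorted(name for name, value in given.items() if value is not None)
    if bad:
        raise Exception("When connecting to an existing cluster, "
                        "do not pass: " + ", ".join(bad))
    return "connect to an existing cluster"


print(check_init_args(num_cpus=20, num_gpus=2))
print(check_init_args(redis_address="12.345.67.89:6379"))
```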

Defining remote functions

Remote functions are used to create tasks. To define a remote function, the @ray.remote decorator is placed over the function definition.

The function can then be invoked with f.remote. Invoking the function creates a task which will be scheduled on and executed by some worker process in the Ray cluster. The call will return an object ID (essentially a future) representing the eventual return value of the task. Anyone with the object ID can retrieve its value, regardless of where the task was executed (see Getting values from object IDs).

When a task executes, its outputs will be serialized into a string of bytes and stored in the object store.

Note that arguments to remote functions can be values or object IDs.

@ray.remote
def f(x):
  return x + 1

x_id = f.remote(0)
ray.get(x_id)  # 1

y_id = f.remote(x_id)
ray.get(y_id)  # 2

If you want a remote function to return multiple object IDs, you can do that by passing the num_return_vals argument into the remote decorator.

@ray.remote(num_return_vals=2)
def f():
  return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

ray.remote(*args, **kwargs)

This decorator is used to create remote functions.

Parameters:
  • num_return_vals (int) – The number of object IDs that a call to this function should return.
  • num_cpus (int) – The number of CPUs needed to execute this function. This should only be passed in when defining the remote function on the driver.
  • num_gpus (int) – The number of GPUs needed to execute this function. This should only be passed in when defining the remote function on the driver.

Getting values from object IDs

Object IDs can be converted into objects by calling ray.get on the object ID. Note that ray.get accepts either a single object ID or a list of object IDs.

@ray.remote
def f():
  return {'key1': ['value']}

# Get one object ID.
ray.get(f.remote())  # {'key1': ['value']}
# Get a list of object IDs.
ray.get([f.remote() for _ in range(2)])  # [{'key1': ['value']}, {'key1': ['value']}]

Numpy arrays

Numpy arrays are handled more efficiently than other data types, so use numpy arrays whenever possible.

Any numpy arrays that are part of the serialized object will not be copied out of the object store. They will remain in the object store and the resulting deserialized object will simply have a pointer to the relevant place in the object store’s memory.

Since objects in the object store are immutable, this means that if you want to mutate a numpy array that was returned by a remote function, you will have to first copy it.
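The copy-before-mutate pattern can be sketched with plain NumPy. The array below is only a stand-in for one returned by ray.get: its read-only flag is set by hand, which is an assumption about how such an array might behave, not a statement about Ray's internals.

```python
import numpy as np

# Stand-in for an array backed by the object store: mark it read-only by hand.
shared = np.zeros(5)
shared.flags.writeable = False

try:
    shared[0] = 1.0  # In-place mutation of a read-only array fails.
except ValueError:
    print("in-place write rejected")

# Copy first, then mutate the private copy; the original is left untouched.
local = shared.copy()
local[0] = 1.0
```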

ray.get(object_ids, worker=<ray.worker.Worker object>)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_ids is a list, then the objects corresponding to each object in the list will be returned.

Parameters: object_ids – Object ID of the object to get or a list of object IDs to get.
Returns: A Python object or a list of Python objects.

Putting objects in the object store

The primary way that objects are placed in the object store is by being returned by a task. However, it is also possible to directly place objects in the object store using ray.put.

x_id = ray.put(1)
ray.get(x_id)  # 1

The main reason to use ray.put is that you want to pass the same large object into a number of tasks. By first doing ray.put and then passing the resulting object ID into each of the tasks, the large object is copied into the object store only once, whereas when we directly pass the object in, it is copied multiple times.

import numpy as np

@ray.remote
def f(x):
  pass

x = np.zeros(10 ** 6)

# Alternative 1: Here, x is copied into the object store 10 times.
[f.remote(x) for _ in range(10)]

# Alternative 2: Here, x is copied into the object store once.
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

Note that ray.put is called under the hood in a couple of situations.

  • It is called on the values returned by a task.
  • It is called on the arguments to a task, unless the arguments are Python primitives like integers or short strings, lists, tuples, or dictionaries.

ray.put(value, worker=<ray.worker.Worker object>)

Store an object in the object store.

Parameters: value – The Python object to be stored.
Returns: The object ID assigned to this value.

Waiting for a subset of tasks to finish

It is often desirable to adapt the computation being done based on when different tasks finish. For example, if a bunch of tasks each take a variable length of time, and their results can be processed in any order, then it makes sense to simply process the results in the order that they finish. In other settings, it makes sense to discard straggler tasks whose results may not be needed.

To do this, we introduce the ray.wait primitive, which takes a list of object IDs and returns when a subset of them are available. By default it blocks until a single object is available, but the num_returns value can be specified to wait for a different number. If a timeout argument is passed in, it will block for at most that many milliseconds and may return a list with fewer than num_returns elements.

The ray.wait function returns two lists. The first list is a list of object IDs of available objects (of length at most num_returns), and the second list is a list of the remaining object IDs, so the combination of these two lists is equal to the list passed in to ray.wait (up to ordering).

import time

@ray.remote
def f(n):
  time.sleep(n)
  return n

# Start 3 tasks with different durations.
results = [f.remote(i) for i in range(3)]
# Block until 2 of them have finished.
ready_ids, remaining_ids = ray.wait(results, num_returns=2)

# Start 5 tasks with different durations.
results = [f.remote(i) for i in range(5)]
# Block until 4 of them have finished or 2.5 seconds pass.
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

It is easy to use this construct to create an infinite loop in which multiple tasks are executing, and whenever one task finishes, a new one is launched.

@ray.remote
def f():
  return 1

# Start 5 tasks.
remaining_ids = [f.remote() for i in range(5)]
# Whenever one task finishes, start a new one.
for _ in range(100):
  ready_ids, remaining_ids = ray.wait(remaining_ids)
  # Get the available object and do something with it.
  print(ray.get(ready_ids))
  # Start a new task.
  remaining_ids.append(f.remote())

ray.wait(object_ids, num_returns=1, timeout=None, worker=<ray.worker.Worker object>)

Return a list of IDs that are ready and a list of IDs that are not ready.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object IDs.

This method returns two lists. The first list consists of object IDs that correspond to objects that are stored in the object store. The second list corresponds to the rest of the object IDs (which may or may not be ready).

Parameters:
  • object_ids (List[ObjectID]) – List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.
  • num_returns (int) – The number of object IDs that should be returned.
  • timeout (int) – The maximum amount of time in milliseconds to wait before returning.
Returns:

A list of object IDs that are ready and a list of the remaining object IDs.
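The return contract described above can be mimicked with Python's standard concurrent.futures module, as in the sketch below. This is an analogy rather than Ray's implementation: wait_like, its timeout_ms parameter, and the sleep_for helper are hypothetical names, and futures stand in for object IDs.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_like(futures, num_returns=1, timeout_ms=None):
    """Split futures into (ready, remaining), loosely mirroring ray.wait."""
    deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000.0
    ready, remaining = [], list(futures)
    while len(ready) < num_returns and remaining:
        budget = None if deadline is None else max(0.0, deadline - time.monotonic())
        done, _ = wait(remaining, timeout=budget, return_when=FIRST_COMPLETED)
        if not done:
            break  # The timeout expired before another future finished.
        for fut in list(remaining):
            # Never hand back more than num_returns ready futures.
            if fut in done and len(ready) < num_returns:
                ready.append(fut)
                remaining.remove(fut)
    return ready, remaining


def sleep_for(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_for, s) for s in (0.0, 0.05, 0.4)]
    # Block until 2 of the 3 futures have finished.
    ready, remaining = wait_like(futures, num_returns=2)
    # Ask for the last one, but give up after 100 milliseconds.
    late_ready, late_remaining = wait_like(remaining, num_returns=1, timeout_ms=100)
```

As with ray.wait, the two returned lists together contain every input exactly once, and a timeout can leave the first list shorter than num_returns.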

Viewing errors

Keeping track of errors that occur in different processes throughout a cluster can be challenging. There are a couple of mechanisms to help with this.

  1. If a task throws an exception, that exception will be printed in the background of the driver process.
  2. If ray.get is called on an object ID whose parent task threw an exception before creating the object, the exception will be re-raised by ray.get.

The errors will also be accumulated in Redis and can be accessed with ray.error_info. Normally, you shouldn’t need to do this, but it is possible.

@ray.remote
def f():
  raise Exception("This task failed!!")

f.remote()  # An error message will be printed in the background.

# Wait for the error to propagate to Redis.
import time
time.sleep(1)

ray.error_info()  # This returns a list containing the error message.

ray.error_info(worker=<ray.worker.Worker object>)

Return information about failed tasks.
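The second mechanism listed under Viewing errors, in which ray.get re-raises a task's exception, behaves much like futures in Python's standard library. The sketch below uses concurrent.futures purely as an analogy: pool.submit stands in for f.remote and future.result stands in for ray.get, and none of it is Ray code.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    raise Exception("This task failed!!")


with ThreadPoolExecutor(max_workers=1) as pool:
    # Submitting returns immediately, even though the task will fail.
    future = pool.submit(failing_task)
    try:
        future.result()  # The task's exception is re-raised here.
        message = None
    except Exception as err:
        message = str(err)

print(message)
```

Wrapping the retrieval call in try/except in this way lets a driver script handle a failed task at the point where its result is needed.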