
CML Workers API Integration

The cml.workers_v1 module is a Python API available inside CML sessions for launching and managing compute worker sessions. This project uses four API calls to orchestrate the Dask cluster.

Note: cml.workers_v1 is only available inside a running CML session. It cannot be imported in a local Python environment.

API Surface

launch_workers()

Launches one or more CML worker sessions, each executing the specified shell command.

import cml.workers_v1 as workers_v1

workers = workers_v1.launch_workers(
    n=num_workers,      # int: number of sessions to launch
    cpu=cpu,            # int: vCPUs per session
    memory=memory,      # int: GiB of RAM per session
    nvidia_gpu=0,       # int: GPUs per session (optional)
    code=code_string,   # str: command to execute, prefixed with ! to run it as a shell command
)

Returns: A list of worker metadata dicts:

[{"id": "worker-id-string", "app_url": "https://..."}]

The code parameter uses CML’s shell escape syntax (!command). For the scheduler this is !dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}; for workers it is !dask-worker {scheduler_url}.
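The two command strings can be built with small helpers, which keeps the shell-escape prefix and the scheduler flags in one place. This is a minimal sketch: the helper names scheduler_code and worker_code are hypothetical and not part of cml.workers_v1. The command lines are the ones quoted above.

```python
def scheduler_code(port: int) -> str:
    """Return the shell-escaped command that starts the Dask scheduler.

    The scheduler binds to all interfaces and serves its dashboard on
    127.0.0.1:<port>, matching the command described above.
    """
    return f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}"


def worker_code(scheduler_url: str) -> str:
    """Return the shell-escaped command that starts a Dask worker pointed at the scheduler."""
    return f"!dask-worker {scheduler_url}"
```

Inside a CML session, the result is passed as the code argument, for example workers_v1.launch_workers(n=1, cpu=1, memory=2, code=scheduler_code(port)).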

await_workers()

Blocks until launched workers are running or have failed.

result = workers_v1.await_workers(
    workers,                    # list: return value from launch_workers()
    wait_for_completion=False,  # bool: False returns once workers are running, without waiting for them to finish
    timeout_seconds=90,         # int: maximum wait in seconds (passed only when awaiting the scheduler; worker awaits use the default)
)

Returns: A dict with a failures key:

{"failures": []}  # empty list on success

If failures is non-empty, the cluster utility raises RuntimeError.
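The launch, await, and failure check can be combined into one helper. The sketch below is an illustration rather than the project's actual cluster utility, and the name launch_and_await is hypothetical. It takes the workers module as a parameter, workers_api, so that it can be exercised with a stand-in object outside CML. An explicit timeout is forwarded only when one is given, which mirrors the scheduler-only use of timeout_seconds described above.

```python
from typing import Any, Optional


def launch_and_await(workers_api: Any, timeout_seconds: Optional[int] = None, **launch_kwargs: Any) -> list:
    """Launch CML worker sessions and block until they are running.

    workers_api is expected to be the cml.workers_v1 module. launch_kwargs
    (n, cpu, memory, nvidia_gpu, code) are forwarded unchanged to launch_workers.
    """
    workers = workers_api.launch_workers(**launch_kwargs)

    # Return as soon as the sessions are running; do not wait for them to exit.
    await_kwargs = {"wait_for_completion": False}
    if timeout_seconds is not None:
        # Only the scheduler launch passes an explicit timeout in this project.
        await_kwargs["timeout_seconds"] = timeout_seconds
    result = workers_api.await_workers(workers, **await_kwargs)

    # A non-empty failures list means at least one session did not start.
    failures = result.get("failures", [])
    if failures:
        raise RuntimeError(f"{len(failures)} CML worker session(s) failed to start: {failures}")
    return workers
```

In a CML session this would be called as launch_and_await(workers_v1, timeout_seconds=90, n=1, cpu=1, memory=2, code=...) for the scheduler, and without timeout_seconds for the workers.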

list_workers()

Returns metadata for all active CML worker sessions in the current project.

workers_v1.list_workers()

Returns: A list of worker metadata dicts:

[{"id": "worker-id-string", "ip_address": "100.66.x.x", ...}]

This is used by get_scheduler_url() to discover the scheduler’s IP address by matching worker IDs.
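The ID-matching lookup can be sketched as follows. This is a hypothetical reconstruction, not the project's get_scheduler_url(), and its signature is an assumption. It also assumes that the scheduler listens on port 8786, which is dask-scheduler's default, because the scheduler command above does not pass --port.

```python
from typing import Any, Dict, List


def get_scheduler_url(workers_api: Any, scheduler: List[Dict[str, Any]], port: int = 8786) -> str:
    """Resolve the Dask scheduler address from CML worker metadata.

    scheduler is the list returned by launch_workers() for the scheduler
    session. Its id is matched against list_workers() to find the session's
    ip_address. The default port of 8786 is an assumption (dask-scheduler's default).
    """
    scheduler_id = scheduler[0]["id"]
    for worker in workers_api.list_workers():
        if worker["id"] == scheduler_id:
            return f"tcp://{worker['ip_address']}:{port}"
    raise RuntimeError(f"Scheduler session {scheduler_id!r} not found in list_workers()")
```

The returned URL is what the worker command !dask-worker {scheduler_url} expects.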

stop_workers()

Terminates one or more CML worker sessions by ID.

workers_v1.stop_workers(*worker_ids)

Used during cluster shutdown to stop all scheduler and worker sessions:

workers_v1.stop_workers(
    *[w["id"] for w in cluster["scheduler"] + cluster["workers"]]
)

Parameters Used in This Project

| Parameter | Scheduler | Worker | Description |
|---|---|---|---|
| n | 1 | User-specified (e.g., 2) | Number of CML worker sessions |
| cpu | 1 | User-specified | vCPUs per session |
| memory | 2 | User-specified | GiB of RAM per session |
| nvidia_gpu | 0 | 0 (configurable) | GPUs per session |
| code | !dask-scheduler ... | !dask-worker {url} | Shell command executed in the session |
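The table can be expressed as keyword-argument dicts for launch_workers(). This is a minimal sketch: the function names are hypothetical, and the scheduler's full command line is the one given in the launch_workers() section.

```python
from typing import Any, Dict


def scheduler_launch_kwargs(dashboard_port: int) -> Dict[str, Any]:
    """launch_workers() arguments for the Scheduler column of the table."""
    return {
        "n": 1,
        "cpu": 1,
        "memory": 2,  # GiB
        "nvidia_gpu": 0,
        "code": f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{dashboard_port}",
    }


def worker_launch_kwargs(scheduler_url: str, num_workers: int, cpu: int, memory: int, nvidia_gpu: int = 0) -> Dict[str, Any]:
    """launch_workers() arguments for the Worker column; sizes are user-specified."""
    return {
        "n": num_workers,
        "cpu": cpu,
        "memory": memory,  # GiB
        "nvidia_gpu": nvidia_gpu,
        "code": f"!dask-worker {scheduler_url}",
    }
```

Usage inside a CML session would look like workers_v1.launch_workers(**scheduler_launch_kwargs(port)).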

Environment Variables

| Variable | Source | Used By | Description |
|---|---|---|---|
| CDSW_READONLY_PORT | CML runtime | dask_utils.py | Port for the Dask dashboard; injected by CML into every session. |
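The following minimal sketch shows how a utility such as dask_utils.py might read the port. The helper name dashboard_port and its error message are assumptions. Because the variable only exists inside CML, the sketch fails with an explicit message instead of a bare KeyError when it is run locally.

```python
import os


def dashboard_port() -> int:
    """Return the port that CML injects for the Dask dashboard.

    Raises RuntimeError outside a CML session, where CDSW_READONLY_PORT is not set.
    """
    value = os.environ.get("CDSW_READONLY_PORT")
    if value is None:
        raise RuntimeError("CDSW_READONLY_PORT is not set; is this running inside a CML session?")
    return int(value)
```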