CML Workers API Integration
The cml.workers_v1 module is a Python API available inside CML sessions for launching and managing compute worker sessions. This project uses four API calls to orchestrate the Dask cluster.
Note:
cml.workers_v1 is only available inside a running CML session. It cannot be imported in a local Python environment.
API Surface
launch_workers()
Launches one or more CML worker sessions, each executing the specified shell command.
```python
import cml.workers_v1 as workers_v1

workers_v1.launch_workers(
    n=num_workers,     # int: number of sessions to launch
    cpu=cpu,           # int: vCPUs per session
    memory=memory,     # int: GiB of RAM per session
    nvidia_gpu=0,      # int: GPUs per session (optional)
    code=code_string,  # str: command to execute; shell commands are prefixed with !
)
```
Returns: A list of worker metadata dicts:
```python
[{"id": "worker-id-string", "app_url": "https://..."}]
```
The code parameter uses CML’s shell escape syntax (!command). For the scheduler this is !dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}; for workers it is !dask-worker {scheduler_url}.
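A minimal sketch of building the two `code` strings described above. The helper names `scheduler_code` and `worker_code` are hypothetical; only the command text itself comes from this page.

```python
def scheduler_code(port: int) -> str:
    """Shell command for the scheduler session (the leading ! makes CML run it as shell)."""
    # Listen on all interfaces so workers can connect; bind the dashboard to loopback on `port`.
    return f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}"


def worker_code(scheduler_url: str) -> str:
    """Shell command for a worker session that connects to an existing scheduler."""
    return f"!dask-worker {scheduler_url}"


if __name__ == "__main__":
    print(scheduler_code(8100))
    print(worker_code("tcp://100.66.0.5:8786"))
```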
await_workers()
Blocks until launched workers are running or have failed.
```python
workers_v1.await_workers(
    workers,                    # list: return value of launch_workers()
    wait_for_completion=False,  # bool: False returns once workers are running, not finished
    timeout_seconds=90,         # int: maximum wait in seconds (set for the scheduler only; worker launches use the default)
)
```
Returns: A dict with a failures key:
```python
{"failures": []}  # empty list on success
```
If failures is non-empty, the cluster utility raises RuntimeError.
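The failure check described above can be sketched as a small function. `check_await_result` is a hypothetical name, and the sketch relies only on the documented `failures` list in the result dict.

```python
from typing import Any


def check_await_result(result: dict[str, Any]) -> None:
    """Raise RuntimeError if await_workers() reported any failed sessions."""
    failures = result["failures"]
    if failures:
        raise RuntimeError(f"{len(failures)} CML worker session(s) failed: {failures}")


if __name__ == "__main__":
    check_await_result({"failures": []})  # no exception: every session is running
    try:
        check_await_result({"failures": [{"id": "worker-abc"}]})
    except RuntimeError as exc:
        print(exc)
```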
list_workers()
Returns metadata for all active CML worker sessions in the current project.
```python
workers_v1.list_workers()
```
Returns: A list of worker metadata dicts:
```python
[{"id": "worker-id-string", "ip_address": "100.66.x.x", ...}]
```
This is used by get_scheduler_url() to discover the scheduler’s IP address by matching worker IDs.
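A sketch of the ID-matching lookup, not the project's actual `get_scheduler_url()`. The function name `find_scheduler_url` is hypothetical, and it assumes the scheduler listens on Dask's default scheduler port, 8786, because the scheduler command shown earlier does not pass `--port`.

```python
from typing import Any

DASK_DEFAULT_SCHEDULER_PORT = 8786  # dask-scheduler listens here unless --port is given


def find_scheduler_url(
    scheduler: list[dict[str, Any]],
    active: list[dict[str, Any]],
    port: int = DASK_DEFAULT_SCHEDULER_PORT,
) -> str:
    """Match the scheduler's ID against list_workers() output and build a tcp:// URL."""
    scheduler_id = scheduler[0]["id"]  # launch_workers(n=1, ...) returns a one-element list
    for worker in active:
        if worker["id"] == scheduler_id:
            return f"tcp://{worker['ip_address']}:{port}"
    raise RuntimeError(f"Scheduler session {scheduler_id} not found among active workers")


if __name__ == "__main__":
    launched = [{"id": "sched-1", "app_url": "https://example.invalid"}]
    listed = [
        {"id": "other-7", "ip_address": "100.66.0.9"},
        {"id": "sched-1", "ip_address": "100.66.0.5"},
    ]
    print(find_scheduler_url(launched, listed))  # tcp://100.66.0.5:8786
```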
stop_workers()
Terminates one or more CML worker sessions by ID.
```python
workers_v1.stop_workers(*worker_ids)
```
Used during cluster shutdown to stop all scheduler and worker sessions:
```python
workers_v1.stop_workers(
    *[w["id"] for w in cluster["scheduler"] + cluster["workers"]]
)
```
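The shutdown call above can be wrapped so it is testable without a CML session. In this sketch `workers_api` stands in for `cml.workers_v1` and is passed in as a parameter; `stop_cluster` and the `FakeWorkersAPI` test double are hypothetical and not part of CML.

```python
from typing import Any


def stop_cluster(workers_api: Any, cluster: dict[str, list[dict[str, Any]]]) -> list[str]:
    """Stop every scheduler and worker session in `cluster`; return the IDs stopped."""
    ids = [w["id"] for w in cluster["scheduler"] + cluster["workers"]]
    if ids:  # skip the call entirely when there is nothing to stop
        workers_api.stop_workers(*ids)  # same call shape as workers_v1.stop_workers(*worker_ids)
    return ids


class FakeWorkersAPI:
    """Hypothetical stand-in that records stop_workers() calls instead of stopping sessions."""

    def __init__(self) -> None:
        self.stopped: list[str] = []

    def stop_workers(self, *worker_ids: str) -> None:
        self.stopped.extend(worker_ids)


if __name__ == "__main__":
    api = FakeWorkersAPI()
    cluster = {"scheduler": [{"id": "s1"}], "workers": [{"id": "w1"}, {"id": "w2"}]}
    print(stop_cluster(api, cluster))  # ['s1', 'w1', 'w2']
```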
Parameters Used in This Project
| Parameter | Scheduler | Worker | Description |
|---|---|---|---|
| n | 1 | User-specified (e.g., 2) | Number of CML worker sessions |
| cpu | 1 | User-specified | vCPUs per session |
| memory | 2 | User-specified | GiB of RAM per session |
| nvidia_gpu | 0 | 0 (configurable) | GPUs per session |
| code | !dask-scheduler ... | !dask-worker {url} | Shell command executed in the session |
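Putting the table's values together, a full launch might look like the sketch below. It assumes `workers_api` behaves like `cml.workers_v1` as documented on this page and that the scheduler uses Dask's default port 8786; `launch_cluster`, the in-memory `FakeWorkersAPI`, and its fake IDs and IPs are hypothetical. The dashboard port is passed in; inside CML it would come from `CDSW_READONLY_PORT`.

```python
from typing import Any


def launch_cluster(workers_api: Any, num_workers: int, cpu: int, memory: int,
                   dashboard_port: int, nvidia_gpu: int = 0) -> dict[str, Any]:
    """Launch a Dask scheduler, then workers, using the sizes from the table above."""
    # 1. Scheduler: one session, 1 vCPU, 2 GiB RAM, no GPU.
    scheduler = workers_api.launch_workers(
        n=1, cpu=1, memory=2, nvidia_gpu=0,
        code=f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{dashboard_port}",
    )
    result = workers_api.await_workers(scheduler, wait_for_completion=False, timeout_seconds=90)
    if result["failures"]:
        raise RuntimeError(f"Scheduler failed to start: {result['failures']}")

    # 2. Find the scheduler's IP by matching its ID in list_workers() (assumes port 8786).
    scheduler_id = scheduler[0]["id"]
    ip = next((w["ip_address"] for w in workers_api.list_workers()
               if w["id"] == scheduler_id), None)
    if ip is None:
        raise RuntimeError(f"Scheduler session {scheduler_id} not found in list_workers()")
    scheduler_url = f"tcp://{ip}:8786"

    # 3. Workers: user-specified count and size, each pointed at the scheduler.
    workers = workers_api.launch_workers(
        n=num_workers, cpu=cpu, memory=memory, nvidia_gpu=nvidia_gpu,
        code=f"!dask-worker {scheduler_url}",
    )
    result = workers_api.await_workers(workers, wait_for_completion=False)
    if result["failures"]:
        raise RuntimeError(f"Workers failed to start: {result['failures']}")
    return {"scheduler": scheduler, "workers": workers, "scheduler_url": scheduler_url}


class FakeWorkersAPI:
    """Hypothetical in-memory stand-in for cml.workers_v1, for trying the flow locally."""

    def __init__(self) -> None:
        self.sessions: list[dict[str, Any]] = []
        self.launch_calls: list[dict[str, Any]] = []

    def launch_workers(self, n: int, cpu: int, memory: int, nvidia_gpu: int = 0,
                       code: str = "") -> list[dict[str, Any]]:
        self.launch_calls.append(
            {"n": n, "cpu": cpu, "memory": memory, "nvidia_gpu": nvidia_gpu, "code": code})
        launched = []
        for _ in range(n):
            i = len(self.sessions)  # sequential fake IDs and IPs
            session = {"id": f"w{i}", "ip_address": f"100.66.0.{i + 1}"}
            self.sessions.append(session)
            launched.append({"id": session["id"], "app_url": "https://example.invalid"})
        return launched

    def await_workers(self, workers: list[dict[str, Any]], **kwargs: Any) -> dict[str, list[Any]]:
        return {"failures": []}  # pretend every session started

    def list_workers(self) -> list[dict[str, Any]]:
        return list(self.sessions)


if __name__ == "__main__":
    cluster = launch_cluster(FakeWorkersAPI(), num_workers=2, cpu=2, memory=4, dashboard_port=8100)
    print(cluster["scheduler_url"])  # tcp://100.66.0.1:8786
```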
Environment Variables
| Variable | Source | Used By | Description |
|---|---|---|---|
| CDSW_READONLY_PORT | CML runtime | dask_utils.py | Port for the Dask dashboard. Injected by CML into every session. |
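A sketch of turning this variable into the scheduler's `--dashboard-address` value. The helper name `dashboard_address` is hypothetical; the variable name and the `127.0.0.1:{port}` form come from this page. Raising on a missing value is a design choice: since CML injects the variable into every session, its absence suggests the code is running outside CML.

```python
import os
from typing import Mapping, Optional


def dashboard_address(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the --dashboard-address value built from CDSW_READONLY_PORT."""
    env = os.environ if env is None else env
    port = env.get("CDSW_READONLY_PORT")
    if port is None:
        raise RuntimeError("CDSW_READONLY_PORT is not set; is this running inside a CML session?")
    return f"127.0.0.1:{int(port)}"  # int() rejects a malformed port early


if __name__ == "__main__":
    print(dashboard_address({"CDSW_READONLY_PORT": "8100"}))  # 127.0.0.1:8100
```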