Introduction

This Applied ML Prototype (AMP) demonstrates distributed XGBoost training using Dask on Cloudera Machine Learning (CML). The use case is credit card fraud detection under the memory-constrained distributed computing paradigm — data is partitioned across worker nodes while the model is replicated, enabling training on datasets that exceed single-node memory. A Dask cluster is orchestrated on-demand via the CML Workers API, used for training, then torn down.

This guide serves two audiences:

| If you are… | Start here |
| --- | --- |
| Building a distributed ML pipeline on CML using Dask or a similar framework | Architecture Reference |
| Building a validation SDK or CI/CD pipeline for ML artifacts targeting CML | Data & Model Specification and Validation Rules |

Terminology

| Term | Definition |
| --- | --- |
| AMP | Applied ML Prototype — a portable, declarative CML project with a .project-metadata.yaml that automates setup, dependency installation, and deployment. |
| CML Workers API | cml.workers_v1 — the Python API for launching and managing compute worker sessions within a CML project. Used here to orchestrate Dask scheduler and worker processes. |
| Dask Cluster | An ephemeral distributed compute cluster consisting of a scheduler, one or more workers, and a client. Orchestrated on CML via the Workers API. |
| Dask Scheduler | A single CML worker process running dask-scheduler, responsible for coordinating task distribution among Dask workers. Listens on TCP port 8786. |
| Dask Worker | A CML worker process running dask-worker, executing distributed computation tasks assigned by the scheduler. |
| Dask Client | A Python object (dask.distributed.Client) instantiated in the notebook session that submits work to the scheduler. |
| DaskDMatrix | An XGBoost data structure (xgb.dask.DaskDMatrix) optimized for distributed training. Partitions data across the Dask cluster. |
| Model Endpoint | A CML-hosted REST endpoint that loads a serialized XGBoost model and serves predictions via a predict_fraud(args) function. |
| Booster | The trained XGBoost model object (xgb.core.Booster). Serialized to disk as a binary file and loaded at inference time. |
| AUCPR | Area Under the Precision-Recall Curve — the primary evaluation metric, chosen over accuracy due to extreme class imbalance (0.16% fraud rate). |

End-to-End Lifecycle

The upstream validation SDK’s responsibility ends at structural and contract validation — ensuring the project conforms to the structure documented in the Data & Model Specification and Deployment chapters. CML handles runtime orchestration and endpoint serving.

System Overview

The system comprises four layers: a JupyterLab session (user entry point), an ephemeral Dask cluster (distributed compute), a CML Model Endpoint (inference serving), and the CML platform services that orchestrate everything.

Repository Layout

.
├── .project-metadata.yaml          AMP declarative config
├── cdsw-build.sh                   Model endpoint build script
├── requirements.txt                Pinned dependencies (Dask 2022.5.1, XGBoost 1.6.1)
├── setup.py                        Local utils package (cdsw-dask-utils 0.1.0)
├── data/
│   └── creditcardsample.csv        Sample dataset (94,926 rows × 31 columns)
├── model/
│   └── best-xgboost-model          Serialized XGBoost Booster (binary)
├── notebooks/
│   ├── dask-intro.ipynb            Dask concepts introduction
│   └── distributed-xgboost-with-dask.ipynb   Full ML pipeline
├── scripts/
│   ├── install_dependencies.py     Dependency installation (CML job)
│   └── predict_fraud.py            Inference endpoint function
└── utils/
    ├── __init__.py
    └── dask_utils.py               Dask cluster orchestration

Design Principles

  1. CML Workers as compute fabric. All distributed compute runs as CML worker sessions managed via cml.workers_v1. No external cluster manager (YARN, Kubernetes scheduler) is required.

  2. Ephemeral clusters. The Dask cluster exists only for the duration of training. Workers are explicitly stopped after training completes, freeing CML resources.

  3. Notebook-driven development. Notebooks are the primary deliverables. The utility library (dask_utils.py) exists solely to simplify cluster orchestration from notebooks.

  4. Decoupled inference. The model endpoint uses standard (non-Dask) XGBoost calls. Inference does not require a distributed cluster — a single CML session loads the serialized Booster and applies a threshold.

  5. Pinned dependencies. All library versions are locked in requirements.txt to ensure reproducibility across CML deployments.

CML Workers API Integration

The cml.workers_v1 module is a Python API available inside CML sessions for launching and managing compute worker sessions. This project uses four API calls to orchestrate the Dask cluster.

Note: cml.workers_v1 is only available inside a running CML session. It cannot be imported in a local Python environment.

API Surface

launch_workers()

Launches one or more CML worker sessions, each executing the specified shell command.

workers_v1.launch_workers(
    n=num_workers,      # int — number of sessions to launch
    cpu=cpu,            # int — vCPUs per session
    memory=memory,      # int — GiB RAM per session
    nvidia_gpu=0,       # int — GPUs per session (optional)
    code=code_string,   # str — shell command to execute (prefixed with !)
)

Returns: A list of worker metadata dicts:

[{"id": "worker-id-string", "app_url": "https://..."}]

The code parameter uses CML’s shell escape syntax (!command). For the scheduler this is !dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}; for workers it is !dask-worker {scheduler_url}.

await_workers()

Blocks until launched workers are running or have failed.

workers_v1.await_workers(
    workers,                    # list — return value from launch_workers()
    wait_for_completion=False,  # bool — False means return when workers are running (not finished)
    timeout_seconds=90,         # int — maximum wait time (scheduler only; workers use default)
)

Returns: A dict with a failures key:

{"failures": []}  # empty list on success

If failures is non-empty, the cluster utility raises RuntimeError.
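The failure check described above can be sketched as a small helper. This is illustrative only; the function name is not part of the CML API, and the utility's actual code may differ:

```python
def check_await_result(result: dict) -> None:
    """Raise if await_workers() reported any failed worker launches (sketch)."""
    failures = result.get("failures", [])
    if failures:
        raise RuntimeError(f"{len(failures)} worker(s) failed to launch: {failures}")
```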

list_workers()

Returns metadata for all active CML worker sessions in the current project.

workers_v1.list_workers()

Returns: A list of worker metadata dicts:

[{"id": "worker-id-string", "ip_address": "100.66.x.x", ...}]

This is used by get_scheduler_url() to discover the scheduler’s IP address by matching worker IDs.
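The ID-matching lookup can be illustrated with plain dicts standing in for the metadata returned by launch_workers() and list_workers(). A simplified sketch, not the project's exact code:

```python
def find_scheduler_address(scheduler_workers, all_workers, port=8786):
    """Match the scheduler's worker ID against list_workers() output to get its IP."""
    scheduler_ids = {w["id"] for w in scheduler_workers}
    for worker in all_workers:
        if worker["id"] in scheduler_ids:
            return f"tcp://{worker['ip_address']}:{port}"
    raise RuntimeError("Scheduler not found among active workers")
```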

stop_workers()

Terminates one or more CML worker sessions by ID.

workers_v1.stop_workers(*worker_ids)

Used during cluster shutdown to stop all scheduler and worker sessions:

workers_v1.stop_workers(
    *[w["id"] for w in cluster["scheduler"] + cluster["workers"]]
)

Parameters Used in This Project

| Parameter | Scheduler | Worker | Description |
| --- | --- | --- | --- |
| n | 1 | User-specified (e.g., 2) | Number of CML worker sessions |
| cpu | 1 | User-specified | vCPUs per session |
| memory | 2 | User-specified | GiB RAM per session |
| nvidia_gpu | 0 | 0 (configurable) | GPUs per session |
| code | `!dask-scheduler ...` | `!dask-worker {url}` | Shell command executed in the session |

Environment Variables

| Variable | Source | Used By | Description |
| --- | --- | --- | --- |
| CDSW_READONLY_PORT | CML runtime | dask_utils.py | Port for the Dask dashboard. Injected by CML into every session. |

Dask Cluster Lifecycle

The Dask cluster is orchestrated by utils/dask_utils.py, which wraps the CML Workers API into a simple startup/shutdown sequence. The cluster is ephemeral — created at the start of a training session and torn down when training completes.

Lifecycle Sequence

Function Specifications

run_dask_cluster(num_workers, cpu, memory, nvidia_gpu=0, dashboard_port=DASHBOARD_PORT)

Entry point. Orchestrates the full cluster creation sequence by calling the four functions below in order.

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| num_workers | int | (required) | Number of Dask worker nodes |
| cpu | int | (required) | vCPUs per worker |
| memory | int | (required) | GiB RAM per worker |
| nvidia_gpu | int | 0 | GPUs per worker |
| dashboard_port | str | CDSW_READONLY_PORT | Port for the Dask dashboard |

Returns:

{
    "scheduler": [{"id": str, "app_url": str}],
    "workers": [{"id": str, "app_url": str}, ...],
    "scheduler_address": "tcp://{ip}:8786",
    "dashboard_address": "https://{domain}/status"
}

run_scheduler(dashboard_port, num_workers=1, cpu=1, memory=2)

Launches a single CML worker running dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{dashboard_port}. Waits up to 90 seconds for the scheduler to become available. Raises RuntimeError if the scheduler fails to launch.

get_scheduler_url(dask_scheduler)

Queries workers_v1.list_workers() to find the scheduler’s IP address by matching worker IDs. Returns tcp://{ip}:8786.

get_dashboard_url(dask_scheduler)

Constructs the dashboard URL from the scheduler worker’s app_url field, appending /status.

run_dask_workers(scheduler_url, num_workers, cpu, memory, nvidia_gpu=0)

Launches N CML workers, each running dask-worker {scheduler_url}. Raises RuntimeError if any worker fails to launch.

Shutdown

After training completes, all cluster sessions must be explicitly stopped:

from cml import workers_v1

workers_v1.stop_workers(
    *[w["id"] for w in dask_cluster["scheduler"] + dask_cluster["workers"]]
)

This frees CML compute resources. There is no automatic cleanup — if the notebook kernel dies without calling stop_workers(), the sessions remain running until they hit the CML idle timeout.
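Because there is no automatic cleanup, a defensive pattern is to wrap training in a context manager so shutdown runs even if training raises. This is a sketch; dask_cluster_scope and the start/stop callables are illustrative, not part of the AMP:

```python
from contextlib import contextmanager

@contextmanager
def dask_cluster_scope(start_cluster, stop_workers, **cluster_kwargs):
    """Start a cluster and guarantee its sessions are stopped on exit."""
    cluster = start_cluster(**cluster_kwargs)
    try:
        yield cluster
    finally:
        # Collect every session ID (scheduler + workers) and stop them all.
        ids = [w["id"] for w in cluster["scheduler"] + cluster["workers"]]
        if ids:
            stop_workers(*ids)
```

In a notebook this would wrap run_dask_cluster() and workers_v1.stop_workers, so a failed training cell still releases the CML sessions.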

ML Pipeline Stages

The ML pipeline is implemented in notebooks/distributed-xgboost-with-dask.ipynb. It proceeds through five stages: data loading, feature engineering, train/dev/val split, model training (with baselines and distributed XGBoost), and model validation with threshold selection.

Stage 1: Data Loading

dask_df = dd.read_csv("../data/creditcardsample.csv", assume_missing=True)

Data is loaded as a lazy Dask DataFrame. No computation occurs until .compute() is called. The assume_missing=True flag tells Dask to treat ambiguous columns as nullable, avoiding type inference errors on partitioned data.

Input: data/creditcardsample.csv — 94,926 rows, 31 columns (Time, V1–V28, Amount, Class).

Stage 2: Feature Engineering

  1. Drop the Time column (not useful for point-like fraud predictions).
  2. Separate Class as the target variable y.
  3. Remaining 29 columns (V1–V28, Amount) become the feature matrix X.
  4. Scale all features via StandardScaler (zero mean, unit variance).

See Feature Engineering Contract for the exact transformation rules.

Stage 3: Train/Dev/Val Split

Two successive calls to dask_ml.model_selection.train_test_split(shuffle=True):

  1. Split into 70% training and 30% holdout.
  2. Split the 30% holdout into 20% development and 10% validation (relative to original).

All splits maintain the class balance (~0.16% fraud). Data remains as distributed Dask arrays throughout training — only the final validation set is converted to NumPy for threshold analysis.
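Because the second split operates on the 30% holdout rather than the original dataset, its test_size must be expressed as a fraction of the holdout. A quick arithmetic check (illustrative only; the notebook passes the equivalent fraction to dask_ml's train_test_split):

```python
def second_split_test_size(holdout: float = 0.30, val_of_total: float = 0.10) -> float:
    """Fraction of the holdout that must become validation to yield
    the desired share of the original dataset."""
    return val_of_total / holdout
```

With the defaults this gives 1/3: one third of the 30% holdout is 10% of the original data, leaving 20% for development.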

Stage 4: Model Training

Baselines (single-node)

Two baseline models establish performance floors:

| Model | Method | Dev AUCPR |
| --- | --- | --- |
| DummyClassifier (majority) | Always predicts non-fraud | ~0.50 |
| LogisticRegression | No regularization penalty; sklearn Pipeline with StandardScaler | ~0.75 |

Distributed XGBoost

  1. Convert Dask arrays to DaskDMatrix objects (memory-optimized for distributed training).
  2. Train with xgb.dask.train() using fixed parameters (tree_method=hist, objective=reg:logistic, eval_metric=aucpr).
  3. Tune hyperparameters via sequential random search (20 samples from 8-dimensional search space).
  4. Select the best model by dev AUCPR.

See XGBoost Training Configuration and Hyperparameter Search Space for full parameter specifications.

Stage 5: Validation and Threshold Selection

The best model is evaluated on the held-out validation set (10% of data, never seen during training or hyperparameter selection).

  1. Compute continuous predictions via xgb.dask.predict() on the validation Dask DataFrame, then convert to NumPy.
  2. Calculate AUCPR on the validation set.
  3. Generate precision-recall curve to select a classification threshold.
  4. The threshold of 0.35 is chosen to balance precision and recall for the fraud detection use case.
  5. Serialize the best model:
results["best_model"].save_model("../model/best-xgboost-model")

See Model Serialization Format for details on the saved artifact.
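Threshold selection amounts to scanning candidate cutoffs and reading off precision and recall at each. A stdlib-only sketch of that computation (the notebook generates the full curve, likely via scikit-learn; this helper is illustrative):

```python
def precision_recall_at(y_true, scores, threshold):
    """Precision and recall when scores above `threshold` are flagged as fraud."""
    tp = sum(1 for y, s in zip(y_true, scores) if s > threshold and y == 1)
    fp = sum(1 for y, s in zip(y_true, scores) if s > threshold and y == 0)
    fn = sum(1 for y, s in zip(y_true, scores) if s <= threshold and y == 1)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall
```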

Dataset Schema

The project uses a sample of the Kaggle credit card fraud dataset, curated by the Machine Learning Group at Université Libre de Bruxelles.

Raw Schema

The CSV file contains 31 columns and 94,926 rows.

| Column | Type | Range | Description |
| --- | --- | --- | --- |
| Time | float64 | 0 – 172,792 | Seconds elapsed from the first transaction in the dataset. Dropped during feature engineering — not useful for point-like fraud predictions. |
| V1 | float64 | ≈ −56 to 2 | PCA component 1 (privacy-preserving transformation applied by the dataset curators) |
| V2 | float64 | ≈ −73 to 22 | PCA component 2 |
| V3 | float64 | ≈ −48 to 10 | PCA component 3 |
| V4–V28 | float64 | varies | PCA components 4 through 28. Exact ranges vary per component. All are the result of a PCA transformation; original feature names are undisclosed for confidentiality. |
| Amount | float64 | 0 – 25,691 | Transaction amount in original currency units |
| Class | float64 | 0.0 or 1.0 | Target variable. 0 = legitimate transaction, 1 = fraudulent transaction. |

Class Distribution

| Class | Count | Percentage |
| --- | --- | --- |
| 0 (legitimate) | 94,777 | 99.84% |
| 1 (fraud) | 149 | 0.16% |

The dataset is extremely imbalanced. This motivates the use of AUCPR (Area Under the Precision-Recall Curve) rather than accuracy as the primary evaluation metric.

Raw vs. Model-Input Schema

| Schema | Columns | Purpose |
| --- | --- | --- |
| Raw (CSV) | 31: Time, V1–V28, Amount, Class | As loaded from data/creditcardsample.csv |
| Model input | 29: V1–V28, Amount | After dropping Time and separating Class as the target. Features are ordered alphabetically by column name (Amount, V1, V2, …, V28). |
| Model target | 1: Class | Binary fraud label |

The distinction between raw and model-input schema is critical for building a compatible inference client — the Model Endpoint Contract expects exactly 29 features in the model-input ordering.

Feature Engineering Contract

This page documents the exact transformations applied between raw CSV data and model input. A compatible implementation must reproduce these steps to generate features that the trained model can score correctly.

Transformation Steps

Step 1: Drop Time

dask_df = dask_df.drop(columns=["Time"])

The Time column represents seconds since the first transaction in the dataset. It is not useful for point-like fraud detection (individual transaction scoring) and is dropped before any further processing.

Step 2: Separate Target Variable

y = dask_df["Class"]
X = dask_df.drop(columns=["Class"])

After this step, X contains 29 feature columns and y contains the binary fraud label.

Step 3: Feature Ordering

The 29 columns in X are ordered alphabetically by column name:

Amount, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13,
V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26,
V27, V28

This ordering is inherited from the Dask DataFrame column order and is significant — the model endpoint receives features as a positional array, so the ordering must match exactly.
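A client assembling the positional array from named fields can derive the ordering programmatically. A sketch, with FEATURE_ORDER built to match the listing above (the helper name is illustrative):

```python
# Model-input ordering: Amount first, then V1 through V28.
FEATURE_ORDER = ["Amount"] + [f"V{i}" for i in range(1, 29)]

def to_feature_vector(record: dict) -> list:
    """Flatten a {column_name: value} record into the model-input ordering."""
    return [float(record[name]) for name in FEATURE_ORDER]
```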

Step 4: StandardScaler

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

All 29 features are scaled to zero mean and unit variance. The PCA features (V1–V28) are already approximately centered by the PCA transformation, so the scaler primarily affects the Amount column, which has a much larger raw range (0 – 25,691).

Note: The scaler is fitted on the training split only, then applied to dev and validation splits. This prevents data leakage from the evaluation sets into the scaling parameters.
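The train-only fit can be illustrated with a stdlib-only standardizer. This is a simplified stand-in for StandardScaler, shown only to make the leakage-prevention step concrete:

```python
import statistics

def fit_standardizer(train_rows):
    """Compute per-column mean/std from the training split only."""
    columns = list(zip(*train_rows))
    means = [statistics.fmean(col) for col in columns]
    stds = [statistics.pstdev(col) or 1.0 for col in columns]  # guard zero variance
    return means, stds

def standardize(rows, means, stds):
    """Apply the training-split statistics to any split (train, dev, or val)."""
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]
```

The key point is that standardize() is called on dev and validation rows with statistics fitted only on the training rows.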

Summary

| Step | Input | Output | Notes |
| --- | --- | --- | --- |
| Drop Time | 31 columns | 30 columns | Removes non-predictive temporal feature |
| Separate Class | 30 columns | 29 features + 1 target | Class becomes the label y |
| StandardScaler | 29 features (raw scale) | 29 features (zero mean, unit variance) | Fitted on training split only |

XGBoost Training Configuration

This page documents the fixed XGBoost parameters used during both initial training and hyperparameter tuning.

Fixed Parameters

| Parameter | Value | Rationale |
| --- | --- | --- |
| tree_method | hist | Histogram-based tree construction. Required for distributed training with Dask — the exact method does not support distribution. |
| objective | reg:logistic | Outputs a continuous probability in [0.0, 1.0], suitable for threshold-based binary classification. |
| eval_metric | aucpr | Area Under the Precision-Recall Curve. Appropriate for highly imbalanced datasets where accuracy is misleading (a majority-class classifier achieves 99.84%). |
| verbosity | 2 | Detailed logging during training. |
| num_round | 5 | Number of boosting rounds. Kept low for the sample dataset; increase for larger data. |

Evaluation Protocol

Both training and development sets are monitored per boosting round:

eval_list = [(ddev, "dev"), (dtrain, "train")]

The dev AUCPR at the final boosting round is the primary metric for model selection during hyperparameter tuning.

Training API

Training uses the Dask-aware XGBoost API, not the scikit-learn wrapper:

import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
ddev = xgb.dask.DaskDMatrix(client, X_dev, y_dev)

result = xgb.dask.train(
    client,
    params,          # dict of fixed + tunable parameters
    dtrain,
    num_boost_round=5,
    evals=[(ddev, "dev"), (dtrain, "train")],
)

Return Value

xgb.dask.train() returns a dict with two keys:

{
    "booster": xgb.core.Booster,   # trained model
    "history": {
        "dev": OrderedDict([("aucpr", [0.647, 0.822, ...])]),
        "train": OrderedDict([("aucpr", [0.806, 0.846, ...])])
    }
}
  • booster — the trained XGBoost model, an xgb.core.Booster object (not a scikit-learn estimator).
  • history — per-round evaluation metrics for each dataset in evals. The list length equals num_boost_round.

Distributed Prediction

predictions = xgb.dask.predict(client, booster, dask_dataframe)

Returns predictions distributed across the Dask cluster. Convert to NumPy with .compute() for local analysis.

For non-distributed inference (model endpoint), use booster.inplace_predict(numpy_array) instead. See Model Endpoint Contract.

Hyperparameter Search Space

Hyperparameter tuning uses a sequential random search over eight XGBoost parameters. Trials run sequentially rather than in parallel because each trial itself trains a complete distributed model that occupies the full Dask cluster.

Search Space

| Parameter | Distribution | Range | Description |
| --- | --- | --- | --- |
| learning_rate | Uniform | [0, 1] | Step size shrinkage applied after each boosting round to prevent overfitting |
| gamma | Log-Uniform | [1×10⁻⁶, 10] | Minimum loss reduction required to make a further partition on a leaf node |
| max_depth | Uniform Integer | [1, 20) | Maximum depth of each tree |
| min_child_weight | Uniform | [0, 10] | Minimum sum of instance weight (hessian) in a child node |
| max_delta_step | Uniform | [0, 10] | Maximum delta step allowed for each tree’s weight estimation |
| subsample | Uniform | [0, 1] | Fraction of training instances sampled per tree |
| lambda | Uniform | [0, 1] | L2 regularization term on weights |
| alpha | Uniform | [0, 1] | L1 regularization term on weights |

Search Strategy

from sklearn.model_selection import ParameterSampler
from scipy.stats import uniform, loguniform, randint

search_space = {
    "learning_rate": uniform(0, 1),
    "gamma": loguniform(1e-6, 1e+1),
    "max_depth": randint(1, 20),
    "min_child_weight": uniform(0, 10),
    "max_delta_step": uniform(0, 10),
    "subsample": uniform(0, 1),
    "lambda": uniform(0, 1),
    "alpha": uniform(0, 1),
}

sampler = ParameterSampler(search_space, n_iter=20, random_state=42)

Each sample is merged with the fixed parameters and used to train a complete distributed XGBoost model. The model with the highest dev AUCPR is selected.
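The merge step can be sketched as overlaying each sampled draw on the fixed configuration. FIXED_PARAMS mirrors the table in XGBoost Training Configuration; the helper name is illustrative:

```python
FIXED_PARAMS = {
    "tree_method": "hist",
    "objective": "reg:logistic",
    "eval_metric": "aucpr",
    "verbosity": 2,
}

def trial_params(sampled: dict) -> dict:
    """Merge one ParameterSampler draw with the fixed parameters for a trial."""
    return {**FIXED_PARAMS, **sampled}
```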

To switch to grid search, replace ParameterSampler with sklearn.model_selection.ParameterGrid and provide discrete values instead of distributions.

tune_xgboost() Contract

The tuning function is defined inline in the notebook. Its effective signature and return value are:

def tune_xgboost(client, dtrain, params, search_space, num_samples,
                 random_state=42) -> dict:
    """
    Returns:
        {
            "best_model": xgb.core.Booster,   # best model by dev AUCPR
            "best_params": str,                # string repr of best parameter dict
            "best_score": float,               # best dev AUCPR
            "hp_history": pd.DataFrame,        # all trials with parameters and scores
        }
    """

Reference Results (Sample Dataset)

These results are from the 94,926-row sample dataset with 20 random search samples:

| Metric | Value |
| --- | --- |
| Best dev AUCPR (tuning) | 0.8040 |
| Validation AUCPR (holdout) | 0.9325 |
| Default model dev AUCPR (no tuning) | 0.8402 |

Results will vary with different random seeds and with the full Kaggle dataset.

Model Serialization Format

Serialization

The best model from hyperparameter tuning is saved using XGBoost’s native serialization:

results["best_model"].save_model("../model/best-xgboost-model")

| Property | Value |
| --- | --- |
| Path | model/best-xgboost-model |
| CML path | /home/cdsw/model/best-xgboost-model |
| Format | XGBoost binary (default when no file extension is provided) |
| Type | xgb.core.Booster — the raw XGBoost model, not a scikit-learn wrapper |

Deserialization

import xgboost as xgb

booster = xgb.Booster(model_file="/home/cdsw/model/best-xgboost-model")

The model is loaded at module scope in scripts/predict_fraud.py, meaning it is loaded once when the CML Model Endpoint starts and reused for all subsequent prediction requests.

Inference Methods

| Method | Context | Input Type | Output |
| --- | --- | --- | --- |
| booster.inplace_predict(np.array(...)) | Production (model endpoint) | NumPy array | Float array, each value in [0.0, 1.0] |
| xgb.dask.predict(client, booster, dask_df) | Training (notebook) | Dask DataFrame | Distributed predictions |

The model endpoint uses inplace_predict because inference is single-node — no Dask cluster is required at serving time.

Threshold Contract

The model outputs a continuous probability. A threshold is applied to produce a binary classification:

| Condition | Output | Meaning |
| --- | --- | --- |
| prediction[0] <= 0.35 | 0 | Not fraud |
| prediction[0] > 0.35 | 1 | Fraud |

The threshold of 0.35 is hardcoded in scripts/predict_fraud.py. It was selected by analyzing the precision-recall curve on the validation set — it produces a fraud prediction rate approximately matching the base rate (~0.16%). The optimal threshold depends on the business use case (cost of false positives vs. false negatives) and should be re-evaluated when training on new data.

Compatibility Notes

  • The serialized model requires xgboost >= 1.6.1 to load. Older versions may fail to deserialize.
  • The model expects exactly 29 input features in the order defined by the Feature Engineering Contract.
  • The model was trained with tree_method=hist and objective=reg:logistic. These are embedded in the serialized model and do not need to be specified at inference time.

AMP Project Structure

An Applied ML Prototype (AMP) is a portable, declarative CML project. Its behavior is defined by .project-metadata.yaml, which tells CML how to set up the runtime, install dependencies, and configure resources.

.project-metadata.yaml Specification

| Field | Value | Description |
| --- | --- | --- |
| name | Distributed XGBoost with Dask on CML | Display name in the AMP catalog |
| specification_version | 1.0 | AMP specification version |
| prototype_version | 1.0 | Project version |
| date | 2022-07-30 | Publication date |

Runtime

runtimes:
  - editor: JupyterLab
    kernel: Python 3.9
    edition: Standard

The project requires a JupyterLab editor with a Python 3.9+ kernel. The Standard edition provides the base CML runtime without GPU drivers.

Tasks

The AMP defines two tasks that run automatically during project setup:

| Task | Type | Script | Resources | Description |
| --- | --- | --- | --- | --- |
| Install Dependencies | create_job | scripts/install_dependencies.py | 1 vCPU, 2 GiB | Creates a CML job for dependency installation |
| (same) | run_job | (inherited) | (inherited) | Executes the dependency installation job |

The install_dependencies.py script uses CML’s shell escape syntax:

!pip3 install -r requirements.txt

Note: This is not valid standalone Python. The ! prefix is a CML session feature that executes shell commands.

Build Script

cdsw-build.sh is the build script for CML Model Endpoints. It runs during endpoint deployment to install runtime dependencies:

pip3 install -r requirements.txt

This ensures the inference script (scripts/predict_fraud.py) has access to xgboost and numpy at serving time.

Deployment Methods

There are three ways to deploy this AMP on CML:

  1. AMP Catalog — Navigate to the AMP Catalog in a CML workspace, select the “Distributed XGBoost with Dask on CML” tile, and follow the setup wizard.

  2. AMP Prototype URL — In a CML workspace, create a new project with “AMP” as the initial setup option and provide the Git repository URL.

  3. Manual Git Clone — Create a new project with “Git” as the initial setup option. In this case, run !pip install -r requirements.txt manually in a JupyterLab session before executing the notebooks.

After deployment, run either notebook by starting a Python 3.9+ JupyterLab session with at least 1 vCPU / 2 GiB.

Model Endpoint Contract

This is the authoritative specification for the inference API. An external client must conform to this contract to receive correct predictions from the deployed CML Model Endpoint.

Function Signature

def predict_fraud(args) -> int

CML invokes this function for each prediction request, passing the request body as args.

Input Contract

{
    "features": [[<29 float values>]]
}

The features value is a list containing a single list of 29 floats. The feature order must match the training schema — alphabetical by column name:

Amount, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13,
V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26,
V27, V28

See Feature Engineering Contract for details on how these features are derived from raw transaction data.

Output Contract

Returns an integer:

| Value | Meaning |
| --- | --- |
| 0 | Not fraud |
| 1 | Fraud |

Internal Logic

  1. Module load (once, at endpoint startup):

    booster = xgb.Booster(model_file='/home/cdsw/model/best-xgboost-model')
    threshold = 0.35
    
  2. Per-request:

    prediction = booster.inplace_predict(np.array(args['features']))
    if prediction[0] <= threshold:
        return 0  # not fraud
    return 1      # fraud
    

The model outputs a continuous probability in [0.0, 1.0]. The threshold of 0.35 converts this to a binary classification.

Sample Request

{
    "features": [[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
      -0.33832077, 0.462387778, 0.239598554, 0.0986979013,
      0.36378697, 0.090794172, -0.551599533, -0.617800856,
      -0.991389847, -0.311169354, 1.46817697, -0.470400525,
      0.207971242, 0.0257905802, 0.40399296, 0.251412098,
      -0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
      0.128539358, -0.189114844, 0.133558377, -0.0210530535,
      149.62]]
}

Sample Response

0

CML Model Endpoint Configuration

| Setting | Value |
| --- | --- |
| Build File | cdsw-build.sh |
| Target File | scripts/predict_fraud.py |
| Function | predict_fraud |

The build file runs pip3 install -r requirements.txt to ensure xgboost and numpy are available. The target file loads the model at import time and exposes the predict_fraud function for CML to invoke.
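A minimal client for the deployed endpoint might look like the sketch below. The request envelope ({"accessKey": ..., "request": {...}}) follows the typical CML model-serving pattern, but this shape is an assumption; verify it against the sample request shown in your workspace's model page:

```python
import json
import urllib.request

def build_request_body(access_key: str, features: list) -> bytes:
    """Wrap one 29-feature vector in the assumed CML request envelope."""
    payload = {"accessKey": access_key, "request": {"features": [features]}}
    return json.dumps(payload).encode("utf-8")

def call_endpoint(url: str, access_key: str, features: list):
    """POST the request to the endpoint and return the parsed JSON response."""
    req = urllib.request.Request(
        url,
        data=build_request_body(access_key, features),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```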

Runtime Configuration

Environment Variables

| Variable | Source | Used By | Description |
| --- | --- | --- | --- |
| CDSW_READONLY_PORT | CML runtime | utils/dask_utils.py | Port for the Dask dashboard. Injected by CML into every session automatically. |

No additional environment variables are required. The project does not use API keys, database connections, or external service credentials.

Session Resources

Notebook Session

The JupyterLab session from which you run the notebooks:

| Resource | Minimum | Recommended |
| --- | --- | --- |
| vCPU | 1 | 1 |
| Memory | 2 GiB | 2 GiB |
| GPU | 0 | 0 |

Dask Cluster

Resources allocated via run_dask_cluster(). Each component runs as a separate CML worker session.

| Component | Count | CPU | Memory | GPU |
| --- | --- | --- | --- | --- |
| Scheduler | 1 | 1 vCPU | 2 GiB | 0 |
| Workers | User-specified (default: 2) | User-specified | User-specified | Optional |

Total footprint with 2 workers: 4 CML sessions (1 notebook + 1 scheduler + 2 workers), 4 vCPU, 8 GiB RAM.

The scheduler always uses 1 vCPU / 2 GiB. Worker resources are passed as arguments to run_dask_cluster() and can be scaled up for larger datasets.

Pinned Dependencies

All library versions are locked in requirements.txt:

| Package | Version | Role |
| --- | --- | --- |
| dask[complete] | 2022.5.1 | Distributed computing framework (all optional dependencies included) |
| dask-ml | 2022.1.22 | Machine learning extensions for Dask (train/test split, etc.) |
| matplotlib | 3.3.4 | Plotting (precision-recall curves, class distributions) |
| numpy | 1.22.4 | Numerical arrays (inference input/output) |
| pandas | 1.4.2 | DataFrames (hyperparameter history, data exploration) |
| scikit-learn | 1.0.2 | Baseline models, preprocessing (StandardScaler), metrics |
| scipy | 1.8.1 | Statistical distributions for hyperparameter search |
| seaborn | 0.11.2 | Statistical visualization |
| tqdm | 4.64.0 | Progress bars for hyperparameter search |
| xgboost | 1.6.1 | Gradient boosted trees (distributed training and inference) |
| -e . | (local) | Installs the utils package from setup.py in editable mode |

Local Package

The -e . entry installs cdsw-dask-utils (version 0.1.0) from setup.py. This makes utils.dask_utils importable as a regular Python package. The editable install means changes to dask_utils.py take effect immediately without reinstallation.

Validation Rules Reference

This page defines the complete set of validation rules for a CML AMP project conforming to the Distributed XGBoost with Dask pattern. Rules are categorized by domain and severity.

Severity levels:

  • ERROR — the artifact is invalid and will fail at runtime.
  • WARNING — the artifact may work but deviates from the expected contract.

Structural Rules

| Rule | Severity | Description |
| --- | --- | --- |
| S-001 | ERROR | Repository must contain .project-metadata.yaml at root |
| S-002 | ERROR | .project-metadata.yaml must parse as valid YAML |
| S-003 | ERROR | requirements.txt must exist at root |
| S-004 | ERROR | scripts/predict_fraud.py must exist |
| S-005 | ERROR | model/best-xgboost-model must exist (post-training only) |
| S-006 | ERROR | cdsw-build.sh must exist |
| S-007 | ERROR | utils/dask_utils.py must exist |
| S-008 | ERROR | setup.py must exist at root |

Dependency Rules

| Rule | Severity | Description |
| --- | --- | --- |
| D-001 | ERROR | requirements.txt must include xgboost |
| D-002 | ERROR | requirements.txt must include dask (with or without [complete] extra) |
| D-003 | ERROR | requirements.txt must include scikit-learn |
| D-004 | ERROR | requirements.txt must include -e . (local utils package) |
| D-005 | ERROR | requirements.txt must include numpy |
| D-W01 | WARNING | All packages should have pinned versions (==) for reproducibility |

Model Endpoint Rules

| Rule | Severity | Description |
| --- | --- | --- |
| E-001 | ERROR | scripts/predict_fraud.py must parse as valid Python (no syntax errors) |
| E-002 | ERROR | scripts/predict_fraud.py must define a function named predict_fraud |
| E-003 | ERROR | predict_fraud must accept exactly one parameter (args) |
| E-004 | ERROR | Module must load a model from /home/cdsw/model/best-xgboost-model |
| E-005 | ERROR | Module must define a threshold variable |
| E-W01 | WARNING | threshold should be a float between 0.0 and 1.0 |
| E-W02 | WARNING | predict_fraud should return an integer (0 or 1) |

AMP Configuration Rules

| Rule | Severity | Description |
| --- | --- | --- |
| A-001 | ERROR | .project-metadata.yaml must contain a runtimes list with at least one entry |
| A-002 | ERROR | Runtime kernel must specify Python 3.9 or higher |
| A-003 | ERROR | Runtime editor must be JupyterLab |
| A-004 | ERROR | At least one task must reference scripts/install_dependencies.py |
| A-W01 | WARNING | specification_version should be 1.0 |

Cluster Utility Rules

| Rule | Severity | Description |
| --- | --- | --- |
| C-W01 | WARNING | utils/dask_utils.py should define run_dask_cluster |
| C-W02 | WARNING | run_dask_cluster should accept num_workers, cpu, memory parameters |
| C-W03 | WARNING | run_dask_cluster should return a dict with keys: scheduler, workers, scheduler_address, dashboard_address |
| C-W04 | WARNING | Scheduler should listen on TCP port 8786 |

Build Script Rules

| Rule | Severity | Description |
| --- | --- | --- |
| B-001 | ERROR | cdsw-build.sh must be a valid shell script (starts with a command, not a syntax error) |
| B-W01 | WARNING | cdsw-build.sh should install dependencies from requirements.txt |

Building a Validation SDK

This guide walks through building a Python SDK that validates a CML AMP project against the Validation Rules Reference. The SDK can be used standalone or integrated into a CI/CD pipeline.

Data Model

from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path


class Severity(Enum):
    ERROR = "error"
    WARNING = "warning"


@dataclass
class ValidationIssue:
    rule: str              # e.g., "S-001"
    severity: Severity
    message: str           # human-readable description
    path: str | None = None  # file path that triggered the issue


@dataclass
class ValidationResult:
    issues: list[ValidationIssue] = field(default_factory=list)

    @property
    def passed(self) -> bool:
        return not any(i.severity == Severity.ERROR for i in self.issues)

    @property
    def errors(self) -> list[ValidationIssue]:
        return [i for i in self.issues if i.severity == Severity.ERROR]

    @property
    def warnings(self) -> list[ValidationIssue]:
        return [i for i in self.issues if i.severity == Severity.WARNING]

Validation Pipeline

The SDK runs five validation steps in sequence. Each step appends issues to the shared ValidationResult.

Step 1: Check Repository Structure (S-rules)

def validate_structure(root: Path, result: ValidationResult) -> None:
    required_files = {
        "S-001": ".project-metadata.yaml",
        "S-003": "requirements.txt",
        "S-004": "scripts/predict_fraud.py",
        "S-006": "cdsw-build.sh",
        "S-007": "utils/dask_utils.py",
        "S-008": "setup.py",
    }
    for rule, path in required_files.items():
        if not (root / path).exists():
            result.issues.append(ValidationIssue(
                rule=rule,
                severity=Severity.ERROR,
                message=f"Required file missing: {path}",
                path=path,
            ))

Rule S-005 (model/best-xgboost-model) should only be checked post-training. Pass a flag to control this:

    if check_model and not (root / "model" / "best-xgboost-model").exists():
        result.issues.append(ValidationIssue(
            rule="S-005",
            severity=Severity.ERROR,
            message="Trained model missing: model/best-xgboost-model",
            path="model/best-xgboost-model",
        ))

Step 2: Parse AMP Configuration (A-rules)

import yaml

def validate_amp_config(root: Path, result: ValidationResult) -> None:
    config_path = root / ".project-metadata.yaml"
    if not config_path.exists():
        return  # missing file already caught by S-001
    try:
        # `or {}` guards against an empty (None-parsing) YAML file
        config = yaml.safe_load(config_path.read_text()) or {}
    except yaml.YAMLError:
        result.issues.append(ValidationIssue(
            rule="S-002", severity=Severity.ERROR,
            message=".project-metadata.yaml is not valid YAML",
            path=".project-metadata.yaml",
        ))
        return

    # A-001: runtimes must exist
    runtimes = config.get("runtimes", [])
    if not runtimes:
        result.issues.append(ValidationIssue(
            rule="A-001", severity=Severity.ERROR,
            message="No runtimes defined in .project-metadata.yaml",
        ))
        return

    runtime = runtimes[0]

    # A-002: Python 3.9+
    kernel = runtime.get("kernel", "")
    if "Python" not in kernel:
        result.issues.append(ValidationIssue(
            rule="A-002", severity=Severity.ERROR,
            message=f"Runtime kernel must be Python 3.9+, got: {kernel}",
        ))
    else:
        try:
            # Compare (major, minor) as integers — float("3.10") would collapse
            # to 3.1 and wrongly fail the >= 3.9 check.
            major, minor = kernel.split("Python")[1].strip().split(".")[:2]
            if (int(major), int(minor)) < (3, 9):
                result.issues.append(ValidationIssue(
                    rule="A-002", severity=Severity.ERROR,
                    message=f"Python version must be >= 3.9, got: {major}.{minor}",
                ))
        except (ValueError, IndexError):
            pass

    # A-003: JupyterLab
    if runtime.get("editor") != "JupyterLab":
        result.issues.append(ValidationIssue(
            rule="A-003", severity=Severity.ERROR,
            message=f"Runtime editor must be JupyterLab, got: {runtime.get('editor')}",
        ))
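
The function above covers A-001 through A-003. The remaining A-rules (A-004 and A-W01) can be checked from the same parsed config dict — a minimal sketch, assuming each task entry is a dict carrying a script key:

```python
def check_tasks(config: dict) -> list[str]:
    """Return IDs of violated task/version rules (A-004, A-W01)."""
    violations = []
    tasks = config.get("tasks", [])
    # A-004: at least one task must reference scripts/install_dependencies.py
    if not any("scripts/install_dependencies.py" in str(t.get("script", ""))
               for t in tasks):
        violations.append("A-004")
    # A-W01: specification_version should be 1.0
    if str(config.get("specification_version", "")) != "1.0":
        violations.append("A-W01")
    return violations
```

These could equally be folded into validate_amp_config; they are split out here only for readability.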

Step 3: Validate Dependencies (D-rules)

def validate_dependencies(root: Path, result: ValidationResult) -> None:
    req_path = root / "requirements.txt"
    if not req_path.exists():
        return  # already caught by S-003

    content = req_path.read_text().lower()
    checks = {
        "D-001": "xgboost",
        "D-002": "dask",
        "D-003": "scikit-learn",
        "D-005": "numpy",
    }
    for rule, pkg in checks.items():
        if pkg not in content:
            result.issues.append(ValidationIssue(
                rule=rule, severity=Severity.ERROR,
                message=f"Required package missing from requirements.txt: {pkg}",
                path="requirements.txt",
            ))

    if "-e ." not in content and "-e." not in content:
        result.issues.append(ValidationIssue(
            rule="D-004", severity=Severity.ERROR,
            message="Local package install (-e .) missing from requirements.txt",
            path="requirements.txt",
        ))

Step 4: Validate Endpoint Contract (E-rules)

import ast

def validate_endpoint(root: Path, result: ValidationResult) -> None:
    script_path = root / "scripts" / "predict_fraud.py"
    if not script_path.exists():
        return  # already caught by S-004

    source = script_path.read_text()

    # E-001: valid Python
    try:
        tree = ast.parse(source)
    except SyntaxError as e:
        result.issues.append(ValidationIssue(
            rule="E-001", severity=Severity.ERROR,
            message=f"Syntax error in predict_fraud.py: {e}",
            path="scripts/predict_fraud.py",
        ))
        return

    # E-002 & E-003: function exists with correct signature
    func = None
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name == "predict_fraud":
            func = node
            break

    if func is None:
        result.issues.append(ValidationIssue(
            rule="E-002", severity=Severity.ERROR,
            message="Function 'predict_fraud' not found in predict_fraud.py",
            path="scripts/predict_fraud.py",
        ))
        return

    args = func.args
    if len(args.args) != 1:
        result.issues.append(ValidationIssue(
            rule="E-003", severity=Severity.ERROR,
            message=f"predict_fraud must accept exactly 1 parameter, found {len(args.args)}",
            path="scripts/predict_fraud.py",
        ))

    # E-004 & E-005: check for model path and threshold in source
    if "best-xgboost-model" not in source:
        result.issues.append(ValidationIssue(
            rule="E-004", severity=Severity.ERROR,
            message="Model path '/home/cdsw/model/best-xgboost-model' not found",
            path="scripts/predict_fraud.py",
        ))

    if "threshold" not in source:
        result.issues.append(ValidationIssue(
            rule="E-005", severity=Severity.ERROR,
            message="'threshold' variable not found in predict_fraud.py",
            path="scripts/predict_fraud.py",
        ))

Step 5: Validate Cluster Utilities (C-rules)

def validate_cluster_utils(root: Path, result: ValidationResult) -> None:
    utils_path = root / "utils" / "dask_utils.py"
    if not utils_path.exists():
        return  # already caught by S-007

    source = utils_path.read_text()

    try:
        tree = ast.parse(source)
    except SyntaxError:
        return

    func_names = {
        node.name for node in ast.walk(tree)
        if isinstance(node, ast.FunctionDef)
    }

    if "run_dask_cluster" not in func_names:
        result.issues.append(ValidationIssue(
            rule="C-W01", severity=Severity.WARNING,
            message="Function 'run_dask_cluster' not found in dask_utils.py",
            path="utils/dask_utils.py",
        ))

    if "8786" not in source:
        result.issues.append(ValidationIssue(
            rule="C-W04", severity=Severity.WARNING,
            message="Scheduler port 8786 not referenced in dask_utils.py",
            path="utils/dask_utils.py",
        ))
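
C-W02 can be checked by inspecting run_dask_cluster's signature on the parsed AST. A minimal sketch, using the parameter names from the rules table:

```python
import ast

def missing_params(source: str,
                   expected=("num_workers", "cpu", "memory")) -> set[str]:
    """Return expected parameter names absent from run_dask_cluster's signature."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name == "run_dask_cluster":
            names = {a.arg for a in node.args.args + node.args.kwonlyargs}
            return set(expected) - names
    return set(expected)  # function absent: every parameter is "missing"
```

Each name in the returned set would map to a C-W02 WARNING issue; C-W03's return-value shape is harder to verify statically and is best left as a documentation-level check.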

Running the SDK

from pathlib import Path

def validate(project_root: str, check_model: bool = False) -> ValidationResult:
    root = Path(project_root)
    result = ValidationResult()

    validate_structure(root, result)
    validate_amp_config(root, result)
    validate_dependencies(root, result)
    validate_endpoint(root, result)
    validate_cluster_utils(root, result)

    return result


if __name__ == "__main__":
    result = validate(".")
    for issue in result.issues:
        print(f"[{issue.severity.value.upper()}] {issue.rule}: {issue.message}")
    if result.passed:
        print("\nValidation passed.")
    else:
        print(f"\nValidation failed with {len(result.errors)} error(s).")
        raise SystemExit(1)

Optional: Model Smoke Test

If the trained model is available, you can add a smoke test that loads it and runs inference with sample data:

def smoke_test_model(root: Path) -> None:
    import numpy as np
    import xgboost as xgb

    model_path = root / "model" / "best-xgboost-model"
    booster = xgb.Booster(model_file=str(model_path))

    sample = np.array([[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
        -0.33832077, 0.462387778, 0.239598554, 0.0986979013,
        0.36378697, 0.090794172, -0.551599533, -0.617800856,
        -0.991389847, -0.311169354, 1.46817697, -0.470400525,
        0.207971242, 0.0257905802, 0.40399296, 0.251412098,
        -0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
        0.128539358, -0.189114844, 0.133558377, -0.0210530535,
        149.62]])

    prediction = booster.inplace_predict(sample)
    assert 0.0 <= prediction[0] <= 1.0, f"Prediction out of range: {prediction[0]}"
    binary = 0 if prediction[0] <= 0.35 else 1
    assert binary in (0, 1)

GitHub Actions Integration

This page provides a GitHub Actions workflow for validating the AMP project structure and optionally running a model smoke test in CI.

Validation Workflow

name: Validate AMP

on:
  push:
    branches: [master]
  pull_request:
    branches: [master]

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.9"

      - name: Install validation dependencies
        run: pip install pyyaml

      - name: Run structural validation
        run: python scripts/validate_amp.py

  smoke-test:
    runs-on: ubuntu-latest
    needs: validate
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      # hashFiles() in a job-level `if` evaluates before checkout, when the
      # workspace is still empty, so the model check must run after checkout.
      - name: Check for trained model
        id: model
        run: |
          if [ -f model/best-xgboost-model ]; then
            echo "exists=true" >> "$GITHUB_OUTPUT"
          else
            echo "exists=false" >> "$GITHUB_OUTPUT"
          fi

      - name: Set up Python
        if: steps.model.outputs.exists == 'true'
        uses: actions/setup-python@v5
        with:
          python-version: "3.9"

      - name: Install dependencies
        if: steps.model.outputs.exists == 'true'
        run: |
          pip install numpy xgboost

      - name: Run model smoke test
        if: steps.model.outputs.exists == 'true'
        run: |
          python -c "
          import numpy as np
          import xgboost as xgb

          booster = xgb.Booster(model_file='model/best-xgboost-model')
          sample = np.array([[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
              -0.33832077, 0.462387778, 0.239598554, 0.0986979013,
              0.36378697, 0.090794172, -0.551599533, -0.617800856,
              -0.991389847, -0.311169354, 1.46817697, -0.470400525,
              0.207971242, 0.0257905802, 0.40399296, 0.251412098,
              -0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
              0.128539358, -0.189114844, 0.133558377, -0.0210530535,
              149.62]])

          prediction = booster.inplace_predict(sample)
          assert 0.0 <= prediction[0] <= 1.0, f'Out of range: {prediction[0]}'
          binary = 0 if prediction[0] <= 0.35 else 1
          assert binary in (0, 1), f'Unexpected output: {binary}'
          print(f'Prediction: {prediction[0]:.4f} -> {binary}')
          print('Smoke test passed.')
          "

Workflow Details

validate job

Runs on every push and pull request to master. Requires only pyyaml (no heavy ML dependencies). Executes a validation script that checks:

  • Repository structure (S-rules)
  • AMP configuration (A-rules)
  • Dependency manifest (D-rules)
  • Endpoint contract via AST parsing (E-rules)
  • Cluster utility structure (C-rules)

smoke-test job

Runs only when the trained model file (model/best-xgboost-model) exists in the repository. Installs numpy and xgboost, loads the model, runs a sample prediction, and asserts the output is valid.

The smoke test is conditional — its steps are skipped if the model has not been committed (e.g., during initial development before training). Note that hashFiles() in a job-level if expression evaluates before the repository is checked out, so the existence check must run as a step after checkout rather than as a job-level condition.

Setting Up the Validation Script

The workflow assumes scripts/validate_amp.py exists. Create this file using the SDK code from Building a Validation SDK:

# scripts/validate_amp.py
# Paste the ValidationResult, ValidationIssue, Severity classes
# and all validate_* functions from the SDK guide,
# then add the __main__ block:

if __name__ == "__main__":
    result = validate(".")
    for issue in result.issues:
        print(f"[{issue.severity.value.upper()}] {issue.rule}: {issue.message}")
    if not result.passed:
        raise SystemExit(1)
    print("All validation checks passed.")

Extending the Workflow

To add custom validation rules:

  1. Define a new rule ID (e.g., X-001) and severity in the Validation Rules Reference.
  2. Add a validate_custom() function to the SDK.
  3. Call it from the validate() entry point.

To validate dependency versions (not just presence):

import re

def validate_pinned_versions(root, result):
    content = (root / "requirements.txt").read_text()
    for line in content.strip().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and not line.startswith("-"):
            if "==" not in line:
                result.issues.append(ValidationIssue(
                    rule="D-W01", severity=Severity.WARNING,
                    message=f"Package not pinned: {line}",
                    path="requirements.txt",
                ))

SIGDG Financial Transaction Ontology

The Signals Data Governance ontology (prefix SIGDG) is grounded in BFO 2020 (Basic Formal Ontology, ISO/IEC 21838-2:2021). BFO provides the upper-level categories that make ontologies from separate teams composable — preventing the class of integration failures Barry Smith illustrates with the 2006 Airbus A380 wiring debacle, where incompatible data representations across engineering teams cost $6 billion to correct. A shared top-level ontology is the mechanism that makes data descriptions joinable without ad-hoc reconciliation.

This page defines the financial transaction risk extension of SIGDG. It expands the existing SIGDG:0050 TransactionInformation node — which the Signals-360 metadata classification project defines as a leaf — into a full subtree covering fraud detection, transaction context, risk indicators, and regulatory scope. Both ontologies share the same CURIE prefix and BFO grounding, so categories from either project can be composed in a single governance policy.

BFO Grounding

BFO:0000031 generically dependent continuant
  └── SIGDG:0001 InformationEntity             ── "data that can be stored and transferred"
        └── SIGDG:0050 TransactionInformation   ── "event records: orders, sessions, timestamps"
              └── SIGDG:0100 FinancialTransaction  ── "monetary transfer event between parties"

BFO:0000019 quality
  ├── SIGDG:1001 SensitivityLevel              ── "inheres in an information entity"
  └── SIGDG:1050 TransactionRiskLevel          ── "inheres in a financial transaction record"

BFO:0000023 role
  ├── SIGDG:2001 DataSubjectRole               ── "externally grounded in governance context"
  └── SIGDG:2040 RegulatoryFrameworkRole       ── "externally grounded in regulatory scope"

BFO:0000015 process
  ├── SIGDG:3001 DataLifecycleProcess          ── "transformation, migration, classification"
  └── SIGDG:3040 FraudDetectionProcess         ── "scoring, retraining, threshold calibration"

BFO’s generically dependent continuant (GDC) is the natural home for information entities — they depend on some material carrier (disk, memory, wire) but can be copied and transferred between carriers. A financial transaction record is a GDC: it describes a real-world monetary event but exists independently of any single storage medium. Risk levels are qualities that inhere in transaction records. Regulatory scope is a role — the same transaction carries different obligations under PCI-DSS vs. AML/KYC.

CURIE Prefix

SIGDG → https://signals360.dev/ontology/dg/
BFO   → http://purl.obolibrary.org/obo/BFO_

The SIGDG prefix is shared with the Signals-360 metadata governance ontology. Code ranges are partitioned to avoid collision:

| Range | Owner | Domain |
|-------|-------|--------|
| 0001–0091 | Signals-360 | Metadata column classification |
| 0100–0199 | This project | Financial transaction risk |
| 1010–1040 | Signals-360 | Data sensitivity levels |
| 1050–1059 | This project | Transaction risk levels |
| 2010–2030 | Signals-360 | Data subject roles |
| 2040–2049 | This project | Regulatory framework roles |
| 3010–3030 | Signals-360 | Data lifecycle processes |
| 3040–3049 | This project | Fraud detection processes |

Financial Transaction Hierarchy

SIGDG:0050  TransactionInformation                 (⊑ SIGDG:0001, from Signals-360)
└── SIGDG:0100  FinancialTransaction               "monetary transfer event between parties"
    ├── SIGDG:0110  TransactionRiskClassification   "risk assessment outcome for a transaction"
    │   ├── SIGDG:0111  LegitimateTransaction       "transaction consistent with cardholder behavior"
    │   └── SIGDG:0112  FraudulentTransaction        "unauthorized or deceptive transaction"
    │       ├── SIGDG:0113  CardNotPresentFraud      "fraud via remote channel without physical card"
    │       ├── SIGDG:0114  CounterfeitCardFraud     "fraud using cloned or fabricated card"
    │       ├── SIGDG:0115  LostStolenCardFraud      "fraud using a card obtained without consent"
    │       └── SIGDG:0116  AccountTakeoverFraud     "fraud via compromised account credentials"
    ├── SIGDG:0120  TransactionContext               "observable properties of a transaction event"
    │   ├── SIGDG:0121  MonetaryContext              "amount, currency, denomination"
    │   ├── SIGDG:0122  TemporalContext              "timestamp, time-of-day, day-of-week patterns"
    │   ├── SIGDG:0123  BehavioralContext            "latent patterns derived from transaction history"
    │   └── SIGDG:0124  MerchantContext              "merchant category, location, channel"
    └── SIGDG:0130  RiskIndicator                    "anomalous pattern suggesting elevated risk"
        ├── SIGDG:0131  AmountAnomaly                "amount deviates from cardholder norm"
        ├── SIGDG:0132  VelocityAnomaly              "transaction frequency exceeds cardholder baseline"
        ├── SIGDG:0133  GeographicAnomaly            "location inconsistent with cardholder history"
        └── SIGDG:0134  BehavioralAnomaly            "latent behavioral pattern deviates from norm"

Hierarchy Rationale

TransactionRiskClassification (0110) models the outcome of the fraud detection model. In the current binary implementation, only LegitimateTransaction and FraudulentTransaction are used. The four fraud subtypes (0113–0116) are provided for future multi-class models and for downstream governance policies that distinguish fraud mechanisms — a lost-card fraud may trigger a card replacement workflow while an account takeover triggers a credential reset.

TransactionContext (0120) models what the features describe. The credit card fraud dataset’s 29 features map to this subtree:

| Feature(s) | SIGDG Category | Notes |
|------------|----------------|-------|
| Amount | SIGDG:0121 MonetaryContext | Raw transaction amount; the only non-PCA feature |
| Time (dropped) | SIGDG:0122 TemporalContext | Seconds since first transaction; dropped in preprocessing |
| V1–V28 | SIGDG:0123 BehavioralContext | PCA-transformed latent dimensions; original features undisclosed |
| (not in dataset) | SIGDG:0124 MerchantContext | Placeholder for merchant data in future datasets |

The PCA features are deliberately anonymous — the dataset curators applied PCA for confidentiality. Each V-component encodes a linear combination of original transaction attributes. We classify them collectively as BehavioralContext rather than attempting to reverse-engineer individual semantics, because the ontology should describe what the features represent (behavioral patterns), not their mathematical form (principal components).

RiskIndicator (0130) models the evidence supporting a risk classification. These correspond to interpretable signals that a human analyst would recognize:

| Indicator | What it captures | Dataset proxy |
|-----------|------------------|---------------|
| AmountAnomaly | Unusually large or small transaction | Extreme values in Amount |
| VelocityAnomaly | Burst of transactions in short window | (requires Time; not used in current model) |
| GeographicAnomaly | Transaction from unexpected location | (encoded in PCA components; not directly observable) |
| BehavioralAnomaly | Deviation from established spending pattern | High-magnitude PCA components |

Not all indicators are directly observable in the current dataset. The ontology defines them for completeness — a production fraud system would populate all four from raw transaction data.

Transaction Risk Levels

Risk level is modeled as a BFO quality (BFO:0000019) that inheres in a financial transaction record. This parallels the sensitivity levels in Signals-360 (SIGDG:1010–1040), but describes fraud risk rather than data sensitivity.

| CURIE | Code | Label | Definition | Threshold Guidance |
|-------|------|-------|------------|--------------------|
| SIGDG:1051 | 1051 | LowRisk | Transaction consistent with all behavioral baselines | Model score ≤ 0.10 |
| SIGDG:1052 | 1052 | ElevatedRisk | Transaction deviates on one or more risk indicators | 0.10 < score ≤ 0.35 |
| SIGDG:1053 | 1053 | HighRisk | Transaction strongly suggests fraudulent intent | 0.35 < score ≤ 0.80 |
| SIGDG:1054 | 1054 | ConfirmedFraud | Transaction verified as unauthorized post-investigation | score > 0.80 or manual label |

The current binary model applies a single threshold at 0.35 (the boundary between ElevatedRisk and HighRisk). The four-level scheme supports graduated response:

  • LowRisk: Approve automatically.
  • ElevatedRisk: Approve but flag for batch review.
  • HighRisk: Require step-up authentication or manual approval.
  • ConfirmedFraud: Block and initiate chargeback/investigation.
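
The graduated scheme above reduces to a simple score-to-CURIE mapping. A sketch, using the threshold guidance from the table (the function name is hypothetical; ConfirmedFraud normally comes from investigation, not a score alone):

```python
def risk_level(score: float, confirmed: bool = False) -> str:
    """Map a model score to a transaction risk level CURIE (illustrative)."""
    if confirmed or score > 0.80:
        return "SIGDG:1054"  # ConfirmedFraud — block, initiate investigation
    if score > 0.35:
        return "SIGDG:1053"  # HighRisk — step-up auth or manual approval
    if score > 0.10:
        return "SIGDG:1052"  # ElevatedRisk — approve, flag for batch review
    return "SIGDG:1051"      # LowRisk — approve automatically
```

The current binary endpoint collapses this to a single cut at 0.35 (the ElevatedRisk/HighRisk boundary).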

Relationship to Sensitivity Levels

Transaction risk levels and data sensitivity levels are orthogonal qualities that can co-occur on the same entity:

| Entity | Risk Level | Sensitivity Level | Implication |
|--------|------------|-------------------|-------------|
| Transaction record with PAN | HighRisk | Restricted (PCI-DSS) | Block transaction AND encrypt PAN at rest |
| Transaction record without PII | LowRisk | Internal | Approve; standard access controls |
| Aggregated fraud statistics | N/A | Confidential | No per-transaction risk; protect business intelligence |

Regulatory Framework Roles

The same transaction may be subject to multiple regulatory frameworks simultaneously. These are modeled as BFO roles (BFO:0000023) — externally grounded in the legal/regulatory context rather than intrinsic to the data.

| CURIE | Code | Label | Definition |
|-------|------|-------|------------|
| SIGDG:2041 | 2041 | PCIDSSScope | Transaction involves payment card data; subject to PCI-DSS requirements |
| SIGDG:2042 | 2042 | AMLKYCScope | Transaction subject to Anti-Money Laundering / Know Your Customer rules |
| SIGDG:2043 | 2043 | EMVLiabilityScope | Transaction subject to EMV chip liability shift rules |

A single transaction can carry all three roles. The fraud detection model’s output feeds into each framework differently:

  • PCI-DSS (2041): A HighRisk classification triggers additional logging and encryption requirements for the transaction’s card data fields.
  • AML/KYC (2042): Patterns of ElevatedRisk transactions across accounts trigger Suspicious Activity Report (SAR) filing obligations.
  • EMV Liability (2043): The party that failed to support chip-based authentication bears fraud liability; the risk classification determines which party initiates the chargeback.

Fraud Detection Processes

Classification and model maintenance are modeled as BFO processes (BFO:0000015):

| CURIE | Code | Label | Definition |
|-------|------|-------|------------|
| SIGDG:3041 | 3041 | RealTimeScoring | Sub-second risk scoring at transaction authorization time |
| SIGDG:3042 | 3042 | BatchRetraining | Periodic model retraining on accumulated labeled transactions |
| SIGDG:3043 | 3043 | ThresholdCalibration | Adjusting the decision boundary to optimize precision-recall tradeoff |

In the current AMP implementation:

  • RealTimeScoring corresponds to the predict_fraud(args) model endpoint.
  • BatchRetraining corresponds to the notebook-driven distributed XGBoost/CatBoost training pipeline.
  • ThresholdCalibration corresponds to the precision-recall curve analysis that selected the 0.35 threshold.
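
ThresholdCalibration can be illustrated with a small, self-contained sketch that scans candidate thresholds for the best F1 (the data here is synthetic for illustration only — the AMP's actual analysis used the real precision-recall curve on validation data):

```python
import random

# Synthetic scores, illustrative only: frauds cluster high, legits cluster low.
random.seed(42)
y_true = [random.random() < 0.1 for _ in range(2000)]
y_score = [min(max((0.7 if y else 0.2) + random.gauss(0, 0.15), 0.0), 1.0)
           for y in y_true]

def f1_at(t: float) -> float:
    """F1 score of the binary rule `score > t` against y_true."""
    tp = sum(1 for y, s in zip(y_true, y_score) if y and s > t)
    fp = sum(1 for y, s in zip(y_true, y_score) if not y and s > t)
    fn = sum(1 for y, s in zip(y_true, y_score) if y and s <= t)
    p = tp / max(tp + fp, 1)
    r = tp / max(tp + fn, 1)
    return 2 * p * r / max(p + r, 1e-9)

# Scan a threshold grid and keep the best by F1.
best = max((t / 100 for t in range(5, 96)), key=f1_at)
```

In practice the same scan would run over sklearn's precision_recall_curve output, optimizing whichever precision/recall tradeoff the business requires rather than F1.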

Mapping to the Credit Card Fraud Dataset

The Kaggle credit card fraud dataset maps to this ontology as follows:

Class Label Mapping

The binary Class column in the dataset maps to the first two children of TransactionRiskClassification:

| Class Value | SIGDG Category | Count | Rate |
|-------------|----------------|-------|------|
| 0 | SIGDG:0111 LegitimateTransaction | 94,777 | 99.84% |
| 1 | SIGDG:0112 FraudulentTransaction | 149 | 0.16% |

The fraud subtypes (0113–0116) are not distinguishable in this dataset — the Kaggle data does not include fraud mechanism labels. A production deployment would label confirmed fraud cases with the appropriate subtype during investigation, enabling future multi-class models.

Extension Points

This ontology is intentionally minimal — it covers exactly what the current dataset and model support, with clearly marked placeholders for production extensions.

Near-term extensions (Phase 2–3 of CatBoost migration)

| Category | What it enables |
|----------|-----------------|
| MerchantContext (0124) subtypes | MCC-based risk stratification when merchant data is available |
| Fraud subtype labels (0113–0116) | Multi-class classification instead of binary |
| Additional RiskIndicator subtypes | Device fingerprint anomaly, IP geolocation anomaly |

Cross-ontology composition with Signals-360

Because both ontologies share the SIGDG prefix and BFO grounding, they compose naturally. A governance policy can reference both:

“Any column classified as SIGDG:0070 PaymentCardData (from Signals-360) whose transaction records are scored SIGDG:1053 HighRisk (from this extension) must be encrypted at rest and trigger a Suspicious Activity Report.”

This is the integration that BFO’s top-level alignment makes possible — the column-level metadata classification from Signals and the row-level transaction risk classification from this project are two views of the same governed data, connected through a shared formal ontology.

CatBoost Migration Roadmap

This roadmap details how to migrate the distributed fraud detection pipeline from XGBoost to CatBoost, drawing on the production CatBoost patterns established in the Signals-360 metadata classification project. Signals uses CatBoost as one of five evidence sources in a Dempster-Shafer fusion pipeline for classifying database columns into a 174-category SIGDG taxonomy — a harder problem (extreme class imbalance, 2 samples per category) that validates CatBoost’s suitability for the comparatively straightforward binary fraud detection task here.

Why CatBoost

| Concern | XGBoost (current) | CatBoost (target) |
|---------|-------------------|-------------------|
| Ordered boosting | Not available | posterior_sampling=True — prevents prediction shift in low-data regimes |
| Categorical features | Requires one-hot or label encoding | Native categorical handling via cat_features parameter |
| Regularization | L1/L2 on weights | L2 on leaves + Bayesian priors via ordered boosting |
| GPU training | tree_method="gpu_hist" | task_type="GPU" with automatic multi-GPU |
| Dask integration | First-class (xgb.dask.train, DaskDMatrix) | None — this is the primary migration challenge |
| Model format | Binary (.ubj) | CatBoost binary (.cbm) + sidecar class mapping (.classes.json) |
| Feature importance | Built-in SHAP | Built-in TreeSHAP via get_feature_importance(type="ShapValues") |

Migration Phases


Phase 1: Drop-in Replacement

Goal: Replace XGBoost with CatBoost while keeping the existing Dask infrastructure for data loading and preprocessing. CatBoost trains on collected (non-distributed) data.

Architecture Change

The current pipeline uses Dask end-to-end: DaskDMatrix → xgb.dask.train() → distributed predictions. CatBoost has no Dask integration, so the architecture splits into two stages: Dask for ETL, CatBoost for training.

Dependency Changes

# requirements.txt
- xgboost==1.6.1
+ catboost>=1.2.7
  dask[complete]==2022.5.1
  scikit-learn==1.0.2
  numpy==1.22.4

Training Code

Replace xgb.dask.train() with CatBoostClassifier.fit():

from catboost import CatBoostClassifier, Pool

# Collect Dask arrays to NumPy (data must fit in driver memory)
X_train_np = X_train.compute()
y_train_np = y_train.compute()
X_dev_np = X_dev.compute()
y_dev_np = y_dev.compute()

# Create CatBoost Pools
train_pool = Pool(X_train_np, y_train_np)
dev_pool = Pool(X_dev_np, y_dev_np)

cb = CatBoostClassifier(
    loss_function="Logloss",        # Binary classification
    depth=6,
    iterations=100,
    l2_leaf_reg=0.5,
    learning_rate=0.1,
    random_seed=42,
    verbose=10,
    eval_metric="PRAUC",            # Equivalent to XGBoost's aucpr
    auto_class_weights="Balanced",  # Handle 0.16% fraud imbalance
)

cb.fit(
    train_pool,
    eval_set=dev_pool,
    early_stopping_rounds=10,
)

Key differences from XGBoost:

  • Logloss replaces reg:logistic (binary classification)
  • PRAUC replaces aucpr (same metric, different name)
  • auto_class_weights="Balanced" addresses the 636:1 class imbalance natively — XGBoost required manual scale_pos_weight
  • early_stopping_rounds replaces fixed num_round=5
  • No tree_method needed — CatBoost uses symmetric trees by default

Inference Changes

# scripts/predict_fraud.py
import numpy as np
from catboost import CatBoostClassifier

booster = CatBoostClassifier()
booster.load_model('/home/cdsw/model/best-catboost-model.cbm')
threshold = 0.35

def predict_fraud(args):
    # predict_proba expects a 2-D array; reshape handles a single flat sample
    features = np.array(args['features']).reshape(1, -1)
    prediction = booster.predict_proba(features)[:, 1]  # P(fraud)
    if prediction[0] <= threshold:
        return 0
    return 1

Key change: CatBoost’s predict_proba() returns a 2-column matrix [P(legit), P(fraud)]. Index [:, 1] extracts the fraud probability, replacing XGBoost’s inplace_predict() which returned a single value.

Model Serialization

# Save
cb.save_model("../model/best-catboost-model.cbm")

# Load
loaded = CatBoostClassifier()
loaded.load_model("../model/best-catboost-model.cbm")

The .cbm extension is CatBoost’s native binary format. Unlike XGBoost, the class mapping is embedded in the model file for binary classification, so no sidecar .classes.json is needed (that pattern from Signals is for multi-class).

What Stays the Same

  • Dask cluster orchestration (utils/dask_utils.py) — unchanged
  • Data loading (dd.read_csv()) — unchanged
  • Feature engineering (drop Time, StandardScaler) — unchanged
  • Train/dev/val split ratios — unchanged
  • Threshold selection logic — unchanged
  • CML AMP structure (.project-metadata.yaml, cdsw-build.sh) — unchanged

Phase 2: CatBoost-Native Features

Goal: Adopt CatBoost-specific capabilities that XGBoost lacks, following patterns proven in Signals.

Ordered Boosting

Signals uses posterior_sampling=True in all CatBoost configurations to prevent prediction shift — a form of target leakage where training samples influence their own gradient estimates. This is particularly valuable in the fraud dataset’s extreme imbalance regime.

cb = CatBoostClassifier(
    loss_function="Logloss",
    depth=8,                        # Deeper (Signals uses 8 for complex tasks)
    iterations=500,                 # More rounds (Signals uses 500)
    l2_leaf_reg=0.3,               # Lighter regularization
    learning_rate=0.08,            # Slower learning rate
    bootstrap_type="Bernoulli",    # Bernoulli subsampling
    subsample=0.8,                 # 80% per iteration
    posterior_sampling=True,       # Ordered boosting (CatBoost-unique)
    min_data_in_leaf=2,            # From Signals: critical for rare classes
    random_seed=42,
    eval_metric="PRAUC",
    auto_class_weights="Balanced",
)

GPU caveat (from Signals): posterior_sampling=True and rsm (random subspace method) are not supported on CatBoost GPU for classification. Training must run on CPU when these are enabled. Signals handles this by detecting GPU availability and forcing CPU when ordered boosting is active.
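One way to encode that rule is a small helper (hypothetical; not Signals’ actual code) that forces CPU whenever ordered boosting is requested:

```python
def resolve_task_kwargs(params, gpu_available):
    """Return CatBoost task_type kwargs, forcing CPU when ordered
    boosting (posterior_sampling) is enabled, since CatBoost GPU does
    not support it for classification."""
    if params.get("posterior_sampling"):
        return {"task_type": "CPU"}
    return {"task_type": "GPU"} if gpu_available else {"task_type": "CPU"}

# Ordered boosting always lands on CPU, even when a GPU is present:
print(resolve_task_kwargs({"posterior_sampling": True}, gpu_available=True))
# {'task_type': 'CPU'}
```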

GPU Acceleration

For configurations that don’t use ordered boosting (e.g., initial exploration):

def _catboost_gpu_kwargs(devices=None):
    """GPU detection pattern from Signals."""
    import torch
    if torch.cuda.is_available():
        n = torch.cuda.device_count()
        if devices is None:
            devices = ":".join(str(i) for i in range(n))
        return {"task_type": "GPU", "devices": devices}
    return {}

cb = CatBoostClassifier(
    **_catboost_gpu_kwargs(),
    # ... other params (without posterior_sampling)
)

TreeSHAP Feature Importance

Signals uses CatBoost’s built-in TreeSHAP for per-prediction explanations. Apply the same pattern for fraud detection:

from catboost import Pool

pool = Pool(X_val_np)
shap_values = cb.get_feature_importance(
    type="ShapValues",
    data=pool,
)
# shap_values shape: (n_samples, n_features + 1)
# Last column is the base value (bias term)
feature_shap = shap_values[:, :-1]  # Per-sample, per-feature importance

This replaces the need for a separate SHAP library and is much faster for tree models.
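The layout has a useful additivity property: per-feature contributions plus the trailing bias column sum to the model’s raw margin. This can be checked on a synthetic ShapValues matrix (the numbers below are made up for illustration):

```python
import numpy as np

# Synthetic get_feature_importance(type="ShapValues") output:
# 2 samples, 3 features, plus a trailing base-value (bias) column
shap_values = np.array([[0.10, -0.20, 0.50, 0.30],
                        [0.00,  0.40, -0.10, 0.30]])

feature_shap = shap_values[:, :-1]  # per-sample, per-feature contributions
base_value = shap_values[:, -1]     # bias term, same for every sample
raw_margin = feature_shap.sum(axis=1) + base_value
print(np.round(raw_margin, 6))  # [0.7 0.6]
```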

Hyperparameter Search Space

Adapted from XGBoost space with CatBoost equivalents:

| XGBoost Parameter | CatBoost Equivalent | Recommended Range |
|---|---|---|
| learning_rate | learning_rate | [0.01, 0.3] |
| gamma | min_data_in_leaf | [1, 20] (integer) |
| max_depth | depth | [4, 10] |
| min_child_weight | min_data_in_leaf | (merged with gamma) |
| max_delta_step | (not needed) | |
| subsample | subsample | [0.6, 1.0] |
| lambda (L2) | l2_leaf_reg | [0.1, 10] (log-uniform) |
| alpha (L1) | (not directly available) | |

CatBoost-specific parameters to add:

| Parameter | Range | Description |
|---|---|---|
| random_strength | [0.1, 10] | Score randomization at each split |
| bagging_temperature | [0, 5] | Bayesian bootstrap intensity |
| border_count | [32, 255] | Number of splits per feature |
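Taken together, the two tables describe one search space. A minimal random sampler over it might look like the sketch below (parameter names match the CatBoost constructor as used earlier; the function itself is illustrative):

```python
import random

def sample_catboost_params(rng=random):
    """Draw one CatBoost configuration from the ranges above."""
    return {
        "learning_rate": rng.uniform(0.01, 0.3),
        "depth": rng.randint(4, 10),
        "min_data_in_leaf": rng.randint(1, 20),
        "subsample": rng.uniform(0.6, 1.0),
        "l2_leaf_reg": 10 ** rng.uniform(-1, 1),  # log-uniform on [0.1, 10]
        "random_strength": rng.uniform(0.1, 10),
        "bagging_temperature": rng.uniform(0, 5),
        "border_count": rng.randint(32, 255),
    }

params = sample_catboost_params()
```

Each draw can be fed directly to CatBoostClassifier(**params, ...) inside the tuning loop.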

Phase 3: Distributed Strategy

Goal: Restore memory-constrained distributed training capability without native Dask integration.

CatBoost has no Dask integration of its own. Three strategies can replace xgb.dask.train() for datasets that exceed driver memory:

Strategy A: Dask Preprocessing + Quantized Pools

Use Dask for data loading and preprocessing, then convert to CatBoost’s quantized pool format which uses ~8x less memory than raw floats:

# 1. Dask loads and preprocesses (distributed)
dask_df = dd.read_csv("large_dataset.csv", assume_missing=True)
dask_df = dask_df.drop(columns=["Time"])
# ... feature engineering on Dask ...

# 2. Save preprocessed partitions to disk
dask_df.to_parquet("/tmp/preprocessed/", engine="pyarrow")

# 3. CatBoost loads and quantizes (memory-efficient). Pool's file reader
#    expects delimiter-separated files, so read the Parquet output with
#    pandas first.
import pandas as pd
from catboost import Pool

df = pd.read_parquet("/tmp/preprocessed/")
train_pool = Pool(df.drop(columns=["Class"]), label=df["Class"])
train_pool.quantize()  # in-place quantization: ~8x memory reduction

Strategy B: CatBoost Multi-Node Training

CatBoost has its own distributed training mode via --node-count:

# On each CML worker:
catboost fit \
    --loss-function Logloss \
    --learn-set /data/train.tsv \
    --node-count 3 \
    --node-port 8788 \
    --file-with-hosts hosts.txt

This could be orchestrated via dask_utils.py’s worker-launching pattern — replace !dask-worker with !catboost fit --node-count N.

Strategy C: Partition-Level Ensembling

Train separate CatBoost models on each Dask partition, then ensemble predictions:

# On each Dask worker
def train_partition(partition_df):
    cb = CatBoostClassifier(...)
    cb.fit(partition_df[features], partition_df["Class"])
    return cb

# Train one model per partition, then gather the fitted models locally
from dask import delayed
futures = client.compute([delayed(train_partition)(p) for p in dask_df.to_delayed()])
models = client.gather(futures)

# Average the fraud probabilities across partition models
predictions = np.mean([m.predict_proba(X_val)[:, 1] for m in models], axis=0)

This is the simplest approach but sacrifices some accuracy compared to true distributed training where the full dataset informs each tree.

For datasets up to ~10M rows: Strategy A (quantized pools). CatBoost’s quantization reduces a 10M × 29 float64 matrix from ~2.3 GB to ~290 MB, fitting comfortably in a single 4 GiB CML session.
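The sizing estimate works out as follows (decimal GB/MB, assuming roughly one byte per value after quantization):

```python
rows, cols = 10_000_000, 29

raw_bytes = rows * cols * 8        # float64: 8 bytes per value
quantized_bytes = rows * cols * 1  # ~1 byte per value after quantization

print(raw_bytes / 1e9)        # 2.32  -> matches "~2.3 GB"
print(quantized_bytes / 1e6)  # 290.0 -> matches "~290 MB"
```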

For datasets beyond ~10M rows: Strategy B (multi-node). Requires modifying dask_utils.py to launch CatBoost worker processes instead of Dask workers.


Phase 4: Signals Patterns (Advanced)

Goal: Adopt advanced techniques from Signals that would improve fraud detection beyond what basic CatBoost provides.

Self-Training

Signals implements multi-round self-training where high-confidence predictions are injected back as training labels. This is valuable when labeled fraud data is scarce:

# Round 1: Train on labeled data
cb.fit(X_train, y_train)
proba = cb.predict_proba(X_unlabeled)

# Pseudo-label high-confidence predictions
confident_mask = proba.max(axis=1) >= 0.80  # Signals default threshold
pseudo_X = X_unlabeled[confident_mask]
pseudo_y = proba[confident_mask].argmax(axis=1)

# Round 2: Retrain with pseudo-labels added
X_aug = np.vstack([X_train, pseudo_X])
y_aug = np.concatenate([y_train, pseudo_y])
cb.fit(X_aug, y_aug)

Signals achieves 99.4% accuracy with self-training vs 81.6% without — though the fraud detection domain may see smaller gains since the signal-to-noise ratio in transaction features is higher than in column metadata.

Synthetic Data Augmentation

Signals generates 50 synthetic variants per category to handle classes with only 2 real samples. For fraud detection, a similar approach could generate synthetic fraud transactions:

  1. Fit a distribution to real fraud transactions (V1–V28 are already PCA-transformed, so multivariate normal is reasonable).
  2. Generate N synthetic fraud samples.
  3. Include synthetic samples in training with reduced sample weight.
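Steps 1 and 2 could be sketched with NumPy as follows (the function name and seed are illustrative, not part of the AMP):

```python
import numpy as np

def synthesize_fraud(X_fraud, n_samples, seed=42):
    """Fit a multivariate normal to the real fraud rows and sample
    synthetic fraud transactions. Reasonable here because V1-V28 are
    already PCA-transformed, roughly Gaussian features."""
    rng = np.random.default_rng(seed)
    mean = X_fraud.mean(axis=0)
    cov = np.cov(X_fraud, rowvar=False)
    return rng.multivariate_normal(mean, cov, size=n_samples)

# e.g. X_synth = synthesize_fraud(X_train[y_train == 1], n_samples=1000)
```

The sampled rows would then enter training with a reduced sample weight, per step 3.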

Evidence Fusion

The Signals architecture combines five evidence sources via Dempster-Shafer theory. For fraud detection, the CatBoost model could become one source alongside:

| Source | Mass Function | Discount |
|---|---|---|
| CatBoost probability | catboost_to_mass() | 0.15 (Signals default for well-calibrated CatBoost) |
| Pattern detection | Rule-based (card number regex, etc.) | 0.10 (high-reliability rules) |
| Amount anomaly | Z-score of transaction amount | 0.30 |
| Velocity check | Transactions per time window | 0.30 |

This produces belief intervals [Bel(fraud), Pl(fraud)] instead of a single confidence score — uncertainty-aware decisions where the gap between belief and plausibility flags transactions needing human review. The evidence sources and risk levels map to the SIGDG Financial Transaction Ontology, which provides BFO-grounded categories for transaction risk classification, regulatory scope, and fraud detection processes.
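A minimal sketch of Dempster's rule for the binary frame {fraud, legit}, with remaining mass assigned to "theta" (the whole frame, representing ignorance). The function and key names are assumptions for illustration, not Signals' actual API; in practice each source's mass would first be discounted by its reliability factor from the table above:

```python
def dempster_combine(m1, m2):
    """Combine two mass functions over {fraud, legit, theta} via
    Dempster's rule, normalizing out the conflicting mass."""
    combined, conflict = {}, 0.0
    for a, ma in m1.items():
        for b, mb in m2.items():
            w = ma * mb
            if a == "theta":
                key = b                # theta intersect B = B
            elif b == "theta" or a == b:
                key = a
            else:
                conflict += w          # fraud intersect legit = empty set
                continue
            combined[key] = combined.get(key, 0.0) + w
    return {k: v / (1.0 - conflict) for k, v in combined.items()}

m = dempster_combine({"fraud": 0.6, "legit": 0.1, "theta": 0.3},
                     {"fraud": 0.5, "legit": 0.2, "theta": 0.3})
bel_fraud = m["fraud"]                # belief: mass committed to fraud
pl_fraud = m["fraud"] + m["theta"]    # plausibility: mass not excluding fraud
```

The gap pl_fraud - bel_fraud is the residual uncertainty that flags a transaction for human review.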


Migration Checklist

Phase 1 (minimum viable)

  • Replace xgboost with catboost in requirements.txt
  • Add .compute() calls to collect Dask arrays before training
  • Replace xgb.dask.train() with CatBoostClassifier.fit()
  • Replace xgb.dask.DaskDMatrix with catboost.Pool
  • Update predict_fraud.py: load_model() + predict_proba()[:, 1]
  • Update model path from best-xgboost-model to best-catboost-model.cbm
  • Update hyperparameter search space for CatBoost parameters
  • Update evaluation: CatBoost uses PRAUC not aucpr
  • Update cdsw-build.sh if dependency names changed
  • Run threshold selection on validation set (threshold may shift)

Phase 2 (CatBoost-native)

  • Enable posterior_sampling=True (ordered boosting)
  • Add auto_class_weights="Balanced" for imbalance handling
  • Add GPU detection with CPU fallback (from Signals)
  • Replace SHAP library with CatBoost’s built-in TreeSHAP
  • Tune CatBoost-specific parameters (random_strength, bagging_temperature)

Phase 3 (distributed)

  • Implement quantized Pool loading for large datasets
  • Or: modify dask_utils.py to launch CatBoost multi-node workers
  • Benchmark memory usage: quantized vs raw on target dataset size

Phase 4 (Signals patterns)

  • Implement self-training loop with confidence threshold
  • Evaluate synthetic fraud augmentation
  • Prototype evidence fusion with rule-based sources

Files Modified

| File | Change |
|---|---|
| requirements.txt | xgboost → catboost |
| notebooks/distributed-xgboost-with-dask.ipynb | All training/prediction cells |
| scripts/predict_fraud.py | Model loading and inference API |
| utils/dask_utils.py | No change (Phase 1–2); modify for Phase 3B |
| .project-metadata.yaml | No change |
| cdsw-build.sh | No change (dependencies installed via requirements.txt) |