Introduction
This Applied ML Prototype (AMP) demonstrates distributed XGBoost training using Dask on Cloudera Machine Learning (CML). The use case is credit card fraud detection under the memory-constrained distributed computing paradigm — data is partitioned across worker nodes while the model is replicated, enabling training on datasets that exceed single-node memory. A Dask cluster is orchestrated on-demand via the CML Workers API, used for training, then torn down.
This guide serves two audiences:
| If you are… | Start here |
|---|---|
| Building a distributed ML pipeline on CML using Dask or a similar framework | Architecture Reference |
| Building a validation SDK or CI/CD pipeline for ML artifacts targeting CML | Data & Model Specification and Validation Rules |
Terminology
| Term | Definition |
|---|---|
| AMP | Applied ML Prototype — a portable, declarative CML project with a .project-metadata.yaml that automates setup, dependency installation, and deployment. |
| CML Workers API | cml.workers_v1 — the Python API for launching and managing compute worker sessions within a CML project. Used here to orchestrate Dask scheduler and worker processes. |
| Dask Cluster | An ephemeral distributed compute cluster consisting of a scheduler, one or more workers, and a client. Orchestrated on CML via the Workers API. |
| Dask Scheduler | A single CML worker process running dask-scheduler, responsible for coordinating task distribution among Dask workers. Listens on TCP port 8786. |
| Dask Worker | A CML worker process running dask-worker, executing distributed computation tasks assigned by the scheduler. |
| Dask Client | A Python object (dask.distributed.Client) instantiated in the notebook session that submits work to the scheduler. |
| DaskDMatrix | An XGBoost data structure (xgb.dask.DaskDMatrix) optimized for distributed training. Partitions data across the Dask cluster. |
| Model Endpoint | A CML-hosted REST endpoint that loads a serialized XGBoost model and serves predictions via a predict_fraud(args) function. |
| Booster | The trained XGBoost model object (xgb.core.Booster). Serialized to disk as a binary file and loaded at inference time. |
| AUCPR | Area Under the Precision-Recall Curve — the primary evaluation metric, chosen over accuracy due to extreme class imbalance (0.16% fraud rate). |
End-to-End Lifecycle
The upstream validation SDK’s responsibility ends at structural and contract validation — ensuring the project conforms to the structure documented in the Data & Model Specification and Deployment chapters. CML handles runtime orchestration and endpoint serving.
System Overview
The system comprises four layers: a JupyterLab session (user entry point), an ephemeral Dask cluster (distributed compute), a CML Model Endpoint (inference serving), and the CML platform services that orchestrate everything.
Repository Layout
.
├── .project-metadata.yaml AMP declarative config
├── cdsw-build.sh Model endpoint build script
├── requirements.txt Pinned dependencies (Dask 2022.5.1, XGBoost 1.6.1)
├── setup.py Local utils package (cdsw-dask-utils 0.1.0)
├── data/
│ └── creditcardsample.csv Sample dataset (94,926 rows × 31 columns)
├── model/
│ └── best-xgboost-model Serialized XGBoost Booster (binary)
├── notebooks/
│ ├── dask-intro.ipynb Dask concepts introduction
│ └── distributed-xgboost-with-dask.ipynb Full ML pipeline
├── scripts/
│ ├── install_dependencies.py Dependency installation (CML job)
│ └── predict_fraud.py Inference endpoint function
└── utils/
├── __init__.py
└── dask_utils.py Dask cluster orchestration
Design Principles
- **CML Workers as compute fabric.** All distributed compute runs as CML worker sessions managed via `cml.workers_v1`. No external cluster manager (YARN, Kubernetes scheduler) is required.
- **Ephemeral clusters.** The Dask cluster exists only for the duration of training. Workers are explicitly stopped after training completes, freeing CML resources.
- **Notebook-driven development.** Notebooks are the primary deliverables. The utility library (`dask_utils.py`) exists solely to simplify cluster orchestration from notebooks.
- **Decoupled inference.** The model endpoint uses standard (non-Dask) XGBoost calls. Inference does not require a distributed cluster — a single CML session loads the serialized Booster and applies a threshold.
- **Pinned dependencies.** All library versions are locked in `requirements.txt` to ensure reproducibility across CML deployments.
CML Workers API Integration
The cml.workers_v1 module is a Python API available inside CML sessions for launching and managing compute worker sessions. This project uses four API calls to orchestrate the Dask cluster.
**Note:** `cml.workers_v1` is only available inside a running CML session. It cannot be imported in a local Python environment.
API Surface
`launch_workers()`

Launches one or more CML worker sessions, each executing the specified shell command.

```python
workers_v1.launch_workers(
    n=num_workers,        # int — number of sessions to launch
    cpu=cpu,              # int — vCPUs per session
    memory=memory,        # int — GiB RAM per session
    nvidia_gpu=0,         # int — GPUs per session (optional)
    code=code_string,     # str — shell command to execute (prefixed with !)
)
```

Returns: A list of worker metadata dicts:

```python
[{"id": "worker-id-string", "app_url": "https://..."}]
```

The `code` parameter uses CML's shell escape syntax (`!command`). For the scheduler this is `!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{port}`; for workers it is `!dask-worker {scheduler_url}`.
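The two command strings can be assembled with ordinary string formatting. A minimal sketch (the helper names are illustrative, not part of `dask_utils.py`):

```python
def build_scheduler_command(dashboard_port: str) -> str:
    # "!" is CML's shell-escape prefix; the rest is the dask-scheduler CLI
    return ("!dask-scheduler --host 0.0.0.0 "
            f"--dashboard-address 127.0.0.1:{dashboard_port}")

def build_worker_command(scheduler_url: str) -> str:
    return f"!dask-worker {scheduler_url}"

print(build_scheduler_command("8100"))
print(build_worker_command("tcp://100.66.0.5:8786"))
```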
`await_workers()`

Blocks until launched workers are running or have failed.

```python
workers_v1.await_workers(
    workers,                      # list — return value from launch_workers()
    wait_for_completion=False,    # bool — False returns once workers are running (not finished)
    timeout_seconds=90,           # int — maximum wait time (scheduler only; workers use default)
)
```

Returns: A dict with a `failures` key:

```python
{"failures": []}  # empty list on success
```

If `failures` is non-empty, the cluster utility raises `RuntimeError`.
`list_workers()`

Returns metadata for all active CML worker sessions in the current project.

```python
workers_v1.list_workers()
```

Returns: A list of worker metadata dicts:

```python
[{"id": "worker-id-string", "ip_address": "100.66.x.x", ...}]
```

This is used by `get_scheduler_url()` to discover the scheduler's IP address by matching worker IDs.
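The ID-matching logic reduces to a set intersection over the two metadata lists. A sketch against mocked API return values (the function name is illustrative; the real implementation lives in `dask_utils.py`):

```python
def find_scheduler_ip(scheduler_workers, all_workers):
    """Match the scheduler's worker ID against list_workers() output to get its IP."""
    scheduler_ids = {w["id"] for w in scheduler_workers}
    for w in all_workers:
        if w["id"] in scheduler_ids:
            return w["ip_address"]
    raise RuntimeError("Scheduler worker not found among active sessions")

# Mocked values shaped like launch_workers() / list_workers() output
scheduler = [{"id": "abc123", "app_url": "https://example"}]
active = [{"id": "zzz999", "ip_address": "100.66.0.9"},
          {"id": "abc123", "ip_address": "100.66.0.5"}]
print(f"tcp://{find_scheduler_ip(scheduler, active)}:8786")  # tcp://100.66.0.5:8786
```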
`stop_workers()`

Terminates one or more CML worker sessions by ID.

```python
workers_v1.stop_workers(*worker_ids)
```

Used during cluster shutdown to stop all scheduler and worker sessions:

```python
workers_v1.stop_workers(
    *[w["id"] for w in cluster["scheduler"] + cluster["workers"]]
)
```
Parameters Used in This Project
| Parameter | Scheduler | Worker | Description |
|---|---|---|---|
| `n` | 1 | User-specified (e.g., 2) | Number of CML worker sessions |
| `cpu` | 1 | User-specified | vCPUs per session |
| `memory` | 2 | User-specified | GiB RAM per session |
| `nvidia_gpu` | 0 | 0 (configurable) | GPUs per session |
| `code` | `!dask-scheduler ...` | `!dask-worker {url}` | Shell command executed in the session |
Environment Variables
| Variable | Source | Used By | Description |
|---|---|---|---|
| `CDSW_READONLY_PORT` | CML runtime | `dask_utils.py` | Port for the Dask dashboard. Injected by CML into every session. |
Dask Cluster Lifecycle
The Dask cluster is orchestrated by utils/dask_utils.py, which wraps the CML Workers API into a simple startup/shutdown sequence. The cluster is ephemeral — created at the start of a training session and torn down when training completes.
Lifecycle Sequence
Function Specifications
`run_dask_cluster(num_workers, cpu, memory, nvidia_gpu=0, dashboard_port=DASHBOARD_PORT)`
Entry point. Orchestrates the full cluster creation sequence by calling the four functions below in order.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `num_workers` | int | — | Number of Dask worker nodes |
| `cpu` | int | — | vCPUs per worker |
| `memory` | int | — | GiB RAM per worker |
| `nvidia_gpu` | int | 0 | GPUs per worker |
| `dashboard_port` | str | `CDSW_READONLY_PORT` | Port for the Dask dashboard |
Returns:
```python
{
    "scheduler": [{"id": str, "app_url": str}],
    "workers": [{"id": str, "app_url": str}, ...],
    "scheduler_address": "tcp://{ip}:8786",
    "dashboard_address": "https://{domain}/status"
}
```
`run_scheduler(dashboard_port, num_workers=1, cpu=1, memory=2)`

Launches a single CML worker running `dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:{dashboard_port}`. Waits up to 90 seconds for the scheduler to become available. Raises `RuntimeError` if the scheduler fails to launch.

`get_scheduler_url(dask_scheduler)`

Queries `workers_v1.list_workers()` to find the scheduler's IP address by matching worker IDs. Returns `tcp://{ip}:8786`.

`get_dashboard_url(dask_scheduler)`

Constructs the dashboard URL from the scheduler worker's `app_url` field, appending `/status`.

`run_dask_workers(scheduler_url, num_workers, cpu, memory, nvidia_gpu=0)`

Launches N CML workers, each running `dask-worker {scheduler_url}`. Raises `RuntimeError` if any worker fails to launch.
Shutdown
After training completes, all cluster sessions must be explicitly stopped:
```python
from cml import workers_v1

workers_v1.stop_workers(
    *[w["id"] for w in dask_cluster["scheduler"] + dask_cluster["workers"]]
)
```
This frees CML compute resources. There is no automatic cleanup — if the notebook kernel dies without calling stop_workers(), the sessions remain running until they hit the CML idle timeout.
ML Pipeline Stages
The ML pipeline is implemented in notebooks/distributed-xgboost-with-dask.ipynb. It proceeds through five stages: data loading, feature engineering, train/dev/val split, model training (with baselines and distributed XGBoost), and model validation with threshold selection.
Stage 1: Data Loading
```python
import dask.dataframe as dd

dask_df = dd.read_csv("../data/creditcardsample.csv", assume_missing=True)
```

Data is loaded as a lazy Dask DataFrame. No computation occurs until `.compute()` is called. The `assume_missing=True` flag tells Dask to treat ambiguous columns as nullable, avoiding type-inference errors on partitioned data.
Input: data/creditcardsample.csv — 94,926 rows, 31 columns (Time, V1–V28, Amount, Class).
Stage 2: Feature Engineering
- Drop the `Time` column (not useful for point-like fraud predictions).
- Separate `Class` as the target variable `y`.
- The remaining 29 columns (V1–V28, Amount) become the feature matrix `X`.
- Scale all features via `StandardScaler` (zero mean, unit variance).
See Feature Engineering Contract for the exact transformation rules.
Stage 3: Train/Dev/Val Split
Two successive calls to `dask_ml.model_selection.train_test_split(shuffle=True)`:
- Split into 70% training and 30% holdout.
- Split the 30% holdout into 20% development and 10% validation (relative to original).
All splits maintain the class balance (~0.16% fraud). Data remains as distributed Dask arrays throughout training — only the final validation set is converted to NumPy for threshold analysis.
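The second split divides the 30% holdout in a 2:1 ratio to reach the 20%/10% targets. A small sketch of the arithmetic (the helper is illustrative, not notebook code):

```python
def two_stage_fractions(holdout=0.30, dev_share=2 / 3):
    """Fractions of the original dataset after the two successive splits."""
    train = 1.0 - holdout              # first split: 70% training
    dev = holdout * dev_share          # second split: 20% development
    val = holdout * (1.0 - dev_share)  # remainder: 10% validation
    return tuple(round(f, 2) for f in (train, dev, val))

print(two_stage_fractions())  # (0.7, 0.2, 0.1)
```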
Stage 4: Model Training
Baselines (single-node)
Two baseline models establish performance floors:
| Model | Method | Dev AUCPR |
|---|---|---|
| DummyClassifier (majority) | Always predicts non-fraud | ~0.50 |
| LogisticRegression | No regularization penalty, sklearn Pipeline with StandardScaler | ~0.75 |
Distributed XGBoost
- Convert Dask arrays to `DaskDMatrix` objects (memory-optimized for distributed training).
- Train with `xgb.dask.train()` using fixed parameters (`tree_method=hist`, `objective=reg:logistic`, `eval_metric=aucpr`).
- Tune hyperparameters via sequential random search (20 samples from an 8-dimensional search space).
- Select the best model by dev AUCPR.
See XGBoost Training Configuration and Hyperparameter Search Space for full parameter specifications.
Stage 5: Validation and Threshold Selection
The best model is evaluated on the held-out validation set (10% of data, never seen during training or hyperparameter selection).
- Compute continuous predictions via `xgb.dask.predict()` on the validation Dask DataFrame, then convert to NumPy.
- Calculate AUCPR on the validation set.
- Generate a precision-recall curve to select a classification threshold.
- The threshold of 0.35 is chosen to balance precision and recall for the fraud detection use case.
- Serialize the best model:

```python
results["best_model"].save_model("../model/best-xgboost-model")
```
See Model Serialization Format for details on the saved artifact.
Dataset Schema
The project uses a sample of the Kaggle credit card fraud dataset, curated by the Machine Learning Group at Université Libre de Bruxelles.
Raw Schema
The CSV file contains 31 columns and 94,926 rows.
| Column | Type | Range | Description |
|---|---|---|---|
| `Time` | float64 | 0 – 172,792 | Seconds elapsed since the first transaction in the dataset. Dropped during feature engineering — not useful for point-like fraud predictions. |
| `V1` | float64 | ≈ −56 to 2 | PCA component 1 (privacy-preserving transformation applied by the dataset curators) |
| `V2` | float64 | ≈ −73 to 22 | PCA component 2 |
| `V3` | float64 | ≈ −48 to 10 | PCA component 3 |
| `V4` – `V28` | float64 | varies | PCA components 4 through 28. Exact ranges vary per component. All result from a PCA transformation; original feature names are undisclosed for confidentiality. |
| `Amount` | float64 | 0 – 25,691 | Transaction amount in original currency units |
| `Class` | float64 | 0.0 or 1.0 | Target variable: 0 = legitimate transaction, 1 = fraudulent transaction. |
Class Distribution
| Class | Count | Percentage |
|---|---|---|
| 0 (legitimate) | 94,777 | 99.84% |
| 1 (fraud) | 149 | 0.16% |
The dataset is extremely imbalanced. This motivates the use of AUCPR (Area Under the Precision-Recall Curve) rather than accuracy as the primary evaluation metric.
Raw vs. Model-Input Schema
| Schema | Columns | Purpose |
|---|---|---|
| Raw (CSV) | 31: Time, V1–V28, Amount, Class | As loaded from data/creditcardsample.csv |
| Model input | 29: V1–V28, Amount | After dropping Time and separating Class as the target. Features are ordered alphabetically by column name (Amount, V1, V2, …, V28). |
| Model target | 1: Class | Binary fraud label |
The distinction between raw and model-input schema is critical for building a compatible inference client — the Model Endpoint Contract expects exactly 29 features in the model-input ordering.
Feature Engineering Contract
This page documents the exact transformations applied between raw CSV data and model input. A compatible implementation must reproduce these steps to generate features that the trained model can score correctly.
Transformation Steps
Step 1: Drop Time
```python
dask_df = dask_df.drop(columns=["Time"])
```
The Time column represents seconds since the first transaction in the dataset. It is not useful for point-like fraud detection (individual transaction scoring) and is dropped before any further processing.
Step 2: Separate Target Variable
```python
y = dask_df["Class"]
X = dask_df.drop(columns=["Class"])
```
After this step, X contains 29 feature columns and y contains the binary fraud label.
Step 3: Feature Ordering
The 29 columns in X are ordered alphabetically by column name:
```
Amount, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13,
V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26,
V27, V28
```
This ordering is inherited from the Dask DataFrame column order and is significant — the model endpoint receives features as a positional array, so the ordering must match exactly.
Step 4: StandardScaler
```python
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
```
All 29 features are scaled to zero mean and unit variance. The PCA features (V1–V28) are already approximately centered by the PCA transformation, so the scaler primarily affects the Amount column, which has a much larger raw range (0 – 25,691).
Note: The scaler is fitted on the training split only, then applied to dev and validation splits. This prevents data leakage from the evaluation sets into the scaling parameters.
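The fit-on-train / transform-everywhere pattern can be illustrated with a minimal pure-Python stand-in for `StandardScaler` (illustrative only; the notebook uses scikit-learn):

```python
class MiniStandardScaler:
    """Minimal stand-in for sklearn's StandardScaler, one feature at a time."""
    def fit(self, rows):
        n = len(rows)
        self.mean = sum(rows) / n
        var = sum((x - self.mean) ** 2 for x in rows) / n
        self.std = var ** 0.5 or 1.0   # guard against zero variance
        return self

    def transform(self, rows):
        return [(x - self.mean) / self.std for x in rows]

train_amounts = [10.0, 20.0, 30.0]
val_amounts = [40.0]
scaler = MiniStandardScaler().fit(train_amounts)  # statistics from training data only
scaled_train = scaler.transform(train_amounts)
scaled_val = scaler.transform(val_amounts)        # no refit: evaluation data never leaks
```

Fitting on the validation set as well would shift the mean and variance toward the evaluation data, which is exactly the leakage the note above rules out.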
Summary
| Step | Input | Output | Notes |
|---|---|---|---|
| Drop Time | 31 columns | 30 columns | Removes non-predictive temporal feature |
| Separate Class | 30 columns | 29 features + 1 target | Class becomes the label y |
| StandardScaler | 29 features (raw scale) | 29 features (zero mean, unit variance) | Fitted on training split only |
XGBoost Training Configuration
This page documents the fixed XGBoost parameters used during both initial training and hyperparameter tuning.
Fixed Parameters
| Parameter | Value | Rationale |
|---|---|---|
| `tree_method` | `hist` | Histogram-based tree construction. Required for distributed training with Dask; the `exact` method does not support distributed training. |
| `objective` | `reg:logistic` | Outputs a continuous probability in [0.0, 1.0], suitable for threshold-based binary classification. |
| `eval_metric` | `aucpr` | Area Under the Precision-Recall Curve. Appropriate for highly imbalanced datasets where accuracy is misleading (a majority-class classifier achieves 99.84%). |
| `verbosity` | 2 | Detailed logging during training. |
| `num_round` | 5 | Number of boosting rounds. Kept low for the sample dataset; increase for larger data. |
Evaluation Protocol
Both training and development sets are monitored per boosting round:
```python
eval_list = [(ddev, "dev"), (dtrain, "train")]
```
The dev AUCPR at the final boosting round is the primary metric for model selection during hyperparameter tuning.
Training API
Training uses the Dask-aware XGBoost API, not the scikit-learn wrapper:
```python
import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
ddev = xgb.dask.DaskDMatrix(client, X_dev, y_dev)

result = xgb.dask.train(
    client,
    params,              # dict of fixed + tunable parameters
    dtrain,
    num_boost_round=5,
    evals=[(ddev, "dev"), (dtrain, "train")],
)
```
Return Value
`xgb.dask.train()` returns a dict with two keys:

```python
{
    "booster": xgb.core.Booster,  # trained model
    "history": {
        "dev": OrderedDict([("aucpr", [0.647, 0.822, ...])]),
        "train": OrderedDict([("aucpr", [0.806, 0.846, ...])])
    }
}
```

- `booster` — the trained XGBoost model, an `xgb.core.Booster` object (not a scikit-learn estimator).
- `history` — per-round evaluation metrics for each dataset in `evals`. The list length equals `num_boost_round`.
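Model selection reads the dev metric at the last boosting round from this history. A small sketch against a mocked return value (metric values illustrative):

```python
from collections import OrderedDict

# Shaped like the xgb.dask.train() return value; the booster is stubbed out
result = {
    "booster": object(),
    "history": {
        "dev": OrderedDict([("aucpr", [0.647, 0.822, 0.801, 0.818, 0.835])]),
        "train": OrderedDict([("aucpr", [0.806, 0.846, 0.871, 0.880, 0.890])]),
    },
}

def final_dev_aucpr(result):
    """Dev AUCPR at the last boosting round: the model-selection metric."""
    return result["history"]["dev"]["aucpr"][-1]

print(final_dev_aucpr(result))  # 0.835
```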
Distributed Prediction
```python
predictions = xgb.dask.predict(client, booster, dask_dataframe)
```

Returns predictions distributed across the Dask cluster. Convert to NumPy with `.compute()` for local analysis.

For non-distributed inference (model endpoint), use `booster.inplace_predict(numpy_array)` instead. See Model Endpoint Contract.
Hyperparameter Search Space
Hyperparameter tuning uses a sequential random search over eight XGBoost parameters. The search is sequential (not parallel) because the Dask cluster is already fully occupied with distributed data — each trial trains a complete distributed model.
Search Space
| Parameter | Distribution | Range | Description |
|---|---|---|---|
| `learning_rate` | Uniform | [0, 1] | Step size shrinkage applied after each boosting round to prevent overfitting |
| `gamma` | Log-uniform | [1×10⁻⁶, 10] | Minimum loss reduction required to make a further partition on a leaf node |
| `max_depth` | Uniform integer | [1, 20) | Maximum depth of each tree |
| `min_child_weight` | Uniform | [0, 10] | Minimum sum of instance weight (hessian) in a child node |
| `max_delta_step` | Uniform | [0, 10] | Maximum delta step allowed for each tree's weight estimation |
| `subsample` | Uniform | [0, 1] | Fraction of training instances sampled per tree |
| `lambda` | Uniform | [0, 1] | L2 regularization term on weights |
| `alpha` | Uniform | [0, 1] | L1 regularization term on weights |
Search Strategy
```python
from sklearn.model_selection import ParameterSampler
from scipy.stats import uniform, loguniform, randint

search_space = {
    "learning_rate": uniform(0, 1),
    "gamma": loguniform(1e-6, 1e+1),
    "max_depth": randint(1, 20),
    "min_child_weight": uniform(0, 10),
    "max_delta_step": uniform(0, 10),
    "subsample": uniform(0, 1),
    "lambda": uniform(0, 1),
    "alpha": uniform(0, 1),
}

sampler = ParameterSampler(search_space, n_iter=20, random_state=42)
```
Each sample is merged with the fixed parameters and used to train a complete distributed XGBoost model. The model with the highest dev AUCPR is selected.
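The merge itself is a plain dict union, with sampled keys taking precedence on any overlap. A minimal sketch (sampled values illustrative):

```python
# Fixed parameters from the training configuration
fixed_params = {"tree_method": "hist", "objective": "reg:logistic",
                "eval_metric": "aucpr", "verbosity": 2}

# One draw from the sampler (values illustrative)
sampled = {"learning_rate": 0.3, "max_depth": 6}

# Later keys win, so sampled values would override overlapping fixed ones
trial_params = {**fixed_params, **sampled}
print(sorted(trial_params))
```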
To switch to grid search, replace `ParameterSampler` with `sklearn.model_selection.ParameterGrid` and provide discrete values instead of distributions.
`tune_xgboost()` Contract
The tuning function is defined inline in the notebook. Its effective signature and return value are:

```python
def tune_xgboost(client, dtrain, params, search_space, num_samples,
                 random_state=42) -> dict:
    """
    Returns:
        {
            "best_model": xgb.core.Booster,  # best model by dev AUCPR
            "best_params": str,              # string repr of best parameter dict
            "best_score": float,             # best dev AUCPR
            "hp_history": pd.DataFrame,      # all trials with parameters and scores
        }
    """
```
Reference Results (Sample Dataset)
These results are from the 94,926-row sample dataset with 20 random search samples:
| Metric | Value |
|---|---|
| Best dev AUCPR (tuning) | 0.8040 |
| Validation AUCPR (holdout) | 0.9325 |
| Default model dev AUCPR (no tuning) | 0.8402 |
Results will vary with different random seeds and with the full Kaggle dataset.
Model Serialization Format
Serialization
The best model from hyperparameter tuning is saved using XGBoost’s native serialization:
results["best_model"].save_model("../model/best-xgboost-model")
| Property | Value |
|---|---|
| Path | model/best-xgboost-model |
| CML path | /home/cdsw/model/best-xgboost-model |
| Format | XGBoost binary (default when no file extension is provided) |
| Type | xgb.core.Booster — the raw XGBoost model, not a scikit-learn wrapper |
Deserialization
```python
import xgboost as xgb

booster = xgb.Booster(model_file="/home/cdsw/model/best-xgboost-model")
```
The model is loaded at module scope in scripts/predict_fraud.py, meaning it is loaded once when the CML Model Endpoint starts and reused for all subsequent prediction requests.
Inference Methods
| Method | Context | Input Type | Output |
|---|---|---|---|
| `booster.inplace_predict(np.array(...))` | Production (model endpoint) | NumPy array | Float array, each value in [0.0, 1.0] |
| `xgb.dask.predict(client, booster, dask_df)` | Training (notebook) | Dask DataFrame | Distributed predictions |
The model endpoint uses inplace_predict because inference is single-node — no Dask cluster is required at serving time.
Threshold Contract
The model outputs a continuous probability. A threshold is applied to produce a binary classification:
| Condition | Output | Meaning |
|---|---|---|
| `prediction[0] <= 0.35` | 0 | Not fraud |
| `prediction[0] > 0.35` | 1 | Fraud |
The threshold of 0.35 is hardcoded in scripts/predict_fraud.py. It was selected by analyzing the precision-recall curve on the validation set — it produces a fraud prediction rate approximately matching the base rate (~0.16%). The optimal threshold depends on the business use case (cost of false positives vs. false negatives) and should be re-evaluated when training on new data.
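The contract reduces to a one-line decision function. A sketch mirroring the logic documented for `scripts/predict_fraud.py`:

```python
def classify(probability: float, threshold: float = 0.35) -> int:
    """Binary decision per the threshold contract: strictly above 0.35 means fraud."""
    return 1 if probability > threshold else 0

print(classify(0.12), classify(0.35), classify(0.36))  # 0 0 1
```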
Compatibility Notes
- The serialized model requires `xgboost >= 1.6.1` to load. Older versions may fail to deserialize.
- The model expects exactly 29 input features in the order defined by the Feature Engineering Contract.
- The model was trained with `tree_method=hist` and `objective=reg:logistic`. These settings are embedded in the serialized model and do not need to be specified at inference time.
AMP Project Structure
An Applied ML Prototype (AMP) is a portable, declarative CML project. Its behavior is defined by .project-metadata.yaml, which tells CML how to set up the runtime, install dependencies, and configure resources.
.project-metadata.yaml Specification
| Field | Value | Description |
|---|---|---|
| `name` | Distributed XGBoost with Dask on CML | Display name in the AMP catalog |
| `specification_version` | 1.0 | AMP specification version |
| `prototype_version` | 1.0 | Project version |
| `date` | 2022-07-30 | Publication date |
Runtime
```yaml
runtimes:
  - editor: JupyterLab
    kernel: Python 3.9
    edition: Standard
```
The project requires a JupyterLab editor with a Python 3.9+ kernel. The Standard edition provides the base CML runtime without GPU drivers.
Tasks
The AMP defines two tasks that run automatically during project setup:
| Task | Type | Script | Resources | Description |
|---|---|---|---|---|
| Install Dependencies | create_job | scripts/install_dependencies.py | 1 vCPU, 2 GiB | Creates a CML job for dependency installation |
| (same) | run_job | — | (inherited) | Executes the dependency installation job |
The `install_dependencies.py` script uses CML's shell escape syntax:

```
!pip3 install -r requirements.txt
```

**Note:** This is not valid standalone Python. The `!` prefix is a CML session feature that executes shell commands.
Build Script
cdsw-build.sh is the build script for CML Model Endpoints. It runs during endpoint deployment to install runtime dependencies:
```bash
pip3 install -r requirements.txt
```
This ensures the inference script (scripts/predict_fraud.py) has access to xgboost and numpy at serving time.
Deployment Methods
There are three ways to deploy this AMP on CML:
1. **AMP Catalog** — Navigate to the AMP Catalog in a CML workspace, select the "Distributed XGBoost with Dask on CML" tile, and follow the setup wizard.
2. **AMP Prototype URL** — In a CML workspace, create a new project with "AMP" as the initial setup option and provide the Git repository URL.
3. **Manual Git Clone** — Create a new project with "Git" as the initial setup option. In this case, run `!pip install -r requirements.txt` manually in a JupyterLab session before executing the notebooks.
After deployment, run either notebook by starting a Python 3.9+ JupyterLab session with at least 1 vCPU / 2 GiB.
Model Endpoint Contract
This is the authoritative specification for the inference API. An external client must conform to this contract to receive correct predictions from the deployed CML Model Endpoint.
Function Signature
```python
def predict_fraud(args) -> int
```
CML invokes this function for each prediction request, passing the request body as args.
Input Contract
```
{
    "features": [[<29 float values>]]
}
```

The `features` value is a list containing a single list of 29 floats. The feature order must match the training schema — alphabetical by column name:
```
Amount, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13,
V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26,
V27, V28
```
See Feature Engineering Contract for details on how these features are derived from raw transaction data.
Output Contract
Returns an integer:
| Value | Meaning |
|---|---|
| `0` | Not fraud |
| `1` | Fraud |
Internal Logic
- **Module load** (once, at endpoint startup):

  ```python
  booster = xgb.Booster(model_file='/home/cdsw/model/best-xgboost-model')
  threshold = 0.35
  ```

- **Per-request:**

  ```python
  prediction = booster.inplace_predict(np.array(args['features']))
  if prediction[0] <= threshold:
      return 0  # not fraud
  return 1  # fraud
  ```
The model outputs a continuous probability in [0.0, 1.0]. The threshold of 0.35 converts this to a binary classification.
Sample Request
```json
{
    "features": [[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
                  -0.33832077, 0.462387778, 0.239598554, 0.0986979013,
                  0.36378697, 0.090794172, -0.551599533, -0.617800856,
                  -0.991389847, -0.311169354, 1.46817697, -0.470400525,
                  0.207971242, 0.0257905802, 0.40399296, 0.251412098,
                  -0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
                  0.128539358, -0.189114844, 0.133558377, -0.0210530535,
                  149.62]]
}
```
Sample Response
```json
0
```
CML Model Endpoint Configuration
| Setting | Value |
|---|---|
| Build File | cdsw-build.sh |
| Target File | scripts/predict_fraud.py |
| Function | predict_fraud |
The build file runs pip3 install -r requirements.txt to ensure xgboost and numpy are available. The target file loads the model at import time and exposes the predict_fraud function for CML to invoke.
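A client-side sketch that builds a contract-conforming request body with only the standard library (the helper name is illustrative; how the request is transported to the endpoint is out of scope here):

```python
import json

# Model-input feature order as documented in the Feature Engineering Contract
FEATURE_ORDER = ["Amount"] + [f"V{i}" for i in range(1, 29)]

def build_request(features_by_name: dict) -> str:
    """Serialize one scaled transaction into the endpoint's request body."""
    if set(features_by_name) != set(FEATURE_ORDER):
        raise ValueError("expected exactly the 29 model-input features")
    row = [float(features_by_name[name]) for name in FEATURE_ORDER]
    return json.dumps({"features": [row]})

payload = build_request({name: 0.0 for name in FEATURE_ORDER})
print(len(json.loads(payload)["features"][0]))  # 29
```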
Runtime Configuration
Environment Variables
| Variable | Source | Used By | Description |
|---|---|---|---|
| `CDSW_READONLY_PORT` | CML runtime | `utils/dask_utils.py` | Port for the Dask dashboard. Injected by CML into every session automatically. |
No additional environment variables are required. The project does not use API keys, database connections, or external service credentials.
Session Resources
Notebook Session
The JupyterLab session from which you run the notebooks:
| Resource | Minimum | Recommended |
|---|---|---|
| vCPU | 1 | 1 |
| Memory | 2 GiB | 2 GiB |
| GPU | 0 | 0 |
Dask Cluster
Resources allocated via run_dask_cluster(). Each component runs as a separate CML worker session.
| Component | Count | CPU | Memory | GPU |
|---|---|---|---|---|
| Scheduler | 1 | 1 vCPU | 2 GiB | 0 |
| Workers | User-specified (default: 2) | User-specified | User-specified | Optional |
Total footprint with 2 workers at the default 1 vCPU / 2 GiB each: 4 CML sessions (1 notebook + 1 scheduler + 2 workers), 4 vCPU, 8 GiB RAM.
The scheduler always uses 1 vCPU / 2 GiB. Worker resources are passed as arguments to run_dask_cluster() and can be scaled up for larger datasets.
Pinned Dependencies
All library versions are locked in requirements.txt:
| Package | Version | Role |
|---|---|---|
| `dask[complete]` | 2022.5.1 | Distributed computing framework (all optional dependencies included) |
| `dask-ml` | 2022.1.22 | Machine learning extensions for Dask (train/test split, etc.) |
| `matplotlib` | 3.3.4 | Plotting (precision-recall curves, class distributions) |
| `numpy` | 1.22.4 | Numerical arrays (inference input/output) |
| `pandas` | 1.4.2 | DataFrames (hyperparameter history, data exploration) |
| `scikit-learn` | 1.0.2 | Baseline models, preprocessing (StandardScaler), metrics |
| `scipy` | 1.8.1 | Statistical distributions for hyperparameter search |
| `seaborn` | 0.11.2 | Statistical visualization |
| `tqdm` | 4.64.0 | Progress bars for hyperparameter search |
| `xgboost` | 1.6.1 | Gradient boosted trees (distributed training and inference) |
| `-e .` | (local) | Installs the `utils` package from `setup.py` in editable mode |
Local Package
The -e . entry installs cdsw-dask-utils (version 0.1.0) from setup.py. This makes utils.dask_utils importable as a regular Python package. The editable install means changes to dask_utils.py take effect immediately without reinstallation.
Validation Rules Reference
This page defines the complete set of validation rules for a CML AMP project conforming to the Distributed XGBoost with Dask pattern. Rules are categorized by domain and severity.
Severity levels:
- ERROR — the artifact is invalid and will fail at runtime.
- WARNING — the artifact may work but deviates from the expected contract.
Structural Rules
| Rule | Severity | Description |
|---|---|---|
| S-001 | ERROR | Repository must contain `.project-metadata.yaml` at root |
| S-002 | ERROR | `.project-metadata.yaml` must parse as valid YAML |
| S-003 | ERROR | `requirements.txt` must exist at root |
| S-004 | ERROR | `scripts/predict_fraud.py` must exist |
| S-005 | ERROR | `model/best-xgboost-model` must exist (post-training only) |
| S-006 | ERROR | `cdsw-build.sh` must exist |
| S-007 | ERROR | `utils/dask_utils.py` must exist |
| S-008 | ERROR | `setup.py` must exist at root |
Dependency Rules
| Rule | Severity | Description |
|---|---|---|
| D-001 | ERROR | `requirements.txt` must include `xgboost` |
| D-002 | ERROR | `requirements.txt` must include `dask` (with or without the `[complete]` extra) |
| D-003 | ERROR | `requirements.txt` must include `scikit-learn` |
| D-004 | ERROR | `requirements.txt` must include `-e .` (local utils package) |
| D-005 | ERROR | `requirements.txt` must include `numpy` |
| D-W01 | WARNING | All packages should have pinned versions (`==`) for reproducibility |
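A minimal checker for the D-rules, operating on the raw text of `requirements.txt` (a sketch; rule messages and the return shape are simplified relative to the SDK's data model):

```python
def check_dependencies(requirements_text: str):
    """Evaluate the D-rules against the contents of requirements.txt."""
    lines = [l.strip() for l in requirements_text.splitlines() if l.strip()]

    def has(pkg):
        # Strip version pins and extras: "dask[complete]==2022.5.1" -> "dask"
        return any(l.split("==")[0].split("[")[0] == pkg for l in lines)

    issues = []
    for rule, pkg in [("D-001", "xgboost"), ("D-003", "scikit-learn"), ("D-005", "numpy")]:
        if not has(pkg):
            issues.append((rule, "ERROR"))
    if not has("dask"):
        issues.append(("D-002", "ERROR"))
    if "-e ." not in lines:
        issues.append(("D-004", "ERROR"))
    if any(l != "-e ." and "==" not in l for l in lines):
        issues.append(("D-W01", "WARNING"))
    return issues

sample = "dask[complete]==2022.5.1\nxgboost==1.6.1\nscikit-learn==1.0.2\nnumpy==1.22.4\n-e ."
print(check_dependencies(sample))  # []
```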
Model Endpoint Rules
| Rule | Severity | Description |
|---|---|---|
| E-001 | ERROR | `scripts/predict_fraud.py` must parse as valid Python (no syntax errors) |
| E-002 | ERROR | `scripts/predict_fraud.py` must define a function named `predict_fraud` |
| E-003 | ERROR | `predict_fraud` must accept exactly one parameter (`args`) |
| E-004 | ERROR | Module must load a model from `/home/cdsw/model/best-xgboost-model` |
| E-005 | ERROR | Module must define a `threshold` variable |
| E-W01 | WARNING | `threshold` should be a float between 0.0 and 1.0 |
| E-W02 | WARNING | `predict_fraud` should return an integer (0 or 1) |
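The E-001 through E-003 checks can be implemented statically with the standard-library `ast` module, without importing the inference script (a sketch with simplified return values):

```python
import ast

def check_endpoint(source: str):
    """Evaluate E-001..E-003 against the inference script's source text."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return [("E-001", "ERROR")]          # invalid Python: nothing else to check
    funcs = [n for n in ast.walk(tree)
             if isinstance(n, ast.FunctionDef) and n.name == "predict_fraud"]
    if not funcs:
        return [("E-002", "ERROR")]          # required function is missing
    if len(funcs[0].args.args) != 1:
        return [("E-003", "ERROR")]          # must take exactly one parameter
    return []

good = "def predict_fraud(args):\n    return 0\n"
print(check_endpoint(good))  # []
```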
AMP Configuration Rules
| Rule | Severity | Description |
|---|---|---|
| A-001 | ERROR | `.project-metadata.yaml` must contain a `runtimes` list with at least one entry |
| A-002 | ERROR | Runtime kernel must specify Python 3.9 or higher |
| A-003 | ERROR | Runtime editor must be JupyterLab |
| A-004 | ERROR | At least one task must reference `scripts/install_dependencies.py` |
| A-W01 | WARNING | `specification_version` should be 1.0 |
Cluster Utility Rules
| Rule | Severity | Description |
|---|---|---|
| C-W01 | WARNING | utils/dask_utils.py should define run_dask_cluster |
| C-W02 | WARNING | run_dask_cluster should accept num_workers, cpu, memory parameters |
| C-W03 | WARNING | run_dask_cluster should return a dict with keys: scheduler, workers, scheduler_address, dashboard_address |
| C-W04 | WARNING | Scheduler should listen on TCP port 8786 |
Build Script Rules
| Rule | Severity | Description |
|---|---|---|
| B-001 | ERROR | cdsw-build.sh must be a valid shell script (starts with a command, not a syntax error) |
| B-W01 | WARNING | cdsw-build.sh should install dependencies from requirements.txt |
Building a Validation SDK
This guide walks through building a Python SDK that validates a CML AMP project against the Validation Rules Reference. The SDK can be used standalone or integrated into a CI/CD pipeline.
Data Model
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
class Severity(Enum):
ERROR = "error"
WARNING = "warning"
@dataclass
class ValidationIssue:
rule: str # e.g., "S-001"
severity: Severity
message: str # human-readable description
path: str | None = None # file path that triggered the issue
@dataclass
class ValidationResult:
issues: list[ValidationIssue] = field(default_factory=list)
@property
def passed(self) -> bool:
return not any(i.severity == Severity.ERROR for i in self.issues)
@property
def errors(self) -> list[ValidationIssue]:
return [i for i in self.issues if i.severity == Severity.ERROR]
@property
def warnings(self) -> list[ValidationIssue]:
return [i for i in self.issues if i.severity == Severity.WARNING]
Validation Pipeline
The SDK runs five validation steps in sequence. Each step appends issues to the shared ValidationResult.
Step 1: Check Repository Structure (S-rules)
def validate_structure(root: Path, result: ValidationResult) -> None:
required_files = {
"S-001": ".project-metadata.yaml",
"S-003": "requirements.txt",
"S-004": "scripts/predict_fraud.py",
"S-006": "cdsw-build.sh",
"S-007": "utils/dask_utils.py",
"S-008": "setup.py",
}
for rule, path in required_files.items():
if not (root / path).exists():
result.issues.append(ValidationIssue(
rule=rule,
severity=Severity.ERROR,
message=f"Required file missing: {path}",
path=path,
))
Rule S-005 (model/best-xgboost-model) should only be checked post-training. Pass a flag to control this:
if check_model and not (root / "model" / "best-xgboost-model").exists():
result.issues.append(ValidationIssue(
rule="S-005",
severity=Severity.ERROR,
message="Trained model missing: model/best-xgboost-model",
path="model/best-xgboost-model",
))
Step 2: Parse AMP Configuration (A-rules)
import yaml
def validate_amp_config(root: Path, result: ValidationResult) -> None:
    config_path = root / ".project-metadata.yaml"
    if not config_path.exists():
        return  # already caught by S-001
    try:
        # `or {}` guards against an empty (but valid) YAML file parsing to None
        config = yaml.safe_load(config_path.read_text()) or {}
except yaml.YAMLError:
result.issues.append(ValidationIssue(
rule="S-002", severity=Severity.ERROR,
message=".project-metadata.yaml is not valid YAML",
path=".project-metadata.yaml",
))
return
# A-001: runtimes must exist
runtimes = config.get("runtimes", [])
if not runtimes:
result.issues.append(ValidationIssue(
rule="A-001", severity=Severity.ERROR,
message="No runtimes defined in .project-metadata.yaml",
))
return
runtime = runtimes[0]
# A-002: Python 3.9+
kernel = runtime.get("kernel", "")
if "Python" not in kernel:
result.issues.append(ValidationIssue(
rule="A-002", severity=Severity.ERROR,
message=f"Runtime kernel must be Python 3.9+, got: {kernel}",
))
else:
        try:
            # Compare (major, minor) tuples rather than floats:
            # float("3.10") == 3.1, which would wrongly reject Python 3.10+
            major, minor = kernel.split("Python")[1].strip().split(".")[:2]
            if (int(major), int(minor)) < (3, 9):
                result.issues.append(ValidationIssue(
                    rule="A-002", severity=Severity.ERROR,
                    message=f"Python version must be >= 3.9, got: {major}.{minor}",
                ))
        except (ValueError, IndexError):
            pass
# A-003: JupyterLab
if runtime.get("editor") != "JupyterLab":
result.issues.append(ValidationIssue(
rule="A-003", severity=Severity.ERROR,
message=f"Runtime editor must be JupyterLab, got: {runtime.get('editor')}",
))
Step 3: Validate Dependencies (D-rules)
def validate_dependencies(root: Path, result: ValidationResult) -> None:
req_path = root / "requirements.txt"
if not req_path.exists():
return # already caught by S-003
content = req_path.read_text().lower()
checks = {
"D-001": "xgboost",
"D-002": "dask",
"D-003": "scikit-learn",
"D-005": "numpy",
}
for rule, pkg in checks.items():
if pkg not in content:
result.issues.append(ValidationIssue(
rule=rule, severity=Severity.ERROR,
message=f"Required package missing from requirements.txt: {pkg}",
path="requirements.txt",
))
if "-e ." not in content and "-e." not in content:
result.issues.append(ValidationIssue(
rule="D-004", severity=Severity.ERROR,
message="Local package install (-e .) missing from requirements.txt",
path="requirements.txt",
))
Step 4: Validate Endpoint Contract (E-rules)
import ast
def validate_endpoint(root: Path, result: ValidationResult) -> None:
script_path = root / "scripts" / "predict_fraud.py"
if not script_path.exists():
return # already caught by S-004
source = script_path.read_text()
# E-001: valid Python
try:
tree = ast.parse(source)
except SyntaxError as e:
result.issues.append(ValidationIssue(
rule="E-001", severity=Severity.ERROR,
message=f"Syntax error in predict_fraud.py: {e}",
path="scripts/predict_fraud.py",
))
return
# E-002 & E-003: function exists with correct signature
func = None
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == "predict_fraud":
func = node
break
if func is None:
result.issues.append(ValidationIssue(
rule="E-002", severity=Severity.ERROR,
message="Function 'predict_fraud' not found in predict_fraud.py",
path="scripts/predict_fraud.py",
))
return
    args = func.args
if len(args.args) != 1:
result.issues.append(ValidationIssue(
rule="E-003", severity=Severity.ERROR,
message=f"predict_fraud must accept exactly 1 parameter, found {len(args.args)}",
path="scripts/predict_fraud.py",
))
# E-004 & E-005: check for model path and threshold in source
if "best-xgboost-model" not in source:
result.issues.append(ValidationIssue(
rule="E-004", severity=Severity.ERROR,
message="Model path '/home/cdsw/model/best-xgboost-model' not found",
path="scripts/predict_fraud.py",
))
if "threshold" not in source:
result.issues.append(ValidationIssue(
rule="E-005", severity=Severity.ERROR,
message="'threshold' variable not found in predict_fraud.py",
path="scripts/predict_fraud.py",
))
Step 5: Validate Cluster Utilities (C-rules)
def validate_cluster_utils(root: Path, result: ValidationResult) -> None:
utils_path = root / "utils" / "dask_utils.py"
if not utils_path.exists():
return # already caught by S-007
source = utils_path.read_text()
try:
tree = ast.parse(source)
except SyntaxError:
return
func_names = {
node.name for node in ast.walk(tree)
if isinstance(node, ast.FunctionDef)
}
if "run_dask_cluster" not in func_names:
result.issues.append(ValidationIssue(
rule="C-W01", severity=Severity.WARNING,
message="Function 'run_dask_cluster' not found in dask_utils.py",
path="utils/dask_utils.py",
))
if "8786" not in source:
result.issues.append(ValidationIssue(
rule="C-W04", severity=Severity.WARNING,
message="Scheduler port 8786 not referenced in dask_utils.py",
path="utils/dask_utils.py",
))
Running the SDK
from pathlib import Path
def validate(project_root: str, check_model: bool = False) -> ValidationResult:
    root = Path(project_root)
    result = ValidationResult()
    validate_structure(root, result)
    # S-005 is post-training only, so it is gated by the check_model flag
    if check_model and not (root / "model" / "best-xgboost-model").exists():
        result.issues.append(ValidationIssue(
            rule="S-005", severity=Severity.ERROR,
            message="Trained model missing: model/best-xgboost-model",
            path="model/best-xgboost-model",
        ))
    validate_amp_config(root, result)
    validate_dependencies(root, result)
    validate_endpoint(root, result)
    validate_cluster_utils(root, result)
    return result
if __name__ == "__main__":
result = validate(".")
for issue in result.issues:
print(f"[{issue.severity.value.upper()}] {issue.rule}: {issue.message}")
if result.passed:
print("\nValidation passed.")
else:
print(f"\nValidation failed with {len(result.errors)} error(s).")
raise SystemExit(1)
Optional: Model Smoke Test
If the trained model is available, you can add a smoke test that loads it and runs inference with sample data:
def smoke_test_model(root: Path, result: ValidationResult) -> None:
import numpy as np
import xgboost as xgb
model_path = root / "model" / "best-xgboost-model"
booster = xgb.Booster(model_file=str(model_path))
sample = np.array([[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
-0.33832077, 0.462387778, 0.239598554, 0.0986979013,
0.36378697, 0.090794172, -0.551599533, -0.617800856,
-0.991389847, -0.311169354, 1.46817697, -0.470400525,
0.207971242, 0.0257905802, 0.40399296, 0.251412098,
-0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
0.128539358, -0.189114844, 0.133558377, -0.0210530535,
149.62]])
prediction = booster.inplace_predict(sample)
assert 0.0 <= prediction[0] <= 1.0, f"Prediction out of range: {prediction[0]}"
binary = 0 if prediction[0] <= 0.35 else 1
assert binary in (0, 1)
GitHub Actions Integration
This page provides a GitHub Actions workflow for validating the AMP project structure and optionally running a model smoke test in CI.
Validation Workflow
name: Validate AMP
on:
push:
branches: [master]
pull_request:
branches: [master]
jobs:
validate:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.9"
- name: Install validation dependencies
run: pip install pyyaml
- name: Run structural validation
run: python scripts/validate_amp.py
  smoke-test:
    runs-on: ubuntu-latest
    needs: validate
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      # hashFiles() in a job-level `if` is evaluated against an empty workspace
      # (before checkout), so it would always skip this job. Instead, check for
      # the model in a step after checkout and gate the remaining steps on it.
      - name: Check for trained model
        id: model
        run: |
          if [ -f model/best-xgboost-model ]; then
            echo "exists=true" >> "$GITHUB_OUTPUT"
          else
            echo "exists=false" >> "$GITHUB_OUTPUT"
          fi
      - name: Set up Python
        if: steps.model.outputs.exists == 'true'
        uses: actions/setup-python@v5
        with:
          python-version: "3.9"
      - name: Install dependencies
        if: steps.model.outputs.exists == 'true'
        run: pip install numpy xgboost
      - name: Run model smoke test
        if: steps.model.outputs.exists == 'true'
        run: |
python -c "
import numpy as np
import xgboost as xgb
booster = xgb.Booster(model_file='model/best-xgboost-model')
sample = np.array([[-1.35980713, -0.0727811733, 2.53634674, 1.37815522,
-0.33832077, 0.462387778, 0.239598554, 0.0986979013,
0.36378697, 0.090794172, -0.551599533, -0.617800856,
-0.991389847, -0.311169354, 1.46817697, -0.470400525,
0.207971242, 0.0257905802, 0.40399296, 0.251412098,
-0.0183067779, 0.277837576, -0.11047391, 0.0669280749,
0.128539358, -0.189114844, 0.133558377, -0.0210530535,
149.62]])
prediction = booster.inplace_predict(sample)
assert 0.0 <= prediction[0] <= 1.0, f'Out of range: {prediction[0]}'
binary = 0 if prediction[0] <= 0.35 else 1
assert binary in (0, 1), f'Unexpected output: {binary}'
print(f'Prediction: {prediction[0]:.4f} -> {binary}')
print('Smoke test passed.')
"
Workflow Details
validate job
Runs on every push and pull request to master. Requires only pyyaml (no heavy ML dependencies). Executes a validation script that checks:
- Repository structure (S-rules)
- AMP configuration (A-rules)
- Dependency manifest (D-rules)
- Endpoint contract via AST parsing (E-rules)
- Cluster utility structure (C-rules)
smoke-test job
Runs only when the trained model file exists in the repository (model/best-xgboost-model). Installs numpy and xgboost, loads the model, runs a sample prediction, and asserts the output is valid.
This job is conditional — it will be skipped if the model has not been committed (e.g., during initial development before training).
Setting Up the Validation Script
The workflow assumes scripts/validate_amp.py exists. Create this file using the SDK code from Building a Validation SDK:
# scripts/validate_amp.py
# Paste the ValidationResult, ValidationIssue, Severity classes
# and all validate_* functions from the SDK guide,
# then add the __main__ block:
if __name__ == "__main__":
result = validate(".")
for issue in result.issues:
print(f"[{issue.severity.value.upper()}] {issue.rule}: {issue.message}")
if not result.passed:
raise SystemExit(1)
print("All validation checks passed.")
Extending the Workflow
To add custom validation rules:
- Define a new rule ID (e.g., `X-001`) and severity in the Validation Rules Reference.
- Add a `validate_custom()` function to the SDK.
- Call it from the `validate()` entry point.
To validate dependency versions (not just presence):
import re
def validate_pinned_versions(root, result):
content = (root / "requirements.txt").read_text()
for line in content.strip().splitlines():
line = line.strip()
if line and not line.startswith("#") and not line.startswith("-"):
if "==" not in line:
result.issues.append(ValidationIssue(
rule="D-W01", severity=Severity.WARNING,
message=f"Package not pinned: {line}",
path="requirements.txt",
))
SIGDG Financial Transaction Ontology
The Signals Data Governance ontology (prefix SIGDG) is grounded in BFO 2020 (Basic Formal Ontology, ISO/IEC 21838-2:2021). BFO provides the upper-level categories that make ontologies from separate teams composable — preventing the class of integration failures Barry Smith illustrates with the 2006 Airbus A380 wiring debacle, where incompatible data representations across engineering teams cost $6 billion to correct. A shared top-level ontology is the mechanism that makes data descriptions joinable without ad-hoc reconciliation.
This page defines the financial transaction risk extension of SIGDG. It expands the existing SIGDG:0050 TransactionInformation node — which the Signals-360 metadata classification project defines as a leaf — into a full subtree covering fraud detection, transaction context, risk indicators, and regulatory scope. Both ontologies share the same CURIE prefix and BFO grounding, so categories from either project can be composed in a single governance policy.
BFO Grounding
BFO:0000031 generically dependent continuant
└── SIGDG:0001 InformationEntity ── "data that can be stored and transferred"
└── SIGDG:0050 TransactionInformation ── "event records: orders, sessions, timestamps"
└── SIGDG:0100 FinancialTransaction ── "monetary transfer event between parties"
BFO:0000019 quality
├── SIGDG:1001 SensitivityLevel ── "inheres in an information entity"
└── SIGDG:1050 TransactionRiskLevel ── "inheres in a financial transaction record"
BFO:0000023 role
├── SIGDG:2001 DataSubjectRole ── "externally grounded in governance context"
└── SIGDG:2040 RegulatoryFrameworkRole ── "externally grounded in regulatory scope"
BFO:0000015 process
├── SIGDG:3001 DataLifecycleProcess ── "transformation, migration, classification"
└── SIGDG:3040 FraudDetectionProcess ── "scoring, retraining, threshold calibration"
BFO’s generically dependent continuant (GDC) is the natural home for information entities — they depend on some material carrier (disk, memory, wire) but can be copied and transferred between carriers. A financial transaction record is a GDC: it describes a real-world monetary event but exists independently of any single storage medium. Risk levels are qualities that inhere in transaction records. Regulatory scope is a role — the same transaction carries different obligations under PCI-DSS vs. AML/KYC.
CURIE Prefix
SIGDG → https://signals360.dev/ontology/dg/
BFO → http://purl.obolibrary.org/obo/BFO_
The SIGDG prefix is shared with the Signals-360 metadata governance ontology. Code ranges are partitioned to avoid collision:
| Range | Owner | Domain |
|---|---|---|
| 0001–0091 | Signals-360 | Metadata column classification |
| 0100–0199 | This project | Financial transaction risk |
| 1010–1040 | Signals-360 | Data sensitivity levels |
| 1050–1059 | This project | Transaction risk levels |
| 2010–2030 | Signals-360 | Data subject roles |
| 2040–2049 | This project | Regulatory framework roles |
| 3010–3030 | Signals-360 | Data lifecycle processes |
| 3040–3049 | This project | Fraud detection processes |
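This partitioning can be enforced mechanically, for instance when minting new codes in CI. A sketch (the `sigdg_owner` helper and the owner labels are illustrative, not part of either ontology):

```python
from typing import Optional

# Code ranges from the table above; owner labels are illustrative.
SIGDG_RANGES = [
    (1, 91, "Signals-360"),        # metadata column classification
    (100, 199, "this-project"),    # financial transaction risk
    (1010, 1040, "Signals-360"),   # data sensitivity levels
    (1050, 1059, "this-project"),  # transaction risk levels
    (2010, 2030, "Signals-360"),   # data subject roles
    (2040, 2049, "this-project"),  # regulatory framework roles
    (3010, 3030, "Signals-360"),   # data lifecycle processes
    (3040, 3049, "this-project"),  # fraud detection processes
]

def sigdg_owner(curie: str) -> Optional[str]:
    """Resolve a CURIE like 'SIGDG:0112' to the project owning its code range."""
    prefix, code = curie.split(":")
    if prefix != "SIGDG":
        raise ValueError(f"Not a SIGDG CURIE: {curie}")
    n = int(code)
    for lo, hi, owner in SIGDG_RANGES:
        if lo <= n <= hi:
            return owner
    return None  # unallocated range
```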
Financial Transaction Hierarchy
SIGDG:0050 TransactionInformation (⊑ SIGDG:0001, from Signals-360)
└── SIGDG:0100 FinancialTransaction "monetary transfer event between parties"
├── SIGDG:0110 TransactionRiskClassification "risk assessment outcome for a transaction"
│ ├── SIGDG:0111 LegitimateTransaction "transaction consistent with cardholder behavior"
│ └── SIGDG:0112 FraudulentTransaction "unauthorized or deceptive transaction"
│ ├── SIGDG:0113 CardNotPresentFraud "fraud via remote channel without physical card"
│ ├── SIGDG:0114 CounterfeitCardFraud "fraud using cloned or fabricated card"
│ ├── SIGDG:0115 LostStolenCardFraud "fraud using a card obtained without consent"
│ └── SIGDG:0116 AccountTakeoverFraud "fraud via compromised account credentials"
├── SIGDG:0120 TransactionContext "observable properties of a transaction event"
│ ├── SIGDG:0121 MonetaryContext "amount, currency, denomination"
│ ├── SIGDG:0122 TemporalContext "timestamp, time-of-day, day-of-week patterns"
│ ├── SIGDG:0123 BehavioralContext "latent patterns derived from transaction history"
│ └── SIGDG:0124 MerchantContext "merchant category, location, channel"
└── SIGDG:0130 RiskIndicator "anomalous pattern suggesting elevated risk"
├── SIGDG:0131 AmountAnomaly "amount deviates from cardholder norm"
├── SIGDG:0132 VelocityAnomaly "transaction frequency exceeds cardholder baseline"
├── SIGDG:0133 GeographicAnomaly "location inconsistent with cardholder history"
└── SIGDG:0134 BehavioralAnomaly "latent behavioral pattern deviates from norm"
Hierarchy Rationale
TransactionRiskClassification (0110) models the outcome of the fraud detection model. In the current binary implementation, only LegitimateTransaction and FraudulentTransaction are used. The four fraud subtypes (0113–0116) are provided for future multi-class models and for downstream governance policies that distinguish fraud mechanisms — a lost-card fraud may trigger a card replacement workflow while an account takeover triggers a credential reset.
TransactionContext (0120) models what the features describe. The credit card fraud dataset’s 29 features map to this subtree:
| Feature(s) | SIGDG Category | Notes |
|---|---|---|
| Amount | SIGDG:0121 MonetaryContext | Raw transaction amount; the only non-PCA feature |
| Time (dropped) | SIGDG:0122 TemporalContext | Seconds since first transaction; dropped in preprocessing |
| V1–V28 | SIGDG:0123 BehavioralContext | PCA-transformed latent dimensions; original features undisclosed |
| (not in dataset) | SIGDG:0124 MerchantContext | Placeholder for merchant data in future datasets |
The PCA features are deliberately anonymous — the dataset curators applied PCA for confidentiality. Each V-component encodes a linear combination of original transaction attributes. We classify them collectively as BehavioralContext rather than attempting to reverse-engineer individual semantics, because the ontology should describe what the features represent (behavioral patterns), not their mathematical form (principal components).
RiskIndicator (0130) models the evidence supporting a risk classification. These correspond to interpretable signals that a human analyst would recognize:
| Indicator | What it captures | Dataset proxy |
|---|---|---|
| AmountAnomaly | Unusually large or small transaction | Extreme values in Amount |
| VelocityAnomaly | Burst of transactions in short window | (requires Time; not used in current model) |
| GeographicAnomaly | Transaction from unexpected location | (encoded in PCA components; not directly observable) |
| BehavioralAnomaly | Deviation from established spending pattern | High-magnitude PCA components |
Not all indicators are directly observable in the current dataset. The ontology defines them for completeness — a production fraud system would populate all four from raw transaction data.
Transaction Risk Levels
Risk level is modeled as a BFO quality (BFO:0000019) that inheres in a financial transaction record. This parallels the sensitivity levels in Signals-360 (SIGDG:1010–1040), but describes fraud risk rather than data sensitivity.
| CURIE | Code | Label | Definition | Threshold Guidance |
|---|---|---|---|---|
| SIGDG:1051 | 1051 | LowRisk | Transaction consistent with all behavioral baselines | Model score ≤ 0.10 |
| SIGDG:1052 | 1052 | ElevatedRisk | Transaction deviates on one or more risk indicators | 0.10 < score ≤ 0.35 |
| SIGDG:1053 | 1053 | HighRisk | Transaction strongly suggests fraudulent intent | 0.35 < score ≤ 0.80 |
| SIGDG:1054 | 1054 | ConfirmedFraud | Transaction verified as unauthorized post-investigation | score > 0.80 or manual label |
The current binary model applies a single threshold at 0.35 (the boundary between ElevatedRisk and HighRisk). The four-level scheme supports graduated response:
- LowRisk: Approve automatically.
- ElevatedRisk: Approve but flag for batch review.
- HighRisk: Require step-up authentication or manual approval.
- ConfirmedFraud: Block and initiate chargeback/investigation.
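The threshold guidance can be encoded as a simple mapping from model score to risk-level CURIE; a minimal sketch (the function name is hypothetical):

```python
def transaction_risk_level(score: float, confirmed_fraud: bool = False) -> str:
    """Map a fraud-model score to a SIGDG:105x risk level per the table above."""
    if confirmed_fraud or score > 0.80:
        return "SIGDG:1054"  # ConfirmedFraud (manual label overrides the score)
    if score > 0.35:
        return "SIGDG:1053"  # HighRisk
    if score > 0.10:
        return "SIGDG:1052"  # ElevatedRisk
    return "SIGDG:1051"      # LowRisk
```

The current binary model's single 0.35 threshold corresponds to the SIGDG:1052 / SIGDG:1053 boundary in this scheme.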
Relationship to Sensitivity Levels
Transaction risk levels and data sensitivity levels are orthogonal qualities that can co-occur on the same entity:
| Entity | Risk Level | Sensitivity Level | Implication |
|---|---|---|---|
| Transaction record with PAN | HighRisk | Restricted (PCI-DSS) | Block transaction AND encrypt PAN at rest |
| Transaction record without PII | LowRisk | Internal | Approve; standard access controls |
| Aggregated fraud statistics | N/A | Confidential | No per-transaction risk; protect business intelligence |
Regulatory Framework Roles
The same transaction may be subject to multiple regulatory frameworks simultaneously. These are modeled as BFO roles (BFO:0000023) — externally grounded in the legal/regulatory context rather than intrinsic to the data.
| CURIE | Code | Label | Definition |
|---|---|---|---|
| SIGDG:2041 | 2041 | PCIDSSScope | Transaction involves payment card data; subject to PCI-DSS requirements |
| SIGDG:2042 | 2042 | AMLKYCScope | Transaction subject to Anti-Money Laundering / Know Your Customer rules |
| SIGDG:2043 | 2043 | EMVLiabilityScope | Transaction subject to EMV chip liability shift rules |
A single transaction can carry all three roles. The fraud detection model’s output feeds into each framework differently:
- PCI-DSS (`2041`): A `HighRisk` classification triggers additional logging and encryption requirements for the transaction's card data fields.
- AML/KYC (`2042`): Patterns of `ElevatedRisk` transactions across accounts trigger Suspicious Activity Report (SAR) filing obligations.
- EMV Liability (`2043`): The party that failed to support chip-based authentication bears fraud liability; the risk classification determines which party initiates the chargeback.
Fraud Detection Processes
Classification and model maintenance are modeled as BFO processes (BFO:0000015):
| CURIE | Code | Label | Definition |
|---|---|---|---|
| SIGDG:3041 | 3041 | RealTimeScoring | Sub-second risk scoring at transaction authorization time |
| SIGDG:3042 | 3042 | BatchRetraining | Periodic model retraining on accumulated labeled transactions |
| SIGDG:3043 | 3043 | ThresholdCalibration | Adjusting the decision boundary to optimize precision-recall tradeoff |
In the current AMP implementation:
- RealTimeScoring corresponds to the `predict_fraud(args)` model endpoint.
- BatchRetraining corresponds to the notebook-driven distributed XGBoost/CatBoost training pipeline.
- ThresholdCalibration corresponds to the precision-recall curve analysis that selected the 0.35 threshold.
Mapping to the Credit Card Fraud Dataset
The Kaggle credit card fraud dataset maps to this ontology as follows:
Class Label Mapping
The binary Class column in the dataset maps to the first two children of TransactionRiskClassification:
| Class Value | SIGDG Category | Count | Rate |
|---|---|---|---|
| 0 | SIGDG:0111 LegitimateTransaction | 94,777 | 99.84% |
| 1 | SIGDG:0112 FraudulentTransaction | 149 | 0.16% |
The fraud subtypes (0113–0116) are not distinguishable in this dataset — the Kaggle data does not include fraud mechanism labels. A production deployment would label confirmed fraud cases with the appropriate subtype during investigation, enabling future multi-class models.
Extension Points
This ontology is intentionally minimal — it covers exactly what the current dataset and model support, with clearly marked placeholders for production extensions.
Near-term extensions (Phase 2–3 of CatBoost migration)
| Category | What it enables |
|---|---|
| MerchantContext (0124) subtypes | MCC-based risk stratification when merchant data is available |
| Fraud subtype labels (0113–0116) | Multi-class classification instead of binary |
| Additional RiskIndicator subtypes | Device fingerprint anomaly, IP geolocation anomaly |
Cross-ontology composition with Signals-360
Because both ontologies share the SIGDG prefix and BFO grounding, they compose naturally. A governance policy can reference both:
> "Any column classified as `SIGDG:0070 PaymentCardData` (from Signals-360) whose transaction records are scored `SIGDG:1053 HighRisk` (from this extension) must be encrypted at rest and trigger a Suspicious Activity Report."
This is the integration that BFO’s top-level alignment makes possible — the column-level metadata classification from Signals and the row-level transaction risk classification from this project are two views of the same governed data, connected through a shared formal ontology.
CatBoost Migration Roadmap
This roadmap details how to migrate the distributed fraud detection pipeline from XGBoost to CatBoost, drawing on the production CatBoost patterns established in the Signals-360 metadata classification project. Signals uses CatBoost as one of five evidence sources in a Dempster-Shafer fusion pipeline for classifying database columns into a 174-category SIGDG taxonomy — a harder problem (extreme class imbalance, 2 samples per category) that validates CatBoost’s suitability for the comparatively straightforward binary fraud detection task here.
Why CatBoost
| Concern | XGBoost (current) | CatBoost (target) |
|---|---|---|
| Ordered boosting | Not available | posterior_sampling=True — prevents prediction shift in low-data regimes |
| Categorical features | Requires one-hot or label encoding | Native categorical handling via cat_features parameter |
| Regularization | L1/L2 on weights | L2 on leaves + Bayesian priors via ordered boosting |
| GPU training | tree_method="gpu_hist" | task_type="GPU" with automatic multi-GPU |
| Dask integration | First-class (xgb.dask.train, DaskDMatrix) | None — this is the primary migration challenge |
| Model format | Binary (.ubj) | CatBoost binary (.cbm) + sidecar class mapping (.classes.json) |
| Feature importance | Built-in SHAP | Built-in TreeSHAP via get_feature_importance(type="ShapValues") |
Migration Phases
Phase 1: Drop-in Replacement
Goal: Replace XGBoost with CatBoost while keeping the existing Dask infrastructure for data loading and preprocessing. CatBoost trains on collected (non-distributed) data.
Architecture Change
The current pipeline uses Dask end-to-end: DaskDMatrix → xgb.dask.train() → distributed predictions. CatBoost has no Dask integration, so the architecture splits into two stages: Dask for ETL, CatBoost for training.
Dependency Changes
# requirements.txt
- xgboost==1.6.1
+ catboost>=1.2.7
dask[complete]==2022.5.1
scikit-learn==1.0.2
numpy==1.22.4
Training Code
Replace xgb.dask.train() with CatBoostClassifier.fit():
from catboost import CatBoostClassifier, Pool
# Collect Dask arrays to NumPy (data must fit in driver memory)
X_train_np = X_train.compute()
y_train_np = y_train.compute()
X_dev_np = X_dev.compute()
y_dev_np = y_dev.compute()
# Create CatBoost Pools
train_pool = Pool(X_train_np, y_train_np)
dev_pool = Pool(X_dev_np, y_dev_np)
cb = CatBoostClassifier(
loss_function="Logloss", # Binary classification
depth=6,
iterations=100,
l2_leaf_reg=0.5,
learning_rate=0.1,
random_seed=42,
verbose=10,
eval_metric="PRAUC", # Equivalent to XGBoost's aucpr
auto_class_weights="Balanced", # Handle 0.16% fraud imbalance
)
cb.fit(
train_pool,
eval_set=dev_pool,
early_stopping_rounds=10,
)
Key differences from XGBoost:
- `Logloss` replaces `reg:logistic` (binary classification)
- `PRAUC` replaces `aucpr` (same metric, different name)
- `auto_class_weights="Balanced"` addresses the 636:1 class imbalance natively — XGBoost required manual `scale_pos_weight`
- `early_stopping_rounds` replaces fixed `num_round=5`
- No `tree_method` needed — CatBoost uses symmetric trees by default
Inference Changes
# scripts/predict_fraud.py
import numpy as np
from catboost import CatBoostClassifier
booster = CatBoostClassifier()
booster.load_model('/home/cdsw/model/best-catboost-model.cbm')
threshold = 0.35
def predict_fraud(args):
    # Reshape to (1, n_features) so predict_proba returns an (n_samples, 2) matrix
    features = np.array(args['features']).reshape(1, -1)
    prediction = booster.predict_proba(features)[:, 1]  # P(fraud)
if prediction[0] <= threshold:
return 0
return 1
Key change: CatBoost’s predict_proba() returns a 2-column matrix [P(legit), P(fraud)]. Index [:, 1] extracts the fraud probability, replacing XGBoost’s inplace_predict() which returned a single value.
Model Serialization
# Save
cb.save_model("../model/best-catboost-model.cbm")
# Load
loaded = CatBoostClassifier()
loaded.load_model("../model/best-catboost-model.cbm")
The .cbm extension is CatBoost’s native binary format. Unlike XGBoost, the class mapping is embedded in the model file for binary classification, so no sidecar .classes.json is needed (that pattern from Signals is for multi-class).
What Stays the Same
- Dask cluster orchestration (`utils/dask_utils.py`) — unchanged
- Data loading (`dd.read_csv()`) — unchanged
- Feature engineering (drop `Time`, `StandardScaler`) — unchanged
- Train/dev/val split ratios — unchanged
- Threshold selection logic — unchanged
- CML AMP structure (`.project-metadata.yaml`, `cdsw-build.sh`) — unchanged
Phase 2: CatBoost-Native Features
Goal: Adopt CatBoost-specific capabilities that XGBoost lacks, following patterns proven in Signals.
Ordered Boosting
Signals uses posterior_sampling=True in all CatBoost configurations to prevent prediction shift — a form of target leakage where training samples influence their own gradient estimates. This is particularly valuable in the fraud dataset’s extreme imbalance regime.
cb = CatBoostClassifier(
loss_function="Logloss",
depth=8, # Deeper (Signals uses 8 for complex tasks)
iterations=500, # More rounds (Signals uses 500)
l2_leaf_reg=0.3, # Lighter regularization
learning_rate=0.08, # Slower learning rate
bootstrap_type="Bernoulli", # Bernoulli subsampling
subsample=0.8, # 80% per iteration
posterior_sampling=True, # Ordered boosting (CatBoost-unique)
min_data_in_leaf=2, # From Signals: critical for rare classes
random_seed=42,
eval_metric="PRAUC",
auto_class_weights="Balanced",
)
> GPU caveat (from Signals): `posterior_sampling=True` and `rsm` (random subspace method) are not supported on CatBoost GPU for classification. Training must run on CPU when these are enabled. Signals handles this by detecting GPU availability and forcing CPU when ordered boosting is active.
GPU Acceleration
For configurations that don’t use ordered boosting (e.g., initial exploration):
def _catboost_gpu_kwargs(devices=None):
"""GPU detection pattern from Signals."""
import torch
if torch.cuda.is_available():
n = torch.cuda.device_count()
if devices is None:
devices = ":".join(str(i) for i in range(n))
return {"task_type": "GPU", "devices": devices}
return {}
cb = CatBoostClassifier(
**_catboost_gpu_kwargs(),
# ... other params (without posterior_sampling)
)
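The caveat above can be enforced with a small guard that drops the GPU kwargs whenever ordered boosting is requested. A standalone sketch (GPU detection is passed in as a boolean so the example does not depend on torch; the function name is illustrative):

```python
def resolve_compute_kwargs(use_ordered_boosting: bool, gpu_available: bool,
                           devices: str = "0") -> dict:
    """Return CatBoost task-type kwargs, forcing CPU when ordered boosting is on."""
    if use_ordered_boosting:
        # posterior_sampling / rsm are CPU-only for classification (see caveat)
        return {}
    if gpu_available:
        return {"task_type": "GPU", "devices": devices}
    return {}
```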
TreeSHAP Feature Importance
Signals uses CatBoost’s built-in TreeSHAP for per-prediction explanations. Apply the same pattern for fraud detection:
from catboost import Pool
pool = Pool(X_val_np)
shap_values = cb.get_feature_importance(
type="ShapValues",
data=pool,
)
# shap_values shape: (n_samples, n_features + 1)
# Last column is the base value (bias term)
feature_shap = shap_values[:, :-1] # Per-sample, per-feature importance
This replaces the need for a separate SHAP library and is much faster for tree models.
Hyperparameter Search Space
Adapted from XGBoost space with CatBoost equivalents:
| XGBoost Parameter | CatBoost Equivalent | Recommended Range |
|---|---|---|
| `learning_rate` | `learning_rate` | [0.01, 0.3] |
| `gamma` | `min_data_in_leaf` | [1, 20] (integer) |
| `max_depth` | `depth` | [4, 10] |
| `min_child_weight` | `min_data_in_leaf` | (merged with `gamma`) |
| `max_delta_step` | (not needed) | — |
| `subsample` | `subsample` | [0.6, 1.0] |
| `lambda` (L2) | `l2_leaf_reg` | [0.1, 10] (log-uniform) |
| `alpha` (L1) | (not directly available) | — |
CatBoost-specific parameters to add:
| Parameter | Range | Description |
|---|---|---|
| `random_strength` | [0.1, 10] | Score randomization at each split |
| `bagging_temperature` | [0, 5] | Bayesian bootstrap intensity |
| `border_count` | [32, 255] | Number of splits per feature |
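The combined space can be sampled directly for random search. A minimal NumPy sketch using the ranges above (pass each draw as keyword arguments to `CatBoostClassifier`; the trial count is arbitrary):

```python
import numpy as np

rng = np.random.default_rng(0)

def sample_catboost_params():
    """Draw one configuration from the recommended CatBoost search space."""
    return {
        "learning_rate": float(rng.uniform(0.01, 0.3)),
        "depth": int(rng.integers(4, 11)),                   # [4, 10]
        "min_data_in_leaf": int(rng.integers(1, 21)),        # [1, 20]
        "subsample": float(rng.uniform(0.6, 1.0)),
        "l2_leaf_reg": float(10 ** rng.uniform(-1, 1)),      # log-uniform [0.1, 10]
        "random_strength": float(10 ** rng.uniform(-1, 1)),  # [0.1, 10]
        "bagging_temperature": float(rng.uniform(0, 5)),
        "border_count": int(rng.integers(32, 256)),          # [32, 255]
    }

trials = [sample_catboost_params() for _ in range(20)]
```

Each dict can be unpacked as `CatBoostClassifier(**params, ...)` alongside the fixed settings (loss function, class weights, seed).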
Phase 3: Distributed Strategy
Goal: Restore memory-constrained distributed training now that XGBoost's native Dask integration is gone.
CatBoost does not support Dask. Three strategies can replace xgb.dask.train() for datasets that exceed driver memory:
Strategy A: Dask Preprocessing + Quantized Pools
Use Dask for data loading and preprocessing, then convert to CatBoost’s quantized pool format which uses ~8x less memory than raw floats:
# 1. Dask loads and preprocesses (distributed)
dask_df = dd.read_csv("large_dataset.csv", assume_missing=True)
dask_df = dask_df.drop(columns=["Time"])
# ... feature engineering on Dask ...
# 2. Save preprocessed partitions to disk
dask_df.to_parquet("/tmp/preprocessed/", engine="pyarrow")
# 3. Load the preprocessed partitions and quantize (memory-efficient)
# Pool does not read Parquet directly, so load via pandas/pyarrow first
import pandas as pd
from catboost import Pool

train_df = pd.read_parquet("/tmp/preprocessed/")
train_pool = Pool(train_df.drop(columns=["Class"]), label=train_df["Class"])
# Quantize in place for ~8x memory reduction
train_pool.quantize()
Strategy B: CatBoost Multi-Node Training
CatBoost has its own multi-node training mode in the CLI: each worker runs `catboost run-worker`, and the master runs `catboost fit` with `--node-type Master`:
# On each CML worker:
catboost run-worker --node-port 8788

# On the master:
catboost fit \
    --loss-function Logloss \
    --learn-set /data/train.tsv \
    --node-type Master \
    --node-port 8788 \
    --file-with-hosts hosts.txt
This could be orchestrated via dask_utils.py's worker-launching pattern: replace the `!dask-worker` launch command with `!catboost run-worker`.
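The orchestration glue can be sketched with the launcher injected as a parameter, so the logic is testable outside CML; in a real session you would pass the project's worker-launch helper (e.g. `cdsw.launch_workers`, an assumption to verify against dask_utils.py). The `run-worker` command and cpu/memory values are illustrative:

```python
# Sketch: launch N CatBoost workers the same way dask_utils.py launches Dask workers.
def launch_catboost_cluster(launch, n_workers: int, port: int = 8788):
    """Launch worker processes running `catboost run-worker` via a CML-style launcher."""
    code = f"!catboost run-worker --node-port {port}"
    # cpu/memory values are illustrative; match your CML worker sizing
    return launch(n=n_workers, cpu=2, memory=4, code=code)

# Stand-in launcher that records its arguments instead of starting CML workers:
calls = []
launch_catboost_cluster(lambda **kw: calls.append(kw), n_workers=3)
print(calls[0]["code"])
```

Injecting the launcher keeps the cluster-shape logic unit-testable and lets the same helper target future CML Workers API revisions.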
Strategy C: Partition-Level Ensembling
Train separate CatBoost models on each Dask partition, then ensemble predictions:
# On each Dask worker: train one model per partition
def train_partition(partition_df):
    cb = CatBoostClassifier(...)
    cb.fit(partition_df[features], partition_df["Class"])
    return cb

# Build one training task per partition and compute them on the cluster
from dask import delayed
tasks = [delayed(train_partition)(part) for part in dask_df.to_delayed()]
models = client.compute(tasks, sync=True)

# Ensemble by averaging fraud probabilities across partition models
predictions = np.mean([m.predict_proba(X_val)[:, 1] for m in models], axis=0)
This is the simplest approach but sacrifices some accuracy compared to true distributed training where the full dataset informs each tree.
Recommended Strategy
For datasets up to ~10M rows: Strategy A (quantized pools). CatBoost’s quantization reduces a 10M × 29 float64 matrix from ~2.3 GB to ~290 MB, fitting comfortably in a single 4 GiB CML session.
For datasets beyond ~10M rows: Strategy B (multi-node). Requires modifying dask_utils.py to launch CatBoost worker processes instead of Dask workers.
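The sizing claim above is simple arithmetic: float64 stores 8 bytes per value, while a quantized pool stores roughly one 8-bit bin index per value:

```python
rows, cols = 10_000_000, 29

raw_gb = rows * cols * 8 / 1e9        # float64: 8 bytes per value
quantized_mb = rows * cols * 1 / 1e6  # ~1 byte per value after 8-bit quantization

print(f"raw: {raw_gb:.2f} GB, quantized: {quantized_mb:.0f} MB")
# → raw: 2.32 GB, quantized: 290 MB
```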
Phase 4: Signals Patterns (Advanced)
Goal: Adopt advanced techniques from Signals that would improve fraud detection beyond what basic CatBoost provides.
Self-Training
Signals implements multi-round self-training where high-confidence predictions are injected back as training labels. This is valuable when labeled fraud data is scarce:
# Round 1: Train on labeled data
cb.fit(X_train, y_train)
proba = cb.predict_proba(X_unlabeled)
# Pseudo-label high-confidence predictions
confident_mask = proba.max(axis=1) >= 0.80 # Signals default threshold
pseudo_X = X_unlabeled[confident_mask]
pseudo_y = proba[confident_mask].argmax(axis=1)
# Round 2: Retrain with pseudo-labels added
X_aug = np.vstack([X_train, pseudo_X])
y_aug = np.concatenate([y_train, pseudo_y])
cb.fit(X_aug, y_aug)
Signals achieves 99.4% accuracy with self-training vs 81.6% without — though the fraud detection domain may see smaller gains since the signal-to-noise ratio in transaction features is higher than in column metadata.
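The two rounds above generalize to a loop. A sketch of the multi-round variant (the classifier here is a scikit-learn stand-in; with CatBoost you would pass a `CatBoostClassifier`, and 0.80 is the Signals default threshold):

```python
import numpy as np
from sklearn.linear_model import LogisticRegression  # stand-in classifier

def self_train(model, X_train, y_train, X_unlabeled, rounds=2, threshold=0.80):
    """Multi-round self-training: pseudo-label confident predictions each round."""
    X_aug, y_aug = X_train, y_train
    for _ in range(rounds):
        model.fit(X_aug, y_aug)
        proba = model.predict_proba(X_unlabeled)
        mask = proba.max(axis=1) >= threshold
        if not mask.any():
            break  # nothing confident enough to pseudo-label
        X_aug = np.vstack([X_train, X_unlabeled[mask]])
        y_aug = np.concatenate([y_train, proba[mask].argmax(axis=1)])
    return model

# Tiny synthetic demo: two well-separated clusters
rng = np.random.default_rng(0)
X_tr = np.vstack([rng.normal(-2, 0.5, (20, 2)), rng.normal(2, 0.5, (20, 2))])
y_tr = np.array([0] * 20 + [1] * 20)
X_unl = np.vstack([rng.normal(-2, 0.5, (30, 2)), rng.normal(2, 0.5, (30, 2))])
model = self_train(LogisticRegression(), X_tr, y_tr, X_unl)
```

Note that each round re-anchors on the original labeled set (`X_train`), which limits drift from wrong pseudo-labels accumulating across rounds.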
Synthetic Data Augmentation
Signals generates 50 synthetic variants per category to handle classes with only 2 real samples. For fraud detection, a similar approach could generate synthetic fraud transactions:
- Fit a distribution to real fraud transactions (V1–V28 are already PCA-transformed, so a multivariate normal is reasonable).
- Generate `N` synthetic fraud samples.
- Include synthetic samples in training with reduced sample weight.
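A minimal sketch of the three steps (the fraud matrix is a random stand-in for the real V1–V28 + Amount rows, and the 0.5 weight is an illustrative choice):

```python
import numpy as np

rng = np.random.default_rng(42)

# Stand-in for the real fraud rows (29 columns: V1..V28 + Amount)
real_fraud = rng.normal(size=(50, 29))

# 1. Fit a multivariate normal to the real fraud transactions
mu = real_fraud.mean(axis=0)
cov = np.cov(real_fraud, rowvar=False)

# 2. Generate N synthetic fraud samples
N = 500
synthetic = rng.multivariate_normal(mu, cov, size=N)

# 3. Down-weight synthetic samples relative to real ones (0.5 is illustrative)
X_fraud = np.vstack([real_fraud, synthetic])
weights = np.concatenate([np.ones(len(real_fraud)), np.full(N, 0.5)])
```

The weights would be passed to training via `sample_weight` alongside the non-fraud rows.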
Evidence Fusion
The Signals architecture combines five evidence sources via Dempster-Shafer theory. For fraud detection, the CatBoost model could serve as one of several sources:
| Source | Mass Function | Discount |
|---|---|---|
| CatBoost probability | catboost_to_mass() | 0.15 (Signals default for well-calibrated CatBoost) |
| Pattern detection | Rule-based (card number regex, etc.) | 0.10 (high-reliability rules) |
| Amount anomaly | Z-score of transaction amount | 0.30 |
| Velocity check | Transactions per time window | 0.30 |
This produces belief intervals [Bel(fraud), Pl(fraud)] instead of a single confidence score, enabling uncertainty-aware decisions: a wide gap between belief and plausibility flags transactions that need human review. The evidence sources and risk levels map to the SIGDG Financial Transaction Ontology, which provides BFO-grounded categories for transaction risk classification, regulatory scope, and fraud detection processes.
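A minimal Dempster-Shafer sketch for the two-hypothesis frame {fraud, legit}: each source assigns mass to fraud, legit, and the full frame (theta, the discounted/uncommitted part), and Dempster's rule combines two sources. The discounting shown (probability scaled by 1 - discount) is an illustrative stand-in for the Signals `catboost_to_mass()` helper, not its actual implementation:

```python
def combine(m1, m2):
    """Dempster's rule for the two-hypothesis frame {fraud, legit}.
    Each mass function is a dict over 'fraud', 'legit', 'theta'
    (theta = the full frame, i.e. uncommitted belief)."""
    K = m1["fraud"] * m2["legit"] + m1["legit"] * m2["fraud"]  # conflict mass
    norm = 1.0 - K
    return {
        "fraud": (m1["fraud"] * m2["fraud"]
                  + m1["fraud"] * m2["theta"]
                  + m1["theta"] * m2["fraud"]) / norm,
        "legit": (m1["legit"] * m2["legit"]
                  + m1["legit"] * m2["theta"]
                  + m1["theta"] * m2["legit"]) / norm,
        "theta": m1["theta"] * m2["theta"] / norm,
    }

# CatBoost probability 0.9 with discount 0.15; velocity check with discount 0.30
catboost_mass = {"fraud": 0.9 * 0.85, "legit": 0.1 * 0.85, "theta": 0.15}
velocity_mass = {"fraud": 0.5 * 0.70, "legit": 0.5 * 0.70, "theta": 0.30}

m = combine(catboost_mass, velocity_mass)
bel, pl = m["fraud"], m["fraud"] + m["theta"]  # belief interval [Bel, Pl]
```

Combining more sources is a left fold over `combine`; the residual `theta` mass is exactly the Bel/Pl gap that would route a transaction to human review.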
Migration Checklist
Phase 1 (minimum viable)
- Replace `xgboost` with `catboost` in `requirements.txt`
- Add `.compute()` calls to collect Dask arrays before training
- Replace `xgb.dask.train()` with `CatBoostClassifier.fit()`
- Replace `xgb.dask.DaskDMatrix` with `catboost.Pool`
- Update `predict_fraud.py`: `load_model()` + `predict_proba()[:, 1]`
- Update model path from `best-xgboost-model` to `best-catboost-model.cbm`
- Update hyperparameter search space for CatBoost parameters
- Update evaluation: CatBoost uses `PRAUC`, not `aucpr`
- Update `cdsw-build.sh` if dependency names changed
- Run threshold selection on validation set (threshold may shift)
Phase 2 (CatBoost-native)
- Enable `posterior_sampling=True` (ordered boosting)
- Add `auto_class_weights="Balanced"` for imbalance handling
- Add GPU detection with CPU fallback (from Signals)
- Replace the SHAP library with CatBoost's built-in TreeSHAP
- Tune CatBoost-specific parameters (`random_strength`, `bagging_temperature`)
Phase 3 (distributed)
- Implement quantized Pool loading for large datasets
- Or: modify `dask_utils.py` to launch CatBoost multi-node workers
- Benchmark memory usage: quantized vs raw on target dataset size
Phase 4 (Signals patterns)
- Implement self-training loop with confidence threshold
- Evaluate synthetic fraud augmentation
- Prototype evidence fusion with rule-based sources
Files Modified
| File | Change |
|---|---|
| `requirements.txt` | `xgboost` → `catboost` |
| `notebooks/distributed-xgboost-with-dask.ipynb` | All training/prediction cells |
| `scripts/predict_fraud.py` | Model loading and inference API |
| `utils/dask_utils.py` | No change (Phase 1–2); modify for Phase 3B |
| `.project-metadata.yaml` | No change |
| `cdsw-build.sh` | No change (dependencies installed via `requirements.txt`) |