10 Minutes to Dask-XGBoost

The RAPIDS fork of Dask-XGBoost enables XGBoost to operate on the distributed CUDA DataFrame via Dask-cuDF. A user may pass Dask-XGBoost a reference to a distributed cuDF object and start a training session over an entire cluster from Python. Similarly, the RAPIDS fork of XGBoost enables XGBoost to operate on the single-GPU CUDA DataFrame, and we are actively working to unify all of this functionality into a single API consumable from DMLC XGBoost.

Disable NCCL P2P. This is only necessary for versions of NCCL < 2.4.

[1]:
%env NCCL_P2P_DISABLE=1
env: NCCL_P2P_DISABLE=1

Import necessary modules and initialize the Dask-cuDF Cluster

We use LocalCUDACluster from Dask-CUDA to instantiate a single-node cluster.

A user may instantiate a Dask-cuDF cluster like this:

[2]:
import cudf
import dask
import dask.dataframe
import dask_cudf
import dask_xgboost
import pandas as pd
import numpy as np

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

import subprocess

# Determine the host IP address so the scheduler binds to a reachable interface
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]

# Start a single-node cluster (one worker per GPU) and connect a client to it
cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster)
client
[2]:

Client

Cluster

  • Workers: 1
  • Cores: 1
  • Memory: 270.37 GB

Note the use of from dask_cuda import LocalCUDACluster. Dask-CUDA is a lightweight set of utilities useful for setting up a Dask cluster. These calls instantiate a Dask-cuDF cluster in a single-node environment. To instantiate a multi-node Dask-cuDF cluster, a user must use dask-scheduler and dask-cuda-worker, the command-line utilities that launch the scheduler and its associated workers.
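As a minimal sketch of the multi-node case, assume dask-scheduler has been launched on one node and dask-cuda-worker has been pointed at it on each GPU node; tcp://scheduler-host:8786 below is a placeholder for the real scheduler address. The Python session then connects to the running scheduler instead of creating a LocalCUDACluster:

# Hypothetical multi-node setup: `dask-scheduler` runs on the scheduler node,
# and `dask-cuda-worker tcp://scheduler-host:8786` runs on each GPU node.
from dask.distributed import Client

client = Client('tcp://scheduler-host:8786')  # placeholder scheduler address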

Initialize a Random Dataset

Use dask_cudf.DataFrame.query to split the dataset into train and test sets.

[3]:
size = 1000000
npartitions = 8

# Build a random pandas DataFrame, partition it with Dask, then move it to the GPU
pdf = pd.DataFrame({'x': np.random.randint(0, npartitions, size=size), 'y': np.random.normal(size=size)})
pdf = dask.dataframe.from_pandas(pdf, npartitions=npartitions)

ddf = dask_cudf.from_dask_dataframe(pdf)

# Rows with y < 0.5 form the training set; the 'x' column is used as the label
x_train = ddf.query('y < 0.5')
y_train = x_train[['x']]
x_train = x_train[x_train.columns.difference(['x'])]

# Rows with y > 0.5 form the test set
x_test = ddf.query('y > 0.5')
y_test = x_test[['x']]
x_test = x_test[x_test.columns.difference(['x'])]

Define Parameters and Train with XGBoost

Use dask_cudf.DataFrame.persist() to ensure each GPU worker has ownership of its data before training, for optimal load balancing. Please note: this step is optional.
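For illustration, here is a minimal sketch of that optional step, persisting both training collections and blocking with wait (imported above) until the data is resident on the workers; the exact call pattern is a suggestion rather than a requirement:

# Optional: materialize the training data on the GPU workers and block
# until it is in memory before starting training
x_train = x_train.persist()
y_train = y_train.persist()
wait([x_train, y_train])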

[4]:
params = {
  'num_rounds':   100,
  'max_depth':    8,
  'max_leaves':   2**8,
  'n_gpus':       1,
  'tree_method':  'gpu_hist',
  'objective':    'reg:squarederror',
  'grow_policy':  'lossguide'
}

## Optional: persist training data into memory
# x_train = x_train.persist()
# y_train = y_train.persist()

bst = dask_xgboost.train(client, params, x_train, y_train, num_boost_round=params['num_rounds'])

Inputs for dask_xgboost.train

  1. client: the dask.distributed.Client

  2. params: the training parameters for XGBoost. Note that it is a requirement to set 'n_gpus': 1, as this tells XGBoost that each worker has a single GPU with which to perform coordinated computation

  3. x_train: an instance of dask_cudf.DataFrame containing the data to be trained

  4. y_train: an instance of dask_cudf.Series containing the labels for the training data

  5. num_boost_round=params['num_rounds']: the number of boosting rounds for the training session

Compute Predictions and the RMSE Metric for the Model

Use dask.dataframe.multi.concat to build a dask_cudf.DataFrame from [dask_cudf.Series] to leverage a cleaner API for computing RMSE

[5]:
pred = dask_xgboost.predict(client, bst, x_test)
test = dask.dataframe.multi.concat([pred], axis=1)

test['squared_error'] = (test[0] - y_test['x'])**2

How to run prediction via dask_xgboost.predict

  1. client: the dask.distributed.Client

  2. bst: the Booster produced by the XGBoost training session

  3. x_test: an instance of dask_cudf.DataFrame containing the data on which to run inference (i.e., acquire predictions)

pred will be an instance of dask_cudf.Series

We can use dask.dataframe.multi.concat to construct a dask_cudf.DataFrame by concatenating the list of dask_cudf.Series instances ([pred])

test is a dask_cudf.DataFrame object with a single column named 0, so test[0] returns pred. The squared error can then be computed by constructing a new column and assigning to it the squared difference between the predicted and labeled values. This is encoded in the assignment test['squared_error'] = (test[0] - y_test['x'])**2.

Finally, the mean can be computed using an aggregator from the dask_cudf API, and the entire computation is initiated via .compute(). We take the square root of the result, leaving us with rmse = np.sqrt(test.squared_error.mean().compute()). Note: .squared_error is an accessor for test['squared_error']. Like so:

[6]:
rmse = np.sqrt(test.squared_error.mean().compute())
print('rmse value:', rmse)
/conda/envs/gdf/lib/python3.7/site-packages/distributed/worker.py:3101: UserWarning: Large object of size 2.00 MB detected in task graph:
  ('squared_error', 2, 0, 'x', ['x'], None, '_predic ... 5323a6de0d89f')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)
rmse value: 2.2922784627049544