{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 10 Minutes to Dask-XGBoost\n", "\n", "The [RAPIDS Fork of Dask-XGBoost](https://github.com/rapidsai/dask-xgboost/ \"RAPIDS Dask-XGBoost\") enables XGBoost with the distributed CUDA DataFrame via Dask-cuDF. A user may pass Dask-XGBoost a reference to a distributed cuDF object and start a training session over an entire cluster from Python. [The RAPIDS Fork of XGBoost](https://github.com/rapidsai/xgboost \"RAPIDS XGBoost\") enables XGBoost with the CUDA DataFrame, and we are actively working to unify all of this functionality into a single API consumable from [DMLC XGBoost](https://github.com/dmlc/xgboost \"DMLC XGBoost\")." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Disable NCCL P2P. Only necessary for versions of NCCL < 2.4" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%env NCCL_P2P_DISABLE=1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import necessary modules and initialize the Dask-cuDF Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use `LocalCUDACluster` from Dask-CUDA to instantiate a single-node cluster.\n", "\n", "A user may instantiate a Dask-cuDF cluster like this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import cudf\n", "import dask\n", "import dask.dataframe\n", "import dask_cudf\n", "import dask_xgboost\n", "import pandas as pd\n", "import numpy as np\n", "\n", "from dask.distributed import Client, wait\n", "from dask_cuda import LocalCUDACluster\n", "\n", "import subprocess\n", "\n", "# Determine this host's first IP address so the scheduler listens on it\n", "cmd = \"hostname --all-ip-addresses\"\n", "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n", "output, error = process.communicate()\n", "IPADDR = str(output.decode()).split()[0]\n", "\n", "# Start a local CUDA cluster (one worker per GPU) and connect a client to it\n", "cluster = LocalCUDACluster(ip=IPADDR)\n", "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note the use of `from dask_cuda import LocalCUDACluster`. [Dask-CUDA](https://github.com/rapidsai/dask-cuda) is a lightweight set of utilities useful for setting up a Dask cluster. These calls instantiate a Dask-cuDF cluster in a single-node environment. To instantiate a multi-node Dask-cuDF cluster, a user must use `dask-scheduler` and `dask-cuda-worker`, the command-line utilities that launch the scheduler and its associated workers; a sketch is shown below." ] },
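{ "cell_type": "markdown", "metadata": {}, "source": [ "For reference, here is a minimal multi-node sketch. It is not needed for the single-node `LocalCUDACluster` used in this notebook, and it assumes a scheduler has already been launched elsewhere with `dask-scheduler` and that `dask-cuda-worker` has been pointed at it on each GPU node; the scheduler address is a placeholder (8786 is the default scheduler port)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative sketch only -- not required for this notebook's single-node cluster.\n", "# Assumes the scheduler was started on another node with:  dask-scheduler\n", "# and each GPU node was attached with:  dask-cuda-worker scheduler-ip:8786\n", "# Replace 'scheduler-ip' with the address of your scheduler node.\n", "#\n", "# from dask.distributed import Client\n", "# client = Client('scheduler-ip:8786')" ] },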
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Initialize a Random Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `dask_cudf.DataFrame.query` to split the dataset into train and test sets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "size = 1000000\n", "npartitions = 8\n", "\n", "# Build a random pandas DataFrame and distribute it as a Dask-cuDF DataFrame\n", "pdf = pd.DataFrame({'x': np.random.randint(0, npartitions, size=size), 'y': np.random.normal(size=size)})\n", "pdf = dask.dataframe.from_pandas(pdf, npartitions=npartitions)\n", "\n", "ddf = dask_cudf.from_dask_dataframe(pdf)\n", "\n", "# Split on the continuous column 'y'; the column 'x' serves as the label\n", "x_train = ddf.query('y < 0.5')\n", "y_train = x_train[['x']]\n", "x_train = x_train[x_train.columns.difference(['x'])]\n", "\n", "x_test = ddf.query('y > 0.5')\n", "y_test = x_test[['x']]\n", "x_test = x_test[x_test.columns.difference(['x'])]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define Parameters and Train with XGBoost" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `dask_cudf.DataFrame.persist()` to ensure each GPU worker takes ownership of its portion of the data before training, which gives an optimal load balance. Please note: this is optional." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "params = {\n", " 'num_rounds': 100,\n", " 'max_depth': 8,\n", " 'max_leaves': 2**8,\n", " 'n_gpus': 1,\n", " 'tree_method': 'gpu_hist',\n", " 'objective': 'reg:squarederror',\n", " 'grow_policy': 'lossguide'\n", "}\n", "\n", "## Optional: persist training data into memory\n", "# x_train = x_train.persist()\n", "# y_train = y_train.persist()\n", "\n", "# Launch distributed training across the Dask workers\n", "bst = dask_xgboost.train(client, params, x_train, y_train, num_boost_round=params['num_rounds'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inputs for `dask_xgboost.train`\n", "\n", "1. `client`: the `dask.distributed.Client`\n", "2. `params`: the training parameters for XGBoost. Note that it is a requirement to set `'n_gpus': 1`, since each Dask worker drives a single GPU in the coordinated computation\n", "3. `x_train`: an instance of `dask_cudf.DataFrame` containing the training data\n", "4. `y_train`: a single-column `dask_cudf.DataFrame` (constructed above) containing the labels for the training data\n", "5. `num_boost_round=params['num_rounds']`: the number of boosting rounds for the training session" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Compute Predictions and the RMSE Metric for the Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `dask.dataframe.multi.concat` to build a `dask_cudf.DataFrame` from a list of `dask_cudf.Series` (`[pred]`), which gives a cleaner API for computing the RMSE." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Score the test set, then concatenate the prediction Series into a DataFrame\n", "pred = dask_xgboost.predict(client, bst, x_test)\n", "test = dask.dataframe.multi.concat([pred], axis=1)\n", "\n", "# Per-row squared error between predictions and labels\n", "test['squared_error'] = (test[0] - y_test['x'])**2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### How to run prediction via `dask_xgboost.predict`\n", "\n", "1. `client`: the `dask.distributed.Client`\n", "2. `bst`: the Booster produced by the XGBoost training session\n",
"3. `x_test`: an instance of `dask_cudf.DataFrame` containing the data to score (i.e., to acquire predictions for)\n", "\n", "`pred` will be an instance of `dask_cudf.Series` holding the predictions.\n", "\n", "We can use `dask.dataframe.multi.concat` to construct a `dask_cudf.DataFrame` by concatenating the list of `dask_cudf.Series` instances (`[pred]`).\n", "\n", "`test` is a `dask_cudf.DataFrame` object with a single column named `0`, so `test[0]` returns `pred`. The squared error of each prediction can then be computed by constructing a new column holding the squared difference between the predicted and labeled values. This is encoded in the assignment `test['squared_error'] = (test[0] - y_test['x'])**2`.\n", "\n", "Finally, the mean is computed with an aggregator from the `dask_cudf` API, and the entire computation is initiated via `.compute()`. Taking the square root of the result leaves us with `rmse = np.sqrt(test.squared_error.mean().compute())`. Note that `.squared_error` is an attribute-style accessor for `test['squared_error']`. Like so:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Root-mean-squared error: mean of the squared errors, then the square root\n", "rmse = np.sqrt(test.squared_error.mean().compute())\n", "print('rmse value:', rmse)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 2 }