10 Minutes to cuDF and Dask-cuDF

Modeled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly for new users.

What are these Libraries?

cuDF is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating data.

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple.

Dask-cuDF is a library that provides a partitioned, GPU-backed dataframe, using Dask.

When to use cuDF and Dask-cuDF

If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF.

[1]:
import os

import numpy as np
import pandas as pd
import cudf
import dask_cudf

np.random.seed(12)

#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.

Object Creation

Creating a cudf.Series and a dask_cudf.Series.

[2]:
s = cudf.Series([1,2,3,None,4])
s
[2]:
0       1
1       2
2       3
3    null
4       4
dtype: int64
[3]:
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.compute()
[3]:
0       1
1       2
2       3
3    null
4       4
dtype: int64

Creating a cudf.DataFrame and a dask_cudf.DataFrame by specifying values for each column.

[4]:
df = cudf.DataFrame([('a', list(range(20))),
                     ('b', list(reversed(range(20)))),
                     ('c', list(range(20)))])
df
[4]:
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
4 4 15 4
5 5 14 5
6 6 13 6
7 7 12 7
8 8 11 8
9 9 10 9
10 10 9 10
11 11 8 11
12 12 7 12
13 13 6 13
14 14 5 14
15 15 4 15
16 16 3 16
17 17 2 17
18 18 1 18
19 19 0 19
[5]:
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.compute()
[5]:
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
4 4 15 4
5 5 14 5
6 6 13 6
7 7 12 7
8 8 11 8
9 9 10 9
10 10 9 10
11 11 8 11
12 12 7 12
13 13 6 13
14 14 5 14
15 15 4 15
16 16 3 16
17 17 2 17
18 18 1 18
19 19 0 19

Creating a cudf.DataFrame from a pandas DataFrame and a dask_cudf.DataFrame from a cudf.DataFrame.

Note that the best practice for using Dask-cuDF is to read data directly into a dask_cudf.DataFrame with something like read_csv (discussed below).

[6]:
pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
gdf
[6]:
a b
0 0 0.1
1 1 0.2
2 2 null
3 3 0.3
[7]:
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.compute()
[7]:
a b
0 0 0.1
1 1 0.2
2 2 null
3 3 0.3

Viewing Data

Viewing the top rows of a GPU dataframe.

[8]:
df.head(2)
[8]:
a b c
0 0 19 0
1 1 18 1
[9]:
ddf.head(2)
[9]:
a b c
0 0 19 0
1 1 18 1

Sorting by values.

[10]:
df.sort_values(by='b')
[10]:
a b c
19 19 0 19
18 18 1 18
17 17 2 17
16 16 3 16
15 15 4 15
14 14 5 14
13 13 6 13
12 12 7 12
11 11 8 11
10 10 9 10
9 9 10 9
8 8 11 8
7 7 12 7
6 6 13 6
5 5 14 5
4 4 15 4
3 3 16 3
2 2 17 2
1 1 18 1
0 0 19 0
[11]:
ddf.sort_values(by='b').compute()
[11]:
a b c
0 19 0 19
1 18 1 18
2 17 2 17
3 16 3 16
4 15 4 15
5 14 5 14
6 13 6 13
7 12 7 12
8 11 8 11
9 10 9 10
10 9 10 9
11 8 11 8
12 7 12 7
13 6 13 6
14 5 14 5
15 4 15 4
16 3 16 3
17 2 17 2
18 1 18 1
19 0 19 0

Selection

Getting

Selecting a single column, which yields a cudf.Series or dask_cudf.Series; calling compute on the latter results in a cudf.Series. This is equivalent to df.a.

[12]:
df['a']
[12]:
0      0
1      1
2      2
3      3
4      4
5      5
6      6
7      7
8      8
9      9
10    10
11    11
12    12
13    13
14    14
15    15
16    16
17    17
18    18
19    19
Name: a, dtype: int64
[13]:
ddf['a'].compute()
[13]:
0      0
1      1
2      2
3      3
4      4
5      5
6      6
7      7
8      8
9      9
10    10
11    11
12    12
13    13
14    14
15    15
16    16
17    17
18    18
19    19
Name: a, dtype: int64
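
The attribute form mentioned above gives the same result:

[ ]:
df.a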

Selection by Label

Selecting rows from index 2 to index 5 from columns ‘a’ and ‘b’.

[14]:
df.loc[2:5, ['a', 'b']]
[14]:
a b
2 2 17
3 3 16
4 4 15
5 5 14
[15]:
ddf.loc[2:5, ['a', 'b']].compute()
[15]:
a b
2 2 17
3 3 16
4 4 15
5 5 14

Selection by Position

Selecting via integers and integer slices, like numpy/pandas. Note that this functionality is not available for Dask-cuDF DataFrames.

[16]:
df.iloc[0]
[16]:
a     0
b    19
c     0
Name: 0, dtype: int64
[17]:
df.iloc[0:3, 0:2]
[17]:
a b
0 0 19
1 1 18
2 2 17

You can also select elements of a DataFrame or Series with direct index access.

[18]:
df[3:5]
[18]:
a b c
3 3 16 3
4 4 15 4
[19]:
s[3:5]
[19]:
3    null
4       4
dtype: int64

Boolean Indexing

Selecting rows in a DataFrame or Series by direct Boolean indexing.

[20]:
df[df.b > 15]
[20]:
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
[21]:
ddf[ddf.b > 15].compute()
[21]:
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3

Selecting values from a DataFrame where a Boolean condition is met, via the query API.

[22]:
df.query("b == 3")
[22]:
a b c
16 16 3 16
[23]:
ddf.query("b == 3").compute()
[23]:
a b c
16 16 3 16

You can also pass local variables to Dask-cuDF queries via the local_dict keyword. With standard cuDF, you may either use the local_dict keyword or pass the variable directly via the @ symbol. Supported logical operators include >, <, >=, <=, ==, and !=.

[24]:
cudf_comparator = 3
df.query("b == @cudf_comparator")
[24]:
a b c
16 16 3 16
[25]:
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={'val':dask_cudf_comparator}).compute()
[25]:
a b c
16 16 3 16
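
local_dict works the same way with standard cuDF; a minimal sketch (the placeholder name val is illustrative):

[ ]:
cudf_comparator = 3
df.query("b == @val", local_dict={'val': cudf_comparator})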

Using the isin method for filtering.

[26]:
df[df.a.isin([0, 5])]
[26]:
a b c
0 0 19 0
5 5 14 5
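
The same isin filter can be applied to a Dask-cuDF dataframe; a minimal sketch, assuming the cuDF semantics above carry over to the partitioned case:

[ ]:
ddf[ddf.a.isin([0, 5])].compute()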

MultiIndex

cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see Grouping below) automatically produces a DataFrame with a MultiIndex.

[27]:
arrays = [['a', 'a', 'b', 'b'],
          [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx
[27]:
MultiIndex(levels=[0    a
1    b
dtype: object, 0    1
1    2
2    3
3    4
dtype: int64],
codes=   0  1
0  0  0
1  0  1
2  1  2
3  1  3)

This index can back either axis of a DataFrame.

[28]:
gdf1 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)})
gdf1.index = idx
gdf1
[28]:
first second
a 1 0.154163 0.014575
2 0.740050 0.918747
b 3 0.263315 0.900715
4 0.533739 0.033421
[29]:
gdf2 = cudf.DataFrame({'first': np.random.rand(4), 'second': np.random.rand(4)}).T
gdf2.columns = idx
gdf2
[29]:
a b
1 2 3 4
first 0.956949 0.137209 0.283828 0.606083
second 0.944225 0.852736 0.002259 0.521226

Accessing values of a DataFrame with a MultiIndex. Note that slicing is not yet supported.

[30]:
gdf1.loc[('b', 3)]
[30]:
first second
b 3 0.263315 0.900715

Missing Data

Missing data can be replaced by using the fillna method.

[31]:
s.fillna(999)
[31]:
0      1
1      2
2      3
3    999
4      4
dtype: int64
[32]:
ds.fillna(999).compute()
[32]:
0      1
1      2
2      3
3    999
4      4
dtype: int64

Operations

Stats

Calculating descriptive statistics for a Series.

[33]:
s.mean(), s.var()
[33]:
(2.5, 1.666666666666666)
[34]:
ds.mean().compute(), ds.var().compute()
[34]:
(2.5, 1.6666666666666667)

Applymap

Applying functions to a Series. Note that applying user-defined functions directly with Dask-cuDF is not yet implemented. For now, you can use map_partitions to apply a function to each partition of the distributed dataframe.

[35]:
def add_ten(num):
    return num + 10

df['a'].applymap(add_ten)
[35]:
0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64
[36]:
ddf['a'].map_partitions(add_ten).compute()
[36]:
0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64

Histogramming

Counting the number of occurrences of each unique value of a variable.

[37]:
df.a.value_counts()
[37]:
0     1
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
11    1
12    1
13    1
14    1
15    1
16    1
17    1
18    1
19    1
Name: a, dtype: int32
[38]:
ddf.a.value_counts().compute()
[38]:
0     1
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
11    1
12    1
13    1
14    1
15    1
16    1
17    1
18    1
19    1
Name: a, dtype: int64

String Methods

Like pandas, cuDF provides string processing methods in the str attribute of Series. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.

[39]:
s = cudf.Series(['A', 'B', 'C', 'Aaba', 'Baca', None, 'CABA', 'dog', 'cat'])
s.str.lower()
[39]:
0       a
1       b
2       c
3    aaba
4    baca
5    None
6    caba
7     dog
8     cat
dtype: object
[40]:
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().compute()
[40]:
0       a
1       b
2       c
3    aaba
4    baca
5    None
6    caba
7     dog
8     cat
dtype: object

Concat

Concatenating Series and DataFrames row-wise.

[41]:
s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])
[41]:
0       1
1       2
2       3
3    null
4       5
0       1
1       2
2       3
3    null
4       5
dtype: int64
[42]:
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).compute()
[42]:
0       1
1       2
2       3
3    null
4       5
0       1
1       2
2       3
3    null
4       5
dtype: int64
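
As noted above, concat also works row-wise on DataFrames; a minimal sketch reusing df from earlier:

[ ]:
cudf.concat([df.head(2), df.head(2)])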

Join

Performing SQL-style merges. Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

[43]:
df_a = cudf.DataFrame()
df_a['key'] = ['a', 'b', 'c', 'd', 'e']
df_a['vals_a'] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b['key'] = ['a', 'c', 'e']
df_b['vals_b'] = [float(i+100) for i in range(3)]

merged = df_a.merge(df_b, on=['key'], how='left')
merged
[43]:
key vals_a vals_b
0 a 10.0 100.0
1 c 12.0 101.0
2 e 14.0 102.0
3 b 11.0 null
4 d 13.0 null
[44]:
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = ddf_a.merge(ddf_b, on=['key'], how='left').compute()
merged
[44]:
key vals_a vals_b
0 a 10.0 100.0
1 c 12.0 101.0
2 b 11.0 null
0 e 14.0 102.0
1 d 13.0 null

Append

Appending values from another Series or array-like object.

[45]:
s.append(s)
[45]:
0       1
1       2
2       3
3    null
4       5
0       1
1       2
2       3
3    null
4       5
dtype: int64
[46]:
ds2.append(ds2).compute()
[46]:
0       1
1       2
2       3
3    null
4       5
0       1
1       2
2       3
3    null
4       5
dtype: int64

Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.

[47]:
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping and then applying the sum function to the grouped data.

[48]:
df.groupby('agg_col1').sum()
[48]:
a b c agg_col2
agg_col1
0 100 90 100 3
1 90 100 90 4
[49]:
ddf.groupby('agg_col1').sum().compute()
[49]:
a b c agg_col2
agg_col1
0 100 90 100 3
1 90 100 90 4

Grouping hierarchically, then applying the sum function to the grouped data.

[50]:
df.groupby(['agg_col1', 'agg_col2']).sum()
[50]:
a b c
agg_col1 agg_col2
0 0 73 60 73
1 27 30 27
1 0 54 60 54
1 36 40 36
[51]:
ddf.groupby(['agg_col1', 'agg_col2']).sum().compute()
[51]:
a b c
agg_col1 agg_col2
1 1 36 40 36
0 0 73 60 73
1 0 54 60 54
0 1 27 30 27

Grouping and applying statistical functions to specific columns, using agg.

[52]:
df.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'})
[52]:
a b c
agg_col1
0 19 9.0 100
1 18 10.0 90
[53]:
ddf.groupby('agg_col1').agg({'a':'max', 'b':'mean', 'c':'sum'}).compute()
[53]:
a b c
agg_col1
0 19 9.0 100
1 18 10.0 90

Transpose

Transposing a dataframe, using either the transpose method or the T property. Currently, all columns must have the same type. Transposing is not yet implemented in Dask-cuDF.

[54]:
sample = cudf.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
sample
[54]:
a b
0 1 4
1 2 5
2 3 6
[55]:
sample.transpose()
[55]:
0 1 2
a 1 2 3
b 4 5 6
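
The T property is shorthand for the same operation:

[ ]:
sample.T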

Time Series

DataFrames support datetime-typed columns, which allow users to interact with and filter data based on specific timestamps.

[56]:
import datetime as dt

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
date_df.query('date <= @search_date')
[56]:
date value
0 2018-11-20 0.552038
1 2018-11-21 0.485377
2 2018-11-22 0.768134
3 2018-11-23 0.160717
[57]:
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute()
[57]:
date value
0 2018-11-20 0.552038
1 2018-11-21 0.485377
2 2018-11-22 0.768134
3 2018-11-23 0.160717

Categoricals

DataFrames support categorical columns.

[58]:
gdf = cudf.DataFrame({"id":[1,2,3,4,5,6], "grade":['a', 'b', 'b', 'a', 'a', 'e']})
gdf['grade'] = gdf['grade'].astype('category')
gdf
[58]:
id grade
0 1 a
1 2 b
2 3 b
3 4 a
4 5 a
5 6 e
[59]:
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.compute()
[59]:
id grade
0 1 a
1 2 b
2 3 b
3 4 a
4 5 a
5 6 e

Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.

[60]:
gdf.grade.cat.categories
[60]:
StringIndex(['a' 'b' 'e'], dtype='object')

Accessing the underlying code values of each categorical observation.

[61]:
gdf.grade.cat.codes
[61]:
0    0
1    1
2    1
3    0
4    0
5    2
Name: grade, dtype: int32
[62]:
dgdf.grade.cat.codes.compute()
[62]:
0    0
1    1
2    1
0    0
1    0
2    2
Name: grade, dtype: int32

Converting Data Representation

Pandas

Converting a cuDF and Dask-cuDF DataFrame to a pandas DataFrame.

[63]:
df.head().to_pandas()
[63]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
[64]:
ddf.compute().head().to_pandas()
[64]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

Numpy

Converting a cuDF or Dask-cuDF DataFrame to a numpy ndarray.

[65]:
df.as_matrix()
[65]:
array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0],
       [ 5, 14,  5,  0,  0],
       [ 6, 13,  6,  1,  1],
       [ 7, 12,  7,  0,  0],
       [ 8, 11,  8,  1,  0],
       [ 9, 10,  9,  0,  1],
       [10,  9, 10,  1,  0],
       [11,  8, 11,  0,  0],
       [12,  7, 12,  1,  1],
       [13,  6, 13,  0,  0],
       [14,  5, 14,  1,  0],
       [15,  4, 15,  0,  1],
       [16,  3, 16,  1,  0],
       [17,  2, 17,  0,  0],
       [18,  1, 18,  1,  1],
       [19,  0, 19,  0,  0]])
[66]:
ddf.compute().as_matrix()
[66]:
array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0],
       [ 5, 14,  5,  0,  0],
       [ 6, 13,  6,  1,  1],
       [ 7, 12,  7,  0,  0],
       [ 8, 11,  8,  1,  0],
       [ 9, 10,  9,  0,  1],
       [10,  9, 10,  1,  0],
       [11,  8, 11,  0,  0],
       [12,  7, 12,  1,  1],
       [13,  6, 13,  0,  0],
       [14,  5, 14,  1,  0],
       [15,  4, 15,  0,  1],
       [16,  3, 16,  1,  0],
       [17,  2, 17,  0,  0],
       [18,  1, 18,  1,  1],
       [19,  0, 19,  0,  0]])

Converting a cuDF or Dask-cuDF Series to a numpy ndarray.

[67]:
df['a'].to_array()
[67]:
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19])
[68]:
ddf['a'].compute().to_array()
[68]:
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19])

Arrow

Converting a cuDF or Dask-cuDF DataFrame to a PyArrow Table.

[69]:
df.to_arrow()
[69]:
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
metadata
--------
{b'pandas': b'{"index_columns": [{"kind": "range", "name": null, "start": 0, "'
            b'stop": 20, "step": 1}], "column_indexes": [{"name": null, "field'
            b'_name": null, "pandas_type": "unicode", "numpy_type": "object", '
            b'"metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "a", "'
            b'field_name": "a", "pandas_type": "int64", "numpy_type": "int64",'
            b' "metadata": null}, {"name": "b", "field_name": "b", "pandas_typ'
            b'e": "int64", "numpy_type": "int64", "metadata": null}, {"name": '
            b'"c", "field_name": "c", "pandas_type": "int64", "numpy_type": "i'
            b'nt64", "metadata": null}, {"name": "agg_col1", "field_name": "ag'
            b'g_col1", "pandas_type": "int64", "numpy_type": "int64", "metadat'
            b'a": null}, {"name": "agg_col2", "field_name": "agg_col2", "panda'
            b's_type": "int64", "numpy_type": "int64", "metadata": null}], "cr'
            b'eator": {"library": "pyarrow", "version": "0.14.1"}, "pandas_ver'
            b'sion": "0.24.2"}'}
[70]:
ddf.compute().to_arrow()
[70]:
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"na'
            b'me": null, "field_name": null, "pandas_type": "unicode", "numpy_'
            b'type": "object", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"name": "a", "field_name": "a", "pandas_type": "int64", "nump'
            b'y_type": "int64", "metadata": null}, {"name": "b", "field_name":'
            b' "b", "pandas_type": "int64", "numpy_type": "int64", "metadata":'
            b' null}, {"name": "c", "field_name": "c", "pandas_type": "int64",'
            b' "numpy_type": "int64", "metadata": null}, {"name": "agg_col1", '
            b'"field_name": "agg_col1", "pandas_type": "int64", "numpy_type": '
            b'"int64", "metadata": null}, {"name": "agg_col2", "field_name": "'
            b'agg_col2", "pandas_type": "int64", "numpy_type": "int64", "metad'
            b'ata": null}, {"name": null, "field_name": "__index_level_0__", "'
            b'pandas_type": "int64", "numpy_type": "int64", "metadata": null}]'
            b', "creator": {"library": "pyarrow", "version": "0.14.1"}, "panda'
            b's_version": "0.24.2"}'}

Getting Data In/Out

CSV

Writing to a CSV file. For the Dask-cuDF dataframe, we first call compute to collect the partitions into a single cudf.DataFrame before writing.

[71]:
if not os.path.exists('example_output'):
    os.mkdir('example_output')

df.to_csv('example_output/foo.csv', index=False)
[72]:
ddf.compute().to_csv('example_output/foo_dask.csv', index=False)

Reading from a CSV file.

[73]:
df = cudf.read_csv('example_output/foo.csv')
df
[73]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0
[74]:
ddf = dask_cudf.read_csv('example_output/foo_dask.csv')
ddf.compute()
[74]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

Reading all CSV files in a directory into a single dask_cudf.DataFrame, using the star wildcard.

[75]:
ddf = dask_cudf.read_csv('example_output/*.csv')
ddf.compute()
[75]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

Parquet

Writing to parquet files, using the CPU via PyArrow.

[76]:
df.to_parquet('example_output/temp_parquet')
/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/io/parquet.py:68: UserWarning: Using CPU via PyArrow to write Parquet dataset, this will be GPU accelerated in the future
  "Using CPU via PyArrow to write Parquet dataset, this will "

Reading parquet files with a GPU-accelerated parquet reader.

[77]:
df = cudf.read_parquet('example_output/temp_parquet/81389eafdff74e20bcff94c134f6f162.parquet')
df
[77]:
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

Writing to parquet files from a dask_cudf.DataFrame using PyArrow under the hood.

[1]:
ddf.to_parquet('example_files')
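
Reading the dataset back into a dask_cudf.DataFrame; a minimal sketch, assuming dask_cudf.read_parquet accepts the directory written above:

[ ]:
ddf = dask_cudf.read_parquet('example_files')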

ORC

Reading ORC files.

[ ]:
df2 = cudf.read_orc('/rapids/cudf/python/cudf/cudf/tests/data/orc/TestOrcFile.test1.orc')
df2

Dask Performance Tips

Like Apache Spark, Dask evaluates operations lazily. Instead of executing immediately, most operations are added to a task graph, and the actual evaluation is delayed until the result is needed.

Sometimes, though, we want to force the execution of operations. Calling persist on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we’re using distributed systems, we may want to wait until persist is finished before beginning any downstream operations. We can enforce this contract by using wait. Wrapping an operation with wait will ensure it doesn’t begin executing until all necessary upstream operations have finished.

The snippets below provide basic examples, using LocalCUDACluster to create one dask-worker per GPU on the local machine. For more detailed information about persist and wait, please see the Dask documentation for persist and wait. Wait relies on the concept of Futures, which is beyond the scope of this tutorial. For more information on Futures, see the Dask Futures documentation. For more information about multi-GPU clusters, please see the dask-cuda library (documentation is in progress).

First, we set up a GPU cluster. With our client set up, Dask-cuDF computation will be distributed across the GPUs in the cluster.

[79]:
import time

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)
client
[79]:

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 404.27 GB

Persisting Data

Next, we create our Dask-cuDF DataFrame and apply a transformation, storing the result as a new column.

[80]:
nrows = 10000000

df2 = cudf.DataFrame({'a':np.arange(nrows), 'b':np.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=5)
ddf2['c'] = ddf2['a'] + 5
ddf2
[80]:
Dask DataFrame Structure:
a b c
npartitions=5
0 int64 int64 int64
2000000 ... ... ...
... ... ... ...
8000000 ... ... ...
9999999 ... ... ...
Dask Name: assign, 20 tasks
[81]:
!nvidia-smi
Tue Aug 20 18:50:06 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.104      Driver Version: 410.104      CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla T4            On   | 00000000:AF:00.0 Off |                    0 |
| N/A   51C    P0    29W /  70W |    803MiB / 15079MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:D8:00.0 Off |                    0 |
| N/A   46C    P0    27W /  70W |    129MiB / 15079MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+

Because Dask is lazy, the computation has not yet occurred. We can see that there are twenty tasks in the task graph and we’ve used about 800 MB of memory. We can force computation by using persist. By forcing execution, the result is now explicitly in memory and our task graph only contains one task per partition (the baseline).

[82]:
ddf2 = ddf2.persist()
ddf2
[82]:
Dask DataFrame Structure:
a b c
npartitions=5
0 int64 int64 int64
2000000 ... ... ...
... ... ... ...
8000000 ... ... ...
9999999 ... ... ...
Dask Name: assign, 5 tasks
[83]:
!nvidia-smi
Tue Aug 20 18:50:29 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.104      Driver Version: 410.104      CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla T4            On   | 00000000:AF:00.0 Off |                    0 |
| N/A   51C    P0    29W /  70W |   1125MiB / 15079MiB |      2%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:D8:00.0 Off |                    0 |
| N/A   46C    P0    27W /  70W |    563MiB / 15079MiB |      1%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+

Because we forced computation, we now have a larger object in distributed GPU memory.

Wait

Depending on our workflow or distributed computing setup, we may want to wait until all upstream tasks have finished before proceeding with a specific function. This section shows an example of this behavior, adapted from the Dask documentation.

First, we create a new Dask DataFrame and define a function that we’ll map to every partition in the dataframe.

[87]:
nrows = 10000000

df1 = cudf.DataFrame({'a':np.arange(nrows), 'b':np.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=5)

def func(df):
    time.sleep(np.random.randint(1, 60))
    return (df + 5) * 3 - 11

This function will do a basic transformation of every column in the dataframe, but the time spent in the function will vary, as the time.sleep call randomly adds between 1 and 59 seconds of delay. We’ll run this on every partition of our dataframe using map_partitions, which adds the task to our task-graph, and store the result. We can then call persist to force execution.

[88]:
results_ddf = ddf1.map_partitions(func)
results_ddf = results_ddf.persist()

However, some partitions will be done much sooner than others. If we had downstream processes that should wait for all partitions to be completed, we can enforce that behavior using wait.

[89]:
wait(results_ddf)
[89]:
DoneAndNotDoneFutures(done={<Future: status: finished, type: DataFrame, key: ('func-ce75665b80c984639a37a86847d7de7d', 3)>, <Future: status: finished, type: DataFrame, key: ('func-ce75665b80c984639a37a86847d7de7d', 4)>, <Future: status: finished, type: DataFrame, key: ('func-ce75665b80c984639a37a86847d7de7d', 1)>, <Future: status: finished, type: DataFrame, key: ('func-ce75665b80c984639a37a86847d7de7d', 0)>, <Future: status: finished, type: DataFrame, key: ('func-ce75665b80c984639a37a86847d7de7d', 2)>}, not_done=set())

With wait, we can safely proceed in our workflow.