CLX Workflow

This is an introduction to the CLX Workflow and its I/O components.

What is a CLX Workflow?

A CLX Workflow receives data from a particular source, performs operations on that data within a GPU dataframe, and outputs that data to a particular destination. This guide will teach you how to configure your workflow inputs and outputs around a simple workflow example.

When to use a CLX Workflow

A CLX Workflow provides a simple and modular way of “plugging” a particular workflow into different inputs and outputs. Use a CLX Workflow when you would like to deploy a workflow as part of a data pipeline.

A simple example of a custom Workflow

[8]:
from clx.workflow.workflow import Workflow
class CustomWorkflow(Workflow):
    def workflow(self, dataframe):
        dataframe["enriched"] = "enriched output"
        return dataframe

A custom workflow relies on the Workflow base class, which handles the I/O and general data processing functionality. To implement a new workflow, the developer need only implement the workflow function, which receives an input dataframe, as shown above.

A more advanced example of a Workflow can be found here: a Splunk Alert Workflow used to find anomalies in Splunk alert data.
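Outside of a deployed pipeline, the effect of the workflow function above is simply a column assignment on a GPU dataframe. The snippet below mirrors that enrichment on a toy cudf DataFrame; the sample values are illustrative only and are not part of the CLX API.

[ ]:
import cudf

# Toy input resembling what a workflow would receive from its configured source
df = cudf.DataFrame({"raw": ["event one", "event two"]})

# The same enrichment CustomWorkflow.workflow applies before the result is
# written to the configured destination
df["enriched"] = "enriched output"
print(df)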

Workflow I/O Components

To deploy a workflow against input and output data feeds, we integrate the CLX I/O components.

Let’s look quickly at what a workflow configuration for the source and destination might look like. You can see below that we declare each of the properties within a dictionary. For more information on how to declare the configuration within a yaml file, see Workflow configurations in an external file below.

[4]:
source = {
   "type": "fs",
   "input_format": "csv",
   "input_path": "/full/path/to/input/data",
   "schema": ["raw"],
   "delimiter": ",",
   "required_cols": ["raw"],
   "dtype": ["str"],
   "header": 0
}
destination = {
   "type": "fs",
   "output_format": "csv",
   "output_path": "/full/path/to/output/data"
}

The first step in configuring the input and output of a workflow is to determine the source and destination type, then set the associated parameters for that specific type. As seen above, the type property is listed first and can be one of the following.

Source Types

  • fs - Read from a local filesystem

  • dask_fs - Increase the speed of GPU workflow operations by reading from a file using Dask

  • kafka - Read from Kafka

Destination Types

  • fs - Write to a local filesystem

  • kafka - Write to Kafka

Filesystem

If the fs type is used, the developer must indicate the data format using the input_format attribute. Formats available are: csv, parquet, and orc.

The associated parameters available for the fs type and input_format are documented within the cuDF I/O API. For example, when reading data from a csv file, refer to the available parameters of cudf.io.csv.read_csv.

Example

[ ]:
source = {
   "type": "fs",
   "input_format": "parquet",
   "input_path": "/full/path/to/input/data",
   "columns": ["x"]
}
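For completeness, a source for orc data follows the same pattern; the columns option here is assumed to map to the column selection parameter of cudf.io.orc.read_orc, mirroring the parquet example above.

[ ]:
source = {
   "type": "fs",
   "input_format": "orc",
   "input_path": "/full/path/to/input/data",
   "columns": ["x"]
}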

Dask Filesystem

If the dask_fs type is used, the developer must indicate the data format using the input_format attribute. Formats available are: csv, parquet, and orc.

The associated parameters available for the dask_fs type and input_format are listed within the Dask cuDF documentation.

Example

[ ]:
source = {
   "type": "dask_fs",
   "input_format": "csv",
   "input_path": "/full/path/to/input/data/*.csv"
}

Kafka

If the kafka type is used, the following parameters must be indicated:

Source

  • kafka_brokers - Kafka brokers

  • group_id - Group ID for consuming kafka messages

  • consumer_kafka_topics - Names of kafka topics to read from

  • batch_size - Number of kafka messages to read before the data is processed through the workflow

  • time_window - Maximum time window to wait for batch_size to be reached before workflow processing begins.

Destination

  • kafka_brokers - Kafka brokers

  • publisher_kafka_topic - Name of the kafka topic to write data to

  • batch_size - Number of workflow-processed messages to aggregate before the data is written to the kafka topic

  • output_delimiter - Delimiter of the data columns

Example

[ ]:
source = {
    "type": "kafka",
    "kafka_brokers": "kafka:9092",
    "group_id": "cyber",
    "batch_size": 10,
    "consumer_kafka_topics": ["topic1", "topic2"],
    "time_window": 5
}
dest = {
    "type": "kafka",
    "kafka_brokers": "kafka:9092"
    "batch_size": 10,
    "publisher_kafka_topic": "topic3",
    "output_delimiter": ","
}

Tying it together

Once we have established our workflow and our source and destination configurations, we can run the workflow. Let’s create one using the CustomWorkflow class we defined above.

First, we must know the parameters for instantiating a basic workflow:

  • name - The name of the workflow

  • source - The source of input data (optional)

  • destination - The destination for output data (optional)

[ ]:
from clx.workflow.workflow import Workflow
class CustomWorkflow(Workflow):
    def workflow(self, dataframe):
        dataframe["enriched"] = "enriched output"
        return dataframe

source = {
   "type": "fs",
   "input_format": "csv",
   "input_path": "/full/path/to/input/data",
   "schema": ["raw"],
   "delimiter": ",",
   "required_cols": ["raw"],
   "dtype": ["str"],
   "header": 0
}
destination = {
   "type": "fs",
   "output_format": "csv",
   "output_path": "/full/path/to/output/data"
}

my_new_workflow = CustomWorkflow(source=source, destination=destination, name="my_new_workflow")
my_new_workflow.run_workflow()

Workflow configurations in an external file

Workflow configurations may need to change depending on the environment. To avoid declaring workflow configurations within source code, you may also declare them in an external yaml file. A workflow will look for and establish I/O connections by searching for configurations in the following order:

  1. /etc/clx/[workflow-name]/workflow.yaml

  2. ~/.config/clx/[workflow-name]/workflow.yaml

  3. In-line python config

If the source and destination are declared in an external file, they are not required when instantiating a new workflow.

[ ]:
# Workflow config located at /etc/clx/my_new_workflow/workflow.yaml
my_new_workflow = CustomWorkflow(name="my_new_workflow")
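As a rough sketch of externalizing the same configuration, the snippet below writes the earlier source and destination dictionaries to one of the locations in the search order. The exact yaml layout expected by CLX is an assumption here (top-level source and destination keys mirroring the in-line python config), so verify it against the workflow configuration documentation.

[ ]:
import os
import yaml  # PyYAML

# Assumed layout: top-level source/destination keys mirroring the in-line
# python dictionaries used earlier in this guide
config = {"source": source, "destination": destination}

# Using the per-user search location; /etc/clx/my_new_workflow/workflow.yaml
# works the same way when you have root access
path = os.path.expanduser("~/.config/clx/my_new_workflow/workflow.yaml")
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
    yaml.safe_dump(config, f, default_flow_style=False)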