CLX Workflow¶
This is an introduction to the CLX Workflow and its I/O components.
What is a CLX Workflow?¶
A CLX Workflow receives data from a particular source, performs operations on that data within a GPU dataframe, and outputs that data to a particular destination. This guide will teach you how to configure your workflow inputs and outputs around a simple workflow example.
When to use a CLX Workflow¶
A CLX Workflow provides a simple and modular way of “plugging in” a particular workflow to read from different inputs and write to different outputs. Use a CLX Workflow when you would like to deploy a workflow as part of a data pipeline.
A simple example of a custom Workflow¶
[8]:
from clx.workflow.workflow import Workflow

class CustomWorkflow(Workflow):
    def workflow(self, dataframe):
        dataframe["enriched"] = "enriched output"
        return dataframe
A custom workflow relies on the Workflow base class, which handles the I/O and general data processing functionality. To implement a new workflow, the developer need only implement the workflow function, which receives an input dataframe, as shown above.
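To make the enrichment concrete, here is a minimal sketch, using a small made-up cuDF dataframe, of the transformation the workflow function applies, independent of any configured I/O:

[ ]:
import cudf

# Hypothetical input; in a real run this dataframe comes from the configured source.
df = cudf.DataFrame({"raw": ["first log line", "second log line"]})

# The same enrichment CustomWorkflow.workflow performs on each batch.
df["enriched"] = "enriched output"
print(df)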
A more advanced example of a Workflow can be found here. It is an example of a Splunk Alert Workflow used to find anomalies in Splunk alert data.
Workflow I/O Components¶
In order to deploy a workflow to an input and output data feed, we integrate the CLX I/O components.
Let’s look quickly at what a workflow configuration for the source and destination might look like. You can see below that we declare each of the properties within a dictionary. For more information on how to declare configuration within an external yaml file, see Workflow configurations in an external file below.
[4]:
source = {
    "type": "fs",
    "input_format": "csv",
    "input_path": "/full/path/to/input/data",
    "schema": ["raw"],
    "delimiter": ",",
    "required_cols": ["raw"],
    "dtype": ["str"],
    "header": 0
}

destination = {
    "type": "fs",
    "output_format": "csv",
    "output_path": "/full/path/to/output/data"
}
The first step in configuring the input and output of a workflow is to determine the source and destination type, then to set the associated parameters for that specific type. As seen above, the type property is listed first and can be one of the following.
Source Types

- fs - Read from a local filesystem
- dask_fs - Increase the speed of GPU workflow operations by reading from a file using Dask
- kafka - Read from Kafka

Destination Types

- fs - Write to a local filesystem
- kafka - Write to Kafka
Filesystem¶
If the fs type is used, the developer must distinguish the data format using the input_format attribute. Formats available are: csv, parquet, and orc.
The associated parameters available for the fs type and input_format are documented within the cuDF I/O API. For example, for reading data from a csv file, reference the available parameters for cudf.io.csv.read_csv.
Example
[ ]:
source = {
    "type": "fs",
    "input_format": "parquet",
    "input_path": "/full/path/to/input/data",
    "columns": ["x"]
}
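Because the fs parameters mirror the cuDF I/O API, the source above corresponds roughly to the following direct cuDF call. This is a sketch only; CLX performs the read for you when the workflow runs.

[ ]:
import cudf

# Roughly the read that the fs/parquet source above configures: load the
# parquet data at the placeholder path, keeping only column "x".
gdf = cudf.read_parquet("/full/path/to/input/data", columns=["x"])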
Dask Filesystem¶
If the dask_fs type is used, the developer must distinguish the data format using the input_format attribute. Formats available are: csv, parquet, and orc.
The associated parameters available for the dask_fs type and input_format are listed within the Dask cuDF documentation.
Example
[ ]:
source = {
    "type": "dask_fs",
    "input_format": "csv",
    "input_path": "/full/path/to/input/data/*.csv"
}
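Likewise, the dask_fs source above corresponds roughly to the following dask_cudf read. Again, this is only a sketch of what CLX does internally.

[ ]:
import dask_cudf

# Roughly the read that the dask_fs/csv source above configures: load every
# csv matching the placeholder glob into a distributed GPU dataframe.
ddf = dask_cudf.read_csv("/full/path/to/input/data/*.csv")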
Kafka¶
If the kafka type is used, the following parameters must be indicated.

Source

- kafka_brokers - Kafka brokers
- group_id - Group ID for consuming kafka messages
- consumer_kafka_topics - Names of kafka topics to read from
- batch_size - Indicates the number of kafka messages to read before data is processed through the workflow
- time_window - Maximum time window to wait for batch_size to be reached before workflow processing begins

Destination

- kafka_brokers - Kafka brokers
- publisher_kafka_topic - Name of the kafka topic to write data to
- batch_size - Indicates the number of workflow-processed messages to aggregate before data is written to the kafka topic
- output_delimiter - Delimiter of the data columns
Example
[ ]:
source = {
    "type": "kafka",
    "kafka_brokers": "kafka:9092",
    "group_id": "cyber",
    "batch_size": 10,
    "consumer_kafka_topics": ["topic1", "topic2"],
    "time_window": 5
}

dest = {
    "type": "kafka",
    "kafka_brokers": "kafka:9092",
    "batch_size": 10,
    "publisher_kafka_topic": "topic3",
    "output_delimiter": ","
}
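These dictionaries plug into a workflow the same way the filesystem configurations do. A brief sketch using the CustomWorkflow defined earlier; the broker address, group id, and topic names above are placeholders:

[ ]:
# Read messages from topic1/topic2 in batches of 10, enrich them through the
# workflow, and publish the results to topic3.
kafka_workflow = CustomWorkflow(source=source, destination=dest, name="kafka_workflow")
kafka_workflow.run_workflow()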
Tying it together¶
Once we have established our workflow and its source and destination configurations, we can run it. Let’s create a workflow using the CustomWorkflow we created above.
Firstly, we must know the parameters for instantiating a basic workflow:

- name - The name of the workflow
- source - The source of input data (optional)
- destination - The destination for output data (optional)
[ ]:
from clx.workflow.workflow import Workflow

class CustomWorkflow(Workflow):
    def workflow(self, dataframe):
        dataframe["enriched"] = "enriched output"
        return dataframe

source = {
    "type": "fs",
    "input_format": "csv",
    "input_path": "/full/path/to/input/data",
    "schema": ["raw"],
    "delimiter": ",",
    "required_cols": ["raw"],
    "dtype": ["str"],
    "header": 0
}

destination = {
    "type": "fs",
    "output_format": "csv",
    "output_path": "/full/path/to/output/data"
}
my_new_workflow = CustomWorkflow(source=source, destination=destination, name="my_new_workflow")
my_new_workflow.run_workflow()
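Once run_workflow() returns, the enriched data should be available at the configured output_path. Assuming the destination wrote a csv file at that placeholder path, you could inspect it like this:

[ ]:
import cudf

# Inspect the csv written by the workflow's fs destination
# (placeholder path from the destination config above).
result = cudf.read_csv("/full/path/to/output/data")
print(result.head())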
Workflow configurations in an external file¶
Sometimes workflow configurations may need to change depending on the environment. To avoid declaring workflow configurations within source code, you may also declare them in an external yaml file. A workflow will look for and establish I/O connections by searching for configurations in the following order:
/etc/clx/[workflow-name]/workflow.yaml
~/.config/clx/[workflow-name]/workflow.yaml
In-line python config
If source and destination are indicated in external files, they are not required when instantiating a new workflow.
[ ]:
# Workflow config located at /etc/clx/my_new_workflow/workflow.yaml
my_new_workflow = CustomWorkflow(name="my_new_workflow")
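For reference, here is a hedged sketch of producing such an external workflow.yaml from the in-line source and destination dictionaries used earlier. The top-level keys shown here ("source", "destination") are an assumption, so verify the generated file against the exact yaml schema in the CLX documentation.

[ ]:
import os
import yaml  # requires pyyaml

# Assumed layout: the in-line source/destination dictionaries nested under
# top-level "source"/"destination" keys; verify against the CLX docs.
config = {"source": source, "destination": destination}

config_path = os.path.expanduser("~/.config/clx/my_new_workflow/workflow.yaml")
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, "w") as f:
    yaml.safe_dump(config, f)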