Processors

Processors are the core logic units of apps. They are responsible for processing data and generating output.

Processors receive input in the form of assets, collections, and combinations of the two. Assets can be understood as files or folders, while the term collection refers to tabular data. A processor can have multiple inputs and can output multiple objects.

How to define a processor?

To define a processor, you have to decorate a function with the @processor decorator.

from malevich.square import processor, Context

@processor()
def my_processor(input1, input2, context: Context):
    # do something with input1 and input2
    return output1, output2

The function has to follow these conventions:

  1. Each argument, except the one annotated with context: Context, has to be a reference to a particular input. The input can be the output of a previous processor, a collection, or an asset.

  2. The function has to return either a single dataframe or a tuple of dataframes.

Each input refers to the output of exactly one previous processor or to exactly one collection. Consider the following pipeline:

graph LR
    A[train_model] --> |"model, metrics"| B[predict]
    C[prediction_data] --> B[predict]

and the following code:

from malevich.square import processor


@processor()
def train_model(data):
    ...
    return model, metrics


@processor()
def predict(train_outcome, data_for_prediction):
    model, metrics = train_outcome
    ...
    return predictions

In this case, train_outcome refers to the output of train_model, and data_for_prediction refers to the data in the prediction_data collection. To access model and metrics, you have to unpack the train_outcome variable.
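
The wiring described above can be imitated in plain Python to see why the unpacking step is needed. This is only a sketch under stated assumptions: the processor decorator below is a hypothetical no-op stand-in rather than the one from malevich.square, lists and dicts stand in for dataframes, and the calls at the bottom are made by hand instead of by the platform.

```python
def processor():
    """Hypothetical no-op stand-in for the real @processor decorator."""
    def wrap(fn):
        return fn
    return wrap


@processor()
def train_model(data):
    # Toy "model": the mean of the training values.
    model = {"mean": sum(data) / len(data)}
    metrics = {"n_samples": len(data)}
    return model, metrics  # two outputs travel together as one tuple


@processor()
def predict(train_outcome, data_for_prediction):
    # A single argument carries everything train_model returned.
    model, metrics = train_outcome
    return [x - model["mean"] for x in data_for_prediction]


# Manual wiring that mirrors the graph: train_model -> predict <- prediction_data
train_outcome = train_model([1.0, 2.0, 3.0])
predictions = predict(train_outcome, [2.0, 4.0])
```

Even though train_model returns two objects, predict still takes one argument per upstream node, which is why the tuple has to be unpacked inside the function.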

DF, DFS, Sink and OBJ

Malevich makes use of specific data types when passing data between processors. Each of these types denotes a specific entity that a processor can receive as an input or return as an output.

  • DF - a single instance of tabular data. The table can follow a specific schema.

  • DFS - a collection of tabular data. The collection can be bound by a specific number of tables or be unlimited. Also, it can impose a schema on each table.

  • Sink - a collection of DFS that lets you mark a processor as capable of being linked to an unbounded number of processors.

  • OBJ - a collection of files that can hold arbitrary binary data.

See how they are applied in the following example:

from malevich.square import processor, DF, DFS, Sink, OBJ, obj


@processor()
def train_model(data: DF['TrainData']) -> tuple[OBJ, DF['Metrics']]:
    ...
    return model, metrics


@processor()
def predict(
    train_outcome: DFS['obj', 'Metrics'],
    data_for_prediction: DF['ValidationData']
) -> DF['Predictions']:
    # train_outcome bundles both outputs of train_model
    model, metrics = train_outcome
    ...
    return predictions

Context schema

You may define a schema for the context object.

from malevich.square import processor, Context, schema

@schema()   # Turns the class into a pydantic model
class MySchema:
    param1: str
    param2: int

@processor()
def my_processor(input1, input2, context: Context[MySchema]):
    context.app_cfg.param1 # Access to the app configuration as a model
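
To illustrate what accessing the configuration "as a model" buys you, here is a rough standard-library sketch. It is not how Malevich or pydantic work internally: the dataclass stands in for the @schema() class, and load_config is a hypothetical helper that checks a raw dict against the declared fields.

```python
from dataclasses import dataclass, fields


@dataclass
class MySchema:
    param1: str
    param2: int


def load_config(raw: dict) -> MySchema:
    """Check a raw configuration dict against MySchema and build an instance."""
    for field in fields(MySchema):
        if field.name not in raw:
            raise ValueError(f"missing configuration key: {field.name}")
        if not isinstance(raw[field.name], field.type):
            raise ValueError(f"{field.name} must be of type {field.type.__name__}")
    # Keep only the declared fields; extra keys are ignored in this sketch.
    return MySchema(**{field.name: raw[field.name] for field in fields(MySchema)})


app_cfg = load_config({"param1": "abc", "param2": 3})
```

With a typed object, a processor reads app_cfg.param1 instead of indexing into an unchecked dict, and a malformed configuration fails before any processing starts.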

Dataframe schema

You may also define a schema for a dataframe, using primitive types to declare the columns you expect it to contain.

from malevich.square import processor, DF, schema

@schema()
class MySchema:
    total: str
    title: int

@processor()
def my_processor(input1: DF[MySchema], input2: DF[MySchema]):
    pass

The inputs are validated and remapped if possible. For example, if the processor is called with a dataframe whose columns are num_elements of type int and text of type str, the columns are remapped to title and total respectively.

The rules of remapping are as follows:

  1. If the number of columns is the same and the order of column types is the same, the columns will be remapped.

  2. Columns with the same name are mapped to each other; the rest are mapped by the first rule.
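
The two rules can be sketched in plain Python as follows. This is an illustration under assumptions, not Malevich's actual implementation: remap_columns is a hypothetical helper, columns are given as (name, type) pairs in order, the first rule is assumed to pair columns by position, and same-name columns are matched without checking their types. The column names in the usage lines are made up for the example.

```python
def remap_columns(actual, expected):
    """Return a dict {actual_name: expected_name}, or None if no remap applies.

    Both arguments are lists of (column_name, column_type) pairs in column order.
    """
    expected_names = {name for name, _ in expected}

    # Rule 2: columns that already carry a schema name map to themselves.
    mapping = {name: name for name, _ in actual if name in expected_names}

    # Rule 1, applied to whatever is left: same count and same order of types.
    rest_actual = [(n, t) for n, t in actual if n not in mapping]
    rest_expected = [(n, t) for n, t in expected if n not in mapping]
    if len(rest_actual) != len(rest_expected):
        return None
    if [t for _, t in rest_actual] != [t for _, t in rest_expected]:
        return None

    # Pair the remaining columns by position (an assumption of this sketch).
    for (actual_name, _), (expected_name, _) in zip(rest_actual, rest_expected):
        mapping[actual_name] = expected_name
    return mapping


schema_columns = [("name", str), ("count", int)]

by_position = remap_columns([("label", str), ("n", int)], schema_columns)
by_name_first = remap_columns([("count", int), ("label", str)], schema_columns)
no_match = remap_columns([("n", int), ("label", str)], schema_columns)
```

In the second call, count is matched by name, and the single leftover column label is then paired with name by the first rule. The third call returns None because the type order of the unmatched columns differs from the schema.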