malevich.square.df

DF, DFS, OBJ, M, and Sink are special types used to annotate the inputs and outputs of units, such as processors, in your apps.

class malevich.square.df.DF(df: DataFrame)

Wrapper class for tabular data.

DF (and DFS) classes are used to denote tabular data. They can specify the scheme of the data, which can be either a reference to a class decorated with malevich.square.jls.scheme() or simply a string containing the name of the scheme.

DF may follow the interface of different data frame implementations. Most of the time, however, it is just a wrapper around pandas.DataFrame, so you can use all the methods of pandas.DataFrame directly on a DF object.

Warning

You should not construct DF directly. Instead, when returning results from processors, return an instance of any supported data frame class directly.

For example, you may return a pandas.DataFrame:

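import pandas as pd
from malevich.square import DF, processor

@processor()
def my_processor(df: DF):
    # Return a plain pandas.DataFrame instead of constructing a DF yourself
    return pd.DataFrame(...)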

scheme_name()

Returns the name of the scheme of the data frame.
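As a rough illustration of how a scheme can travel with a table type, here is a minimal sketch built on a hypothetical SchemedTable class (not part of malevich). It assumes that subscripting stores the scheme, the way DF["users"] reads in a processor signature, and it reduces a class scheme to its class name; the real DF may resolve scheme names differently.

```python
from typing import Any, Optional


class SchemedTable:
    """Hypothetical stand-in for DF: remembers the scheme given as SchemedTable[...]."""

    _scheme: Optional[str] = None

    def __class_getitem__(cls, scheme: Any) -> type:
        # A scheme may be a plain string or a class; this sketch keeps the class name
        name = scheme if isinstance(scheme, str) else getattr(scheme, "__name__", str(scheme))
        # Build a subclass that carries the scheme name as a class attribute
        return type(f"{cls.__name__}[{name}]", (cls,), {"_scheme": name})

    def scheme_name(self) -> Optional[str]:
        # Returns the scheme attached to this table type, or None if none was given
        return self._scheme


class Users:
    """Hypothetical scheme class (in malevich it would be decorated with scheme())."""


print(SchemedTable["users"]().scheme_name())  # users
print(SchemedTable[Users]().scheme_name())    # Users
print(SchemedTable().scheme_name())           # None
```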

class malevich.square.df.DFS

Wrapper class for tabular data.

DFS is a container for multiple DFs. It is used to denote an output of processors that return multiple data frames.

Each element of a DFS is itself a DF or a DFS.

Usage

DFS is primarily used to denote types of arguments of processors. There are a couple of cases to consider:

1. Explicit number of inputs

When you know the number of inputs and their schemes, you can use DFS as follows:

from typing import Any
from malevich.square import DFS, processor

@processor()
def my_processor(dfs: DFS["users", Any]):
    ...

Here, we have one input argument (from either a collection or a previous app) that consists of two data frames. The first data frame has scheme “users”, and the second data frame has an arbitrary scheme.
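The positional contract that DFS["users", Any] expresses can be pictured with a small check. This is a minimal sketch, assuming schemes are compared by exact name and that typing.Any accepts any scheme; matches_explicit is a hypothetical helper, not a malevich function.

```python
from typing import Any, Sequence


def matches_explicit(spec: Sequence[Any], schemes: Sequence[str]) -> bool:
    """Hypothetical check: does a fixed-size DFS[...] spec accept tables
    with the given scheme names, position by position?"""
    # A fixed spec needs exactly one table per entry
    if len(spec) != len(schemes):
        return False
    # Any accepts every scheme; a string must match the scheme name exactly
    return all(expected is Any or expected == actual
               for expected, actual in zip(spec, schemes))


spec = ("users", Any)  # mirrors DFS["users", Any]
print(matches_explicit(spec, ["users", "orders"]))  # True
print(matches_explicit(spec, ["orders", "users"]))  # False
print(matches_explicit(spec, ["users"]))            # False
```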

2. Variable number of inputs

You may also accept an unbounded number of inputs. In this case, use malevich.square.df.M together with DFS:

from typing import Any
from malevich.square import DFS, M, processor

@processor()
def process_tables(dfs: DFS[M["sql_tables"]]):
    ...

@processor()
def process_user_data(dfs: DFS["users", M[Any]]):
    ...

Here, we have two processors. The first one accepts any number of data frames with scheme “sql_tables”. The second one accepts one data frame with scheme “users”, and any number of data frames with arbitrary schemes as one argument.

Note

When iterating over an argument of type DFS[M["sql_tables"]], it will contain exactly one element of type DFS, which will consist of a number of data frames with scheme “sql_tables”.

When iterating over an argument of type DFS["users", M[Any]], the first element will be of type DF, and the second element will be of type DFS, consisting of data frames with arbitrary schemes.
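The grouping described in this note can be modeled in plain Python. This is a minimal sketch, not malevich's implementation: Many is a hypothetical stand-in for M, tables are hypothetical (scheme, name) pairs, scheme names are not validated, and the sketch supports at most one variadic entry. Fixed entries take one table each, and the variadic entry collapses the remaining tables into a single nested group.

```python
from typing import Any, List, Sequence, Tuple, Union

# Hypothetical stand-in for a DF: a (scheme, table name) pair
Table = Tuple[str, str]


class Many:
    """Hypothetical stand-in for M[...]: marks a variadic position in the spec."""

    def __init__(self, scheme: Any) -> None:
        self.scheme = scheme


def group_inputs(spec: Sequence[Any], tables: Sequence[Table]) -> List[Union[Table, List[Table]]]:
    """Group a flat list of tables according to a spec such as ["users", Many(Any)].

    Fixed entries take one table each; the Many entry takes all remaining tables
    as one nested group. Scheme names are not validated in this sketch."""
    variadic = [entry for entry in spec if isinstance(entry, Many)]
    if len(variadic) > 1:
        raise ValueError("this sketch supports at most one Many entry")
    n_rest = len(tables) - (len(spec) - len(variadic))
    if n_rest < 0 or (not variadic and n_rest > 0):
        raise ValueError("number of tables does not fit the spec")

    grouped: List[Union[Table, List[Table]]] = []
    i = 0
    for entry in spec:
        if isinstance(entry, Many):
            # The variadic slot becomes a single nested group (like a nested DFS)
            grouped.append(list(tables[i:i + n_rest]))
            i += n_rest
        else:
            # A fixed slot consumes exactly one table (like a DF)
            grouped.append(tables[i])
            i += 1
    return grouped


tables = [("users", "u"), ("logs", "a"), ("events", "b")]
print(group_inputs(["users", Many(Any)], tables))
# [('users', 'u'), [('logs', 'a'), ('events', 'b')]]
print(group_inputs([Many("sql_tables")], [("sql_tables", "t1"), ("sql_tables", "t2")]))
# [[('sql_tables', 't1'), ('sql_tables', 't2')]]
```

As in the note, a spec made only of a variadic entry always yields exactly one nested group, however many tables arrive.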

__getitem__(key: int) → DF | DFS | OBJ | None

Returns the i-th element of the DFS

__iter__() → Iterator[DF | DFS | OBJ | None]

Returns an iterator over the elements of the DFS

__len__() → int

Returns the number of elements in the DFS
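The three methods above form a small container protocol. Below is a minimal sketch of a hypothetical ToyDFS (not the real class) that implements them over nested elements; it assumes, as the Usage note describes, that indexing, iteration, and len() work on top-level elements only, so a nested DFS counts as one element.

```python
from typing import Iterator, List, Union


class ToyDF:
    """Hypothetical leaf: a table with a scheme name and some rows."""

    def __init__(self, scheme: str, rows: List[dict]) -> None:
        self.scheme = scheme
        self.rows = rows


Element = Union["ToyDF", "ToyDFS"]


class ToyDFS:
    """Hypothetical container mirroring the DFS protocol: indexing, iteration, len()."""

    def __init__(self, *elements: Element) -> None:
        self._elements = list(elements)

    def __getitem__(self, key: int) -> Element:
        # Returns the element at position key (a leaf table or a nested container)
        return self._elements[key]

    def __iter__(self) -> Iterator[Element]:
        # Iterates over top-level elements only; nested containers are not flattened
        return iter(self._elements)

    def __len__(self) -> int:
        # Counts top-level elements, not the tables inside nested containers
        return len(self._elements)


# Shaped like DFS["users", M[Any]]: one table, then a nested group
dfs = ToyDFS(ToyDF("users", [{"id": 1}]), ToyDFS(ToyDF("logs", []), ToyDF("logs", [])))
print(len(dfs))                          # 2
print(len(dfs[1]))                       # 2
print([type(e).__name__ for e in dfs])   # ['ToyDF', 'ToyDFS']
```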

class malevich.square.df.OBJ(path: str)

Wrapper class that represents files (or folders)

Used in the same way as DF, but provides additional functionality to work with files and folders.

property path: str

A real path to the binary object (or a folder)

property raw: bytes

Reads the binary object from the path

property as_df: DF[obj]

A data frame (DF with scheme obj) containing the file paths

property df: DataFrame

Reads the asset as a data frame, assuming it is a .csv file

Raises:

Exception – If the asset does not point to a .csv file
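To make the path, raw, and df properties concrete, here is a minimal sketch of a hypothetical ToyOBJ that behaves as described above. It is an assumption-laden stand-in, not the real OBJ: it checks the .csv extension by filename only, and it uses the standard csv module, returning a list of dicts in place of a pandas.DataFrame so the example runs without extra dependencies.

```python
import csv
import os
import tempfile
from typing import Dict, List


class ToyOBJ:
    """Hypothetical stand-in for OBJ: wraps a path to a file (or folder)."""

    def __init__(self, path: str) -> None:
        self._path = path

    @property
    def path(self) -> str:
        # The real filesystem path behind the object
        return self._path

    @property
    def raw(self) -> bytes:
        # Reads the whole file as bytes
        with open(self._path, "rb") as f:
            return f.read()

    @property
    def df(self) -> List[Dict[str, str]]:
        # Only .csv files are accepted; a list of dicts stands in for a data frame
        if not self._path.endswith(".csv"):
            raise Exception(f"{self._path} is not a .csv file")
        with open(self._path, newline="") as f:
            return list(csv.DictReader(f))


with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "users.csv")
    with open(csv_path, "w", newline="") as f:
        f.write("id,name\n1,Ann\n")
    obj = ToyOBJ(csv_path)
    print(obj.raw)  # b'id,name\n1,Ann\n'
    print(obj.df)   # [{'id': '1', 'name': 'Ann'}]
```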

class malevich.square.df.M

Special indicator to be used in DFS to denote variable number of inputs.

See DFS for more details.

class malevich.square.df.Sink

Wrapper class to denote a special kind of processor input

Normally, each argument in processor function signature corresponds to exactly one output of the previous processor or exactly one collection.

To denote a processor that is able to accept a variable number of inputs, you should use this class.

from typing import Any
from malevich.square import Sink, DFS, M, processor

@processor()
def merge_tables_sink(dfs: Sink["sql_tables"]):
    pass

@processor()
def merge_tables_dfs(dfs: DFS[M["sql_tables"]]):
    pass

Here, we have two processors. Sink[schema] is equivalent to List[DFS[M[schema]]].

The difference between the two processors is that the first one can be connected to any number of processors that return data frames with scheme “sql_tables”, while the second one can be connected to exactly one processor that returns any number of data frames with scheme “sql_tables”.

See the difference visually:

graph TD
    C1[Prev. processor] -->|table_1, table_2| B[merge_tables_dfs]
    C2[Collection 1] -->|table_1| A[merge_tables_sink]
    C3[Collection 2] -->|table_2| A[merge_tables_sink]
    C4[Prev. processor] -->|table_3, table_4| A[merge_tables_sink]

In this case, in function merge_tables_sink, accessing dfs[0] will return a DFS object consisting of a single data frame with scheme “sql_tables”, but accessing dfs[2] will return a DFS with two data frames with scheme “sql_tables” inside.

In the case of merge_tables_dfs, accessing dfs[0] will return a DFS object consisting of two data frames with scheme “sql_tables”. There is no way to connect more than one processor to merge_tables_dfs.
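The relation Sink[schema] ≈ List[DFS[M[schema]]] can be pictured with plain lists. This is a minimal sketch, assuming each upstream connection contributes one group in connection order; sink_view and dfs_m_view are hypothetical helpers, and table names stand in for data frames.

```python
from typing import List


def sink_view(upstream_outputs: List[List[str]]) -> List[List[str]]:
    """Hypothetical model of Sink["sql_tables"]: one group per connected upstream."""
    # Each connection keeps its own group, so the outer list has one entry per connection
    return [list(tables) for tables in upstream_outputs]


def dfs_m_view(single_output: List[str]) -> List[List[str]]:
    """Hypothetical model of DFS[M["sql_tables"]]: exactly one upstream, one group."""
    return [list(single_output)]


# merge_tables_sink: Collection 1, Collection 2, and a previous processor
sink = sink_view([["table_1"], ["table_2"], ["table_3", "table_4"]])
print(sink[0])  # ['table_1']
print(sink[2])  # ['table_3', 'table_4']

# merge_tables_dfs: a single previous processor returning two tables
dfs = dfs_m_view(["table_1", "table_2"])
print(dfs[0])   # ['table_1', 'table_2']
```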

Note

If a processor has other arguments besides its Sink argument, inputs are mapped to the non-Sink arguments first, and the Sink greedily collects the rest.

In other words, first and last inputs will be connected to non-Sink arguments (if there are any), and the rest will be included into Sink.

from malevich.square import DF, Sink, processor

@processor()
def merge_tables_sink(
    table_1: DF,
    dfs: Sink["sql_tables"],
    table_2: DF,
):
    pass

graph TD
    C1[App 1] -->|table_1| A[merge_tables_sink]
    C2[Collection 1] -->|table_2| A[merge_tables_sink]
    C4[App 2] -->|table_4, table_5| A[merge_tables_sink]
    C3[Collection 2] -->|table_3| A[merge_tables_sink]

Consider the example above. Here, the table_1 argument receives table_1 from App 1, the table_2 argument receives table_3 from Collection 2, and the remaining data frames (table_2, table_4, and table_5) are collected into the dfs argument. Accessing dfs[0] returns a DFS object consisting of a single data frame (table_2), while accessing dfs[1] returns a DFS with two data frames (table_4 and table_5).
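The greedy rule can be sketched as a slice over the ordered connections. This is a minimal model under stated assumptions: split_sink_inputs is a hypothetical helper, connections are assumed to arrive in the order drawn above (App 1, Collection 1, App 2, Collection 2), and the rule is generalized from "first and last" to any number of non-Sink arguments declared before (n_before) and after (n_after) the Sink.

```python
from typing import List, Sequence, Tuple

Groups = List[List[str]]


def split_sink_inputs(inputs: Sequence[List[str]], n_before: int,
                      n_after: int) -> Tuple[Groups, Groups, Groups]:
    """Hypothetical model of the greedy Sink rule.

    inputs holds one entry per connection, in connection order; each entry lists
    the tables that connection delivers. The first n_before connections go to the
    non-Sink arguments declared before the Sink, the last n_after go to those
    declared after it, and the Sink collects everything in between."""
    if n_before + n_after > len(inputs):
        raise ValueError("not enough connections for the non-Sink arguments")
    before = list(inputs[:n_before])
    after = list(inputs[len(inputs) - n_after:])
    sink = list(inputs[n_before:len(inputs) - n_after])
    return before, sink, after


connections = [["table_1"], ["table_2"], ["table_4", "table_5"], ["table_3"]]
before, sink, after = split_sink_inputs(connections, n_before=1, n_after=1)
print(before)  # [['table_1']]
print(sink)    # [['table_2'], ['table_4', 'table_5']]
print(after)   # [['table_3']]
```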