malevich.square.df#

DF, DFS, OBJ, M and Sink are special types used to specify the inputs and outputs of units (such as processors) in your apps.
- class malevich.square.df.DF(df: DataFrame)#
Wrapper class for tabular data.
DF (and DFS) classes are used to denote tabular data. They can specify the scheme of the data. The scheme can be a reference to a class decorated with
malevich.square.jls.scheme()
or simply a string containing the name of the scheme. DF may follow the interface of different data frame implementations, but most of the time it is just a wrapper around pandas.DataFrame, so you can use all the methods of pandas.DataFrame directly on a DF object.

Warning
You should not construct DF directly. Instead, when returning results from processors, you should use any of supported data frames classes directly.
For example, you may return a pandas.DataFrame:

```python
import pandas as pd

@processor()
def my_processor(df: DF):
    return pd.DataFrame(...)
```
- scheme_name()#
Returns the name of the scheme of the data frame.
- class malevich.square.df.DFS#
Wrapper class for tabular data.
DFS is a container for multiple DFs. It is used to denote an output of processors that return multiple data frames.
Each of the elements of DFS is also a DF or DFS.

Usage#
DFS is primarily used to denote the types of arguments of processors. There are a couple of cases to consider:
1. Explicit number of inputs#
Once you know the number of inputs, and their schemes, you can use DFS in the following way:
```python
from typing import Any

from malevich.square import DFS, processor

@processor()
def my_processor(dfs: DFS["users", Any]):
    ...
```
Here, we have one input argument (from either a collection or a previous app) that consists of two data frames. The first data frame has scheme “users”, and the second data frame has an arbitrary scheme.
2. Variable number of inputs#
You may also assume an unbounded number of inputs. In this case, you should use
malevich.square.jls.M
together with DFS:

```python
from typing import Any

from malevich.square import DFS, M, processor

@processor()
def process_tables(dfs: DFS[M["sql_tables"]]):
    ...

@processor()
def process_user_data(dfs: DFS["users", M[Any]]):
    ...
```
Here, we have two processors. The first one accepts any number of data frames with scheme “sql_tables”. The second one accepts one data frame with scheme “users”, and any number of data frames with arbitrary schemes as one argument.
Note
When iterating over an argument of type DFS[M["sql_tables"]], it will contain exactly one element of type DFS, which consists of a number of data frames with scheme “sql_tables”. When iterating over an argument of type DFS["users", M[Any]], the first element will be of type DF, and the second element will be of type DFS, consisting of data frames with arbitrary schemes.

- __len__() → int#
Returns the number of elements in the DFS.
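The grouping described in the note above can be sketched in plain Python. This is only an illustration of the documented behavior, not Malevich's implementation: plain lists stand in for DFS containers, strings stand in for data frames, and the tuple `("M", scheme)` stands in for `M[scheme]`.

```python
# A rough, dependency-free sketch of how DFS groups its elements.
# A plain string in `spec` matches exactly one frame (a DF), while
# ("M", scheme) greedily matches the remaining frames (a nested DFS).
def group_dfs(spec, frames):
    out = []
    i = 0
    for j, entry in enumerate(spec):
        if isinstance(entry, tuple) and entry[0] == "M":
            # M[...] is greedy: it takes everything not claimed by
            # the fixed entries that come after it in the spec.
            tail = len(spec) - j - 1
            out.append(frames[i:len(frames) - tail])
            i = len(frames) - tail
        else:
            out.append(frames[i])
            i += 1
    return out

# DFS[M["sql_tables"]]: exactly one element, itself a group of frames.
print(group_dfs([("M", "sql_tables")], ["t1", "t2", "t3"]))
# -> [['t1', 't2', 't3']]

# DFS["users", M[Any]]: a single DF first, then a nested group.
print(group_dfs(["users", ("M", "any")], ["users_df", "a", "b"]))
# -> ['users_df', ['a', 'b']]
```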
- class malevich.square.df.OBJ(path: str)#
Wrapper class that represents files (or folders).

Used in the same way as DF, but provides additional functionality to work with files and folders.

- property path: str#
A real path to the binary object (or a folder)
- property raw: bytes#
Reads the binary object from the path
- property df: DataFrame#
Reads the asset as a data frame, assuming the underlying file is a .csv file
- Raises:
Exception – If the asset does not point to a .csv file
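The behavior of the three properties can be illustrated with a small stdlib-only stand-in. This is an assumption-laden sketch based on the descriptions above, not the real class: the real OBJ returns a pandas.DataFrame from `df`, while here a list of dicts is used to keep the example dependency-free, and `ObjSketch` is a hypothetical name.

```python
import csv
import os
import tempfile

# A rough stand-in for OBJ: `path` is the real file path, `raw` reads
# the bytes, and `df` parses the file as CSV, raising if the path does
# not point to a .csv file.
class ObjSketch:
    def __init__(self, path: str):
        self._path = path

    @property
    def path(self) -> str:
        return self._path

    @property
    def raw(self) -> bytes:
        with open(self._path, "rb") as f:
            return f.read()

    @property
    def df(self):
        if not self._path.endswith(".csv"):
            raise Exception("asset does not point to a .csv file")
        with open(self._path, newline="") as f:
            return list(csv.DictReader(f))

# Usage: write a small CSV and read it back through the sketch.
with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "users.csv")
    with open(p, "w", newline="") as f:
        f.write("id,name\n1,alice\n")
    obj = ObjSketch(p)
    print(obj.raw)  # b'id,name\n1,alice\n'
    print(obj.df)   # [{'id': '1', 'name': 'alice'}]
```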
- class malevich.square.df.M#
Special indicator to be used in DFS to denote a variable number of inputs. See DFS for more details.
- class malevich.square.df.Sink#
Wrapper class to denote a specific type of inputs to a processor.
Normally, each argument in processor function signature corresponds to exactly one output of the previous processor or exactly one collection.
To denote a processor that is able to accept a variable number of inputs, you should use this class.
```python
from typing import Any

from malevich.square import Sink, DFS, M, processor

@processor()
def merge_tables_sink(dfs: Sink["sql_tables"]):
    pass

@processor()
def merge_tables_dfs(dfs: DFS[M["sql_tables"]]):
    pass
```
Here, we have two processors. Sink[schema] is equivalent to List[DFS[M[schema]]].

The difference between the two processors lies in the fact that the first one can be connected to any number of processors that return data frames with scheme “sql_tables”, while the second one can be connected to exactly one processor that returns any number of data frames with scheme “sql_tables”.
See the difference visually:
```mermaid
graph TD
    C1[Prev. processor] -->|table_1, table_2| B[merge_tables_dfs]
    C2[Collection 1] -->|table_1| A[merge_tables_sink]
    C3[Collection 2] -->|table_2| A[merge_tables_sink]
    C4[Prev. processor] -->|table_3, table_4| A[merge_tables_sink]
```

In this case, in the function merge_tables_sink, accessing dfs[0] will return a DFS object consisting of a single data frame with scheme “sql_tables”, but accessing dfs[2] will return a DFS with two data frames with scheme “sql_tables” inside. In the case of merge_tables_dfs, accessing dfs[0] will return a DFS object consisting of two data frames with scheme “sql_tables”. There is no way to connect more than one processor to merge_tables_dfs.

Note

If there are other arguments in a processor with a Sink argument, they will be mapped to the non-sink arguments, and Sink will greedily collect the rest. In other words, the first and last inputs will be connected to non-Sink arguments (if there are any), and the rest will be included into Sink.
```python
@processor()
def merge_tables_sink(
    table_1: DF,
    dfs: Sink["sql_tables"],
    table_2: DF,
):
    pass
```
```mermaid
graph TD
    C1[App 1] -->|table_1| A[merge_tables_sink]
    C2[Collection 1] -->|table_2| A[merge_tables_sink]
    C4[App 2] -->|table_4, table_5| A[merge_tables_sink]
    C3[Collection 2] -->|table_3| A[merge_tables_sink]
```

Consider the example above. In this case, table_1 will come from App 1, table_3 will come from Collection 2, and the rest of the input data frames (table_2, table_4 and table_5) will be included into the dfs argument. Accessing dfs[0] will return a DFS object consisting of a single data frame (table_2), but accessing dfs[1] will return a DFS with two data frames (table_4 and table_5).
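The mapping rule above can be sketched in plain Python. This is an illustration of the documented behavior under stated assumptions, not Malevich's implementation; lists of strings stand in for the outputs of each connected app or collection, and `map_to_sink` is a hypothetical helper name.

```python
# A rough sketch of the documented Sink mapping rule: non-sink
# arguments claim inputs from the front and back of the input list
# (in connection order), and the Sink argument greedily collects
# everything in between.
def map_to_sink(inputs, n_before, n_after):
    """Split `inputs` among `n_before` leading non-sink arguments,
    one Sink argument, and `n_after` trailing non-sink arguments."""
    end = len(inputs) - n_after
    leading = inputs[:n_before]
    sink = inputs[n_before:end]
    trailing = inputs[end:]
    return leading, sink, trailing

# The example above: merge_tables_sink(table_1, dfs, table_2) with
# four connected inputs, listed in connection order.
inputs = [
    ["table_1"],             # App 1
    ["table_2"],             # Collection 1
    ["table_4", "table_5"],  # App 2
    ["table_3"],             # Collection 2
]
leading, sink, trailing = map_to_sink(inputs, n_before=1, n_after=1)
print(leading)   # [['table_1']]                        -> table_1
print(sink)      # [['table_2'], ['table_4', 'table_5']] -> dfs
print(trailing)  # [['table_3']]                        -> table_2
```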