malevich.models#

class malevich.models.flow_function.FlowFunction(f: Callable[[Params], R], reverse_id: str, name: str, description: str, **kwargs: Any)#

Class-decorator for a flow function

Captures the function and holds its metadata.

__init__(f: Callable[[Params], R], reverse_id: str, name: str, description: str, **kwargs: Any) None#
Parameters:
  • f (Callable[Params, R]) – The function that defines the flow

  • reverse_id (str) – Reverse ID of the flow

  • name (str) – Name of the flow

  • description (str) – Description of the function

  • **kwargs (Any) – Additional arguments to be passed to the flow component. See ComponentSchema for details.

class malevich_space.schema.ComponentSchema(*, name: str, reverse_id: str, uid: str | None = None, visibility: list[str] = [], anticipated_python_deps: list[str] = [], anticipated_default: list[str] = [], anticipated_api_call: str | None = None, anticipated_api_name: str | None = None, example_code_before_juliusfication: str | None = None, description: str | None = None, designed_for: str | None = None, not_designed_for: str | None = None, designed_for_use_case: list[UseCaseSchema] = [], not_designed_for_use_case: list[UseCaseSchema] = [], tags: list[str] | None = None, app: AppSchema | None = None, flow: FlowSchema | None = None, collection: CollectionAliasSchema | None = None, asset: Asset | None = None, icon: str | None = None, hf_url: str | None = None, repo_url: str | None = None, docs_url: str | None = None, branch: BranchSchema | None = None, version: VersionSchema | None = None, required_schema: list[SchemaMetadata] = [])#
class malevich.models.task.promised.PromisedStage(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
class malevich.models.task.promised.PromisedTask(results: traced[BaseNode] | Iterable[traced[BaseNode]] | None, tree: TreeNode, component: ComponentSchema)#

An intermediate representation of a task that can be interpreted

Interpretation of a task means attaching it to a particular platform and preparing it for execution. This is done by calling interpret() method.

Once the task is interpreted, the following methods become available:

property tree: TreeNode#

Returns the execution tree, encapsulated into TreeNode class

get_stage() Any#

Retrieves the current stage of the task.

If the task has not been interpreted, the stage is NOT_INTERPRETED. Otherwise, the stage is defined by the underlying interpreted task.

get_stage_class() Type#

Returns the enum class of the stage of the task

If the task has not been interpreted, the class is PromisedStage. Otherwise, the class is defined by the underlying interpreted task.

interpret(interpreter: Interpreter = None) None#

Interprets the task with a particular interpreter

Attaching a task to a particular platform and preparing it for execution. This is done by calling interpret(). Once the task is interpreted, it can be prepared, run, stopped and its results can be fetched.

The task might be re-interpreted with a different interpreter. In this case, the previous interpreter is discarded and the new one is used.

Parameters:

interpreter (malevich.interpreter.Interpreter, optional) – Interpreter to use. malevich.interprete.SpaceInterpreter is used when not specified. Defaults to None.

prepare(*args, **kwargs) None#

Prepares necessary data for the task to be executed (run)

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

async_prepare(*args, **kwargs) None#

Asynchronously prepares necessary data for the task to be executed (run)

The method is non-blocking

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

async_run(*args, run_id: str | None = None, **kwargs) None#

Asynchronously runs the task with the given run_id

The method is non-blocking. Beware that you need to call prepare() and ensure that the task is prepared before calling this method. In case, you have used async_prepare(), you need to wait for the task to be prepared before calling this method.

async_stop(*args, **kwargs) None#

Asynchronously stops the task preventing it from further execution

run(run_id: str | None = None, override: dict[str, Any] | None = None, *args, **kwargs) None#

Runs the task

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

stop(*args, **kwargs) None#

Stops the task

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

results(*args, **kwargs) list[BaseResult | CoreLocalDFResult | CoreResult | SpaceCollectionResult]#

Retrieve results of the task

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

commit_returned(returned: traced[BaseNode] | Iterable[traced[BaseNode]] | None) None#

Saves objects to determine the results of the task

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

dump() bytes#

Serialize the task to bytes, which can be saved and used to load it again

static load(object_bytes: bytes) PromisedTask#

Static method. Deserialize bytes into task object

get_injectables() list[BaseInjectable]#

Returns a list of possible injections into run

See the documentation of the corresponding interpreter used before calling this method.

get_operations() list[str]#

Returns a list of operations

The term operations is defined by the interpreter used. See the documentation of the corresponding interpreter used before calling this method.

configure(*operations: str, **kwargs) None#

Configures the task for a particular operation

What is configurable and how it is configurable is defined by the interpreter used. See the documentation of the corresponding interpreter used before calling this method.

Parameters:
  • operation (str) – Operation to configure (one from get_operations())

  • **kwargs (Any) – Arguments to configure the operation

get_interpreted_task() BaseTask#

Retrieves the interpreted task that is produced when calling interpret()

Returns:

The interpreted task

Return type:

BaseTask

publish(*args, **kwargs) MetaEndpoint#

Creates a HTTP endpoint for the task

Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.

Returns:

An endpoint object

Return type:

malevich.models.endpoint.MetaEndpoint

class malevich.models.results.BaseResult#

Result obtained running a flow.

Encapsulates the result object obtained after running a flow. The object can be a DataFrame, a list of DataFrames, binary data, etc.

abstract property num_elements: int#

The number of elements (assets/collections) in the result

abstract get() RealResultType#

Fetches a real result object (download, etc.)

Returns:

The actual result

Return type:

RealResultType

get_df() DataFrame#

Returns a DataFrame from the result if possible

Returns:

The DataFrame

Return type:

pd.DataFrame

get_dfs() list[DataFrame]#

Returns a list of DataFrames from the result if possible

Returns:

The list of DataFrames

Return type:

list[pd.DataFrame]

get_binary() bytes#

Returns binary data from the result if possible

Returns:

The binary data

Return type:

bytes

get_binary_dir() dict[str, bytes]#

Returns a dict of binary data from the result if possible

Returns:

The dict of binary data

Return type:

dict[str, bytes]

class malevich.models.results.CoreLocalDFResult(coll: Collection, auth: Tuple[str, str], conn_url: str = 'https://core.malevich.ai/')#

A specification of collection() as a result

This class is used to wrap a collection object as result. It simply stores collection object as a DataFrame and returns it when get() method is called.

num_elements() int#

The number of elements (assets/collections) in the result

get() DataFrame | None#

Simply extracts saved data frame

Returns:

Saved data frame

Return type:

DataFrame

class malevich.models.results.CoreResult(core_group_name: str, core_operation_id: str, core_run_id: str, conn_url: str, auth: Tuple[str, str])#

A representation of a result of execution of a flow on Malevich Core

Represents a single returned result, which can contain a collection, a list of collections, a single asset or a list of assets.

To be precise, for the function:

from malevich import flow, collection, CoreInterpreter
from malevich.etl import process
from malevich.utility import merge

@flow()
def data_process():
    data = collection('my_fancy_smth', df=pd.DataFrame({'a': [1, 2, 3]}))
    processed = process(data)
    output = merge(
        processed,
        data,
        config={'how': 'inner', 'on': 'index'}
    )
    return output, processed

task = data_process()
task.interpret(CoreInterpreter())
# list of CoreResult objects
results = task()
# single CoreResult object
output = results[0]
# list of CoreResultPayload objects
print(results[0].get())

The output variable will be a single CoreResult object, which contains a single CoreResultPayload object, which contains a single data frame (the result of the merge operation). The processed variable will be a single CoreResult object, which contains a single CoreResultPayload object, which contains a single collection (the result of the process operation).

Note: The class is not intended to be constructed by user.

property num_elements: int#

The number of elements (assets/collections) in the result

get() list[CoreResultPayload]#

Retrieves results from the Core

Returns a list of CoreResultPayload objects, each of which contains an output from a single operation. The output might be a collection, a list of collection, a single file (asset) or a list of files (composite asset).

If you wish to get more specific results, use the following methods:

  • get_df() to get a single data frame (if result is a collection)

  • get_dfs() to get a list of data frames (if result is a collection)

  • get_binary() to get a single binary (if result is an asset)

  • get_binary_dir() to get a dict of binaries (if result is a composite asset)

Returns:

The list of results

Return type:

list[CoreResultPayload]

get_df() DataFrame#

Converts the result to a single data frame

It only works if the result is a single collection, otherwise it raises an error. Check the number of elements using num_elements property and use only if it is equal to 1

Returns:

The result as a data frame

Return type:

DataFrame

Raises:

NotImplementedError – If the result is not a single collection

get_dfs() list[DataFrame]#

Converts the result to a list of data frames

If some of the results are assets, they are ignored in the output.

Returns:

The result as a list of data frames

Return type:

list[DataFrame]

get_binary() bytes#

Retrieves asset binary data, if the result is file asset

It only works if the result is a single file returned as asset. Otherwise it raises an error. Check the number of elements using num_elements property and use only if it is equal to 1

Returns:

The binary data of the asset

Return type:

bytes

Raises:

NotImplementedError – If the result is not a single asset

get_binary_dir() dict[str, bytes]#

Retrieves files from assets

Non-assets are ignored in the output

Returns:

Dict of file names and their binary data

Return type:

dict[str, bytes]

class malevich.models.results.CoreResultPayload(data: DataFrame | list[bytes], is_asset: bool = False, is_composite_asset: bool = False, is_collection: bool = False, paths: list[str] | None = None)#

An actual information that is saved as result

A payload can be either asset or collection. If it is an asset, it can be a single file or a composite asset (a directory with multiple files). It represents one of results of a single operation.

Payload follows the following convention:

is_asset() bool#

Checks if the result is an asset

Returns:

True if the result is an asset

Return type:

bool

is_composite_asset() bool#

Checks if the result is a composite asset

Returns:

True if the result is a composite asset

Return type:

bool

is_collection() bool#

Checks if the result is a collection

Returns:

True if the result is a collection

Return type:

bool

class malevich.models.results.SpaceCollectionResult(run_id: str, in_flow_id: str, space_ops: SpaceOps)#

Result obtained running a flow with Space.

Represents a set of collections obtained after running a flow with Space.

num_elements() int#

The number of elements (assets/collections) in the result

get() list[DataFrame]#

Fetches a list of DataFrames with the actual result

Returns:

The actual result

Return type:

list[DataFrame]

get_dfs() list[DataFrame]#

Returns a list of DataFrames from the result.

Simply calls get()

Returns:

The list of DataFrames

Return type:

list[DataFrame]

get_df() DataFrame#

Returns a DataFrame from the result if it is a single DataFrame.

Simply calls get() and returns the first element if the list has only one element, otherwise raises an error.

Returns:

The result DataFrame

Return type:

DataFrame