malevich.models#
- class malevich.models.flow_function.FlowFunction(f: Callable[[Params], R], reverse_id: str, name: str, description: str, **kwargs: Any)#
Class-decorator for a flow function
Captures the function and holds its metadata.
- __init__(f: Callable[[Params], R], reverse_id: str, name: str, description: str, **kwargs: Any) None #
- Parameters:
f (Callable[Params, R]) – The function that defines the flow
reverse_id (str) – Reverse ID of the flow
name (str) – Name of the flow
description (str) – Description of the function
**kwargs (Any) – Additional arguments to be passed to the flow component. See ComponentSchema for details.
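The class-decorator pattern described above can be sketched in plain Python. This is not the library's implementation: `FlowFunctionSketch`, `component_kwargs`, and the `double` function are hypothetical names, and the real class may behave differently when called.

```python
import functools
from typing import Any, Callable


class FlowFunctionSketch:
    """Hypothetical stand-in: captures a function and holds its metadata."""

    def __init__(self, f: Callable[..., Any], reverse_id: str, name: str,
                 description: str, **kwargs: Any) -> None:
        self.f = f
        self.reverse_id = reverse_id
        self.name = name
        self.description = description
        # Assumption: extra keyword arguments are kept for the component schema.
        self.component_kwargs = kwargs
        # Copy __name__, __doc__ and similar attributes from the wrapped function.
        functools.update_wrapper(self, f)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # In this sketch the call is simply forwarded to the captured function.
        return self.f(*args, **kwargs)


def double(x: int) -> int:
    """Doubles the input."""
    return 2 * x


wrapped = FlowFunctionSketch(
    double,
    reverse_id="demo.double",
    name="Double",
    description="Doubles a number",
    tags=["demo"],
)
```

The wrapper stays callable while exposing `reverse_id`, `name`, and `description` as attributes.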
- class malevich_space.schema.ComponentSchema(*, name: str, reverse_id: str, uid: str | None = None, visibility: list[str] = [], anticipated_python_deps: list[str] = [], anticipated_default: list[str] = [], anticipated_api_call: str | None = None, anticipated_api_name: str | None = None, example_code_before_juliusfication: str | None = None, description: str | None = None, designed_for: str | None = None, not_designed_for: str | None = None, designed_for_use_case: list[UseCaseSchema] = [], not_designed_for_use_case: list[UseCaseSchema] = [], tags: list[str] | None = None, app: AppSchema | None = None, flow: FlowSchema | None = None, collection: CollectionAliasSchema | None = None, asset: Asset | None = None, icon: str | None = None, hf_url: str | None = None, repo_url: str | None = None, docs_url: str | None = None, branch: BranchSchema | None = None, version: VersionSchema | None = None, required_schema: list[SchemaMetadata] = [])#
- class malevich.models.task.promised.PromisedStage(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
- class malevich.models.task.promised.PromisedTask(results: traced[BaseNode] | Iterable[traced[BaseNode]] | None, tree: TreeNode, component: ComponentSchema)#
An intermediate representation of a task that can be interpreted
Interpreting a task means attaching it to a particular platform and preparing it for execution. This is done by calling the interpret() method. Once the task is interpreted, it can be prepared, run, and stopped, and its results can be fetched.
- property tree: TreeNode#
Returns the execution tree, encapsulated into TreeNode class
- get_stage() Any #
Retrieves the current stage of the task.
If the task has not been interpreted, the stage is NOT_INTERPRETED. Otherwise, the stage is defined by the underlying interpreted task.
- get_stage_class() Type #
Returns the enum class of the stage of the task
If the task has not been interpreted, the class is PromisedStage. Otherwise, the class is defined by the underlying interpreted task.
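A small helper can check whether a task has been interpreted by looking at its stage. The sketch below assumes that stages are Enum members exposing `.name`. `is_interpreted`, `StageStub`, and `TaskStub` are hypothetical names used only for illustration.

```python
from enum import Enum
from typing import Any


def is_interpreted(task: Any) -> bool:
    """Return True once the task reports a stage other than NOT_INTERPRETED."""
    stage = task.get_stage()
    # Assumption: stages are Enum members, so they expose `.name`.
    return getattr(stage, "name", None) != "NOT_INTERPRETED"


class StageStub(Enum):
    """Hypothetical stage enum, standing in for the real stage classes."""
    NOT_INTERPRETED = 1
    READY = 2


class TaskStub:
    """Hypothetical task that only implements get_stage()."""

    def __init__(self, stage: StageStub) -> None:
        self._stage = stage

    def get_stage(self) -> StageStub:
        return self._stage
```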
- interpret(interpreter: Interpreter = None) None #
Interprets the task with a particular interpreter
Interpreting attaches the task to a particular platform and prepares it for execution. Once the task is interpreted, it can be prepared, run, and stopped, and its results can be fetched.
The task can be re-interpreted with a different interpreter. In this case, the previous interpreter is discarded and the new one is used.
- Parameters:
interpreter (malevich.interpreter.Interpreter, optional) – Interpreter to use. malevich.interpreter.SpaceInterpreter is used when not specified. Defaults to None.
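The order of calls implied above (interpret, prepare, run, fetch results) can be sketched as a helper that works on any object exposing those methods. `run_to_results` and `RecordingTask` are hypothetical names. Interpreter-specific arguments are omitted because they depend on the interpreter in use.

```python
from typing import Any, List, Optional


def run_to_results(task: Any, interpreter: Optional[Any] = None,
                   run_id: Optional[str] = None) -> list:
    """Interpret, prepare and run a task, then fetch its results."""
    task.interpret(interpreter)  # None lets the task pick its default interpreter
    task.prepare()               # prepare the data needed for the run
    task.run(run_id=run_id)      # run the task
    return task.results()        # fetch the results


class RecordingTask:
    """Hypothetical stub that records the order in which methods are called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def interpret(self, interpreter: Any = None) -> None:
        self.calls.append("interpret")

    def prepare(self, *args: Any, **kwargs: Any) -> None:
        self.calls.append("prepare")

    def run(self, run_id: Optional[str] = None, *args: Any, **kwargs: Any) -> None:
        self.calls.append("run")

    def results(self, *args: Any, **kwargs: Any) -> list:
        self.calls.append("results")
        return []
```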
- prepare(*args, **kwargs) None #
Prepares necessary data for the task to be executed (run)
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- async_prepare(*args, **kwargs) None #
Asynchronously prepares necessary data for the task to be executed (run)
The method is non-blocking
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- async_run(*args, run_id: str | None = None, **kwargs) None #
Asynchronously runs the task with the given run_id
The method is non-blocking. You need to call prepare() and ensure that the task is prepared before calling this method. If you have used async_prepare(), wait for the task to be prepared before calling this method.
- async_stop(*args, **kwargs) None #
Asynchronously stops the task preventing it from further execution
- run(run_id: str | None = None, override: dict[str, Any] | None = None, *args, **kwargs) None #
Runs the task
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- stop(*args, **kwargs) None #
Stops the task
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- results(*args, **kwargs) list[BaseResult | CoreLocalDFResult | CoreResult | SpaceCollectionResult] #
Retrieve results of the task
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- commit_returned(returned: traced[BaseNode] | Iterable[traced[BaseNode]] | None) None #
Saves objects to determine the results of the task
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- dump() bytes #
Serialize the task to bytes, which can be saved and used to load it again
- static load(object_bytes: bytes) PromisedTask #
Static method. Deserialize bytes into task object
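Since dump() returns bytes and load() accepts bytes, a task can be written to a file and restored later. The sketch below is one way to do this. `save_task`, `load_task`, and the JSON-based `JsonTask` stub are hypothetical, and the real serialization format is not specified here.

```python
import json
from pathlib import Path
from typing import Any


def save_task(task: Any, path: Path) -> None:
    """Write the serialized task to a file."""
    path.write_bytes(task.dump())


def load_task(task_cls: Any, path: Path) -> Any:
    """Restore a task from a file using the class's static load()."""
    return task_cls.load(path.read_bytes())


class JsonTask:
    """Hypothetical stub with the same dump()/load() shape as the task class."""

    def __init__(self, name: str) -> None:
        self.name = name

    def dump(self) -> bytes:
        return json.dumps({"name": self.name}).encode("utf-8")

    @staticmethod
    def load(object_bytes: bytes) -> "JsonTask":
        return JsonTask(json.loads(object_bytes.decode("utf-8"))["name"])
```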
- get_injectables() list[BaseInjectable] #
Returns a list of possible injections into run
See the documentation of the corresponding interpreter used before calling this method.
- get_operations() list[str] #
Returns a list of operations
The term operations is defined by the interpreter used. See the documentation of the corresponding interpreter used before calling this method.
- configure(*operations: str, **kwargs) None #
Configures the task for a particular operation
What is configurable and how it is configurable is defined by the interpreter used. See the documentation of the corresponding interpreter used before calling this method.
- Parameters:
operation (str) – Operation to configure (one of those returned by get_operations())
**kwargs (Any) – Arguments to configure the operation
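One possible use of get_operations() together with configure() is to apply the same settings to every operation. In the sketch below, `configure_all`, `ConfigurableStub`, and the `platform` keyword are hypothetical. Which keyword arguments are accepted depends on the interpreter in use.

```python
from typing import Any, Dict, List


def configure_all(task: Any, **kwargs: Any) -> List[str]:
    """Apply the same configuration to every operation of the task."""
    operations = list(task.get_operations())
    for operation in operations:
        task.configure(operation, **kwargs)
    return operations


class ConfigurableStub:
    """Hypothetical stub that stores the configuration per operation."""

    def __init__(self, operations: List[str]) -> None:
        self._operations = operations
        self.configured: Dict[str, Dict[str, Any]] = {}

    def get_operations(self) -> List[str]:
        return list(self._operations)

    def configure(self, *operations: str, **kwargs: Any) -> None:
        for operation in operations:
            self.configured[operation] = dict(kwargs)
```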
- get_interpreted_task() BaseTask #
Retrieves the interpreted task that is produced when calling interpret()
- Returns:
The interpreted task
- Return type:
BaseTask
- publish(*args, **kwargs) MetaEndpoint #
Creates an HTTP endpoint for the task
Accepts any arguments and keyword arguments and passes them to the underlying callback created in the interpreter itself. For particular arguments and keyword arguments, see the documentation of the interpreter used before calling this method.
- Returns:
An endpoint object
- Return type:
malevich.models.endpoint.MetaEndpoint
- class malevich.models.results.BaseResult#
Result obtained running a flow.
Encapsulates the result object obtained after running a flow. The object can be a DataFrame, a list of DataFrames, binary data, etc.
- abstract property num_elements: int#
The number of elements (assets/collections) in the result
- abstract get() RealResultType #
Fetches a real result object (download, etc.)
- Returns:
The actual result
- Return type:
RealResultType
- get_df() DataFrame #
Returns a DataFrame from the result if possible
- Returns:
The DataFrame
- Return type:
pd.DataFrame
- get_dfs() list[DataFrame] #
Returns a list of DataFrames from the result if possible
- Returns:
The list of DataFrames
- Return type:
list[pd.DataFrame]
- get_binary() bytes #
Returns binary data from the result if possible
- Returns:
The binary data
- Return type:
bytes
- get_binary_dir() dict[str, bytes] #
Returns a dict of binary data from the result if possible
- Returns:
The dict of binary data
- Return type:
dict[str, bytes]
- class malevich.models.results.CoreLocalDFResult(coll: Collection, auth: Tuple[str, str], conn_url: str = 'https://core.malevich.ai/')#
A specification of collection() as a result
This class is used to wrap a collection object as a result. It simply stores the collection object as a DataFrame and returns it when the get() method is called.
- num_elements() int #
The number of elements (assets/collections) in the result
- get() DataFrame | None #
Simply extracts saved data frame
- Returns:
Saved data frame
- Return type:
DataFrame
- class malevich.models.results.CoreResult(core_group_name: str, core_operation_id: str, core_run_id: str, conn_url: str, auth: Tuple[str, str])#
A representation of a result of execution of a flow on Malevich Core
Represents a single returned result, which can contain a collection, a list of collections, a single asset or a list of assets.
To be precise, for the function:
    import pandas as pd

    from malevich import flow, collection, CoreInterpreter
    from malevich.etl import process
    from malevich.utility import merge

    @flow()
    def data_process():
        data = collection('my_fancy_smth', df=pd.DataFrame({'a': [1, 2, 3]}))
        processed = process(data)
        output = merge(
            processed,
            data,
            config={'how': 'inner', 'on': 'index'}
        )
        return output, processed

    task = data_process()
    task.interpret(CoreInterpreter())
    # list of CoreResult objects
    results = task()
    # single CoreResult object
    output = results[0]
    # list of CoreResultPayload objects
    print(results[0].get())
The output variable will be a single CoreResult object, which contains a single CoreResultPayload object, which contains a single data frame (the result of the merge operation). The processed variable will be a single CoreResult object, which contains a single CoreResultPayload object, which contains a single collection (the result of the process operation).
Note: The class is not intended to be constructed by user.
- property num_elements: int#
The number of elements (assets/collections) in the result
- get() list[CoreResultPayload] #
Retrieves results from the Core
Returns a list of CoreResultPayload objects, each of which contains an output from a single operation. The output might be a collection, a list of collections, a single file (asset) or a list of files (composite asset).
If you wish to get more specific results, use the following methods:
get_df() to get a single data frame (if result is a collection)
get_dfs() to get a list of data frames (if result is a collection)
get_binary() to get a single binary (if result is an asset)
get_binary_dir() to get a dict of binaries (if result is a composite asset)
- Returns:
The list of results
- Return type:
list[CoreResultPayload]
- get_df() DataFrame #
Converts the result to a single data frame
It only works if the result is a single collection; otherwise it raises an error. Check the number of elements using the num_elements property and call this method only if it is equal to 1.
- Returns:
The result as a data frame
- Return type:
DataFrame
- Raises:
NotImplementedError – If the result is not a single collection
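The num_elements check recommended above can be wrapped in a helper that always returns a list. `frames_from` and `ResultStub` are hypothetical names, and the stub holds plain strings in place of data frames so the example needs no third-party packages. With a real result, get_df() may still raise if the single element is an asset.

```python
from typing import Any, List


def frames_from(result: Any) -> List[Any]:
    """Return the data frames of a result as a list, whatever their number."""
    if result.num_elements == 1:
        # Single element: get_df() is expected to work for a single collection.
        return [result.get_df()]
    return result.get_dfs()


class ResultStub:
    """Hypothetical result that mimics num_elements, get_df() and get_dfs()."""

    def __init__(self, frames: List[Any]) -> None:
        self._frames = list(frames)

    @property
    def num_elements(self) -> int:
        return len(self._frames)

    def get_df(self) -> Any:
        if len(self._frames) != 1:
            raise NotImplementedError("the result is not a single collection")
        return self._frames[0]

    def get_dfs(self) -> List[Any]:
        return list(self._frames)
```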
- get_dfs() list[DataFrame] #
Converts the result to a list of data frames
If some of the results are assets, they are ignored in the output.
- Returns:
The result as a list of data frames
- Return type:
list[DataFrame]
- get_binary() bytes #
Retrieves asset binary data, if the result is file asset
It only works if the result is a single file returned as an asset; otherwise it raises an error. Check the number of elements using the num_elements property and call this method only if it is equal to 1.
- Returns:
The binary data of the asset
- Return type:
bytes
- Raises:
NotImplementedError – If the result is not a single asset
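An alternative to checking num_elements first is to try get_binary() and fall back to get_binary_dir() when NotImplementedError is raised. In the sketch below, `read_assets`, the `default_name` parameter, and both stub classes are hypothetical. The file name of a single asset is not known in this sketch, so a placeholder key is used.

```python
from typing import Any, Dict


def read_assets(result: Any, default_name: str = "asset") -> Dict[str, bytes]:
    """Return asset contents as a name-to-bytes dict for single or multiple files."""
    try:
        # Works only when the result is a single file asset.
        return {default_name: result.get_binary()}
    except NotImplementedError:
        # Otherwise collect all files; non-assets are ignored by get_binary_dir().
        return result.get_binary_dir()


class SingleAssetStub:
    """Hypothetical result holding one file."""

    def get_binary(self) -> bytes:
        return b"hello"

    def get_binary_dir(self) -> Dict[str, bytes]:
        return {"hello.txt": b"hello"}


class MultiAssetStub:
    """Hypothetical result holding several files."""

    def get_binary(self) -> bytes:
        raise NotImplementedError("the result is not a single asset")

    def get_binary_dir(self) -> Dict[str, bytes]:
        return {"a.bin": b"a", "b.bin": b"b"}
```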
- get_binary_dir() dict[str, bytes] #
Retrieves files from assets
Non-assets are ignored in the output
- Returns:
Dict of file names and their binary data
- Return type:
dict[str, bytes]
- class malevich.models.results.CoreResultPayload(data: DataFrame | list[bytes], is_asset: bool = False, is_composite_asset: bool = False, is_collection: bool = False, paths: list[str] | None = None)#
The actual information that is saved as a result
A payload can be either asset or collection. If it is an asset, it can be a single file or a composite asset (a directory with multiple files). It represents one of results of a single operation.
The payload follows this convention:
If is_asset() is True, then data is a bytes object
If is_collection() is True, then data is a DataFrame object
If is_composite_asset() is True, then data is a dict of bytes objects
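The convention above lends itself to a small dispatch helper that names the kind of a payload before its data is read. `payload_kind` and `PayloadStub` are hypothetical names. The composite check comes first on the assumption that a composite asset might also report is_asset() as True.

```python
from typing import Any


def payload_kind(payload: Any) -> str:
    """Classify a payload using its is_* methods."""
    # Assumption: a composite asset may also count as an asset, so test it first.
    if payload.is_composite_asset():
        return "composite_asset"
    if payload.is_asset():
        return "asset"
    if payload.is_collection():
        return "collection"
    return "unknown"


class PayloadStub:
    """Hypothetical payload exposing only the three is_* checks."""

    def __init__(self, asset: bool = False, composite: bool = False,
                 collection: bool = False) -> None:
        self._asset = asset
        self._composite = composite
        self._collection = collection

    def is_asset(self) -> bool:
        return self._asset

    def is_composite_asset(self) -> bool:
        return self._composite

    def is_collection(self) -> bool:
        return self._collection
```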
- is_asset() bool #
Checks if the result is an asset
- Returns:
True if the result is an asset
- Return type:
bool
- is_composite_asset() bool #
Checks if the result is a composite asset
- Returns:
True if the result is a composite asset
- Return type:
bool
- is_collection() bool #
Checks if the result is a collection
- Returns:
True if the result is a collection
- Return type:
bool
- class malevich.models.results.SpaceCollectionResult(run_id: str, in_flow_id: str, space_ops: SpaceOps)#
Result obtained running a flow with Space.
Represents a set of collections obtained after running a flow with Space.
- num_elements() int #
The number of elements (assets/collections) in the result
- get() list[DataFrame] #
Fetches a list of DataFrames with the actual result
- Returns:
The actual result
- Return type:
list[DataFrame]
- class malevich.models.argument.ArgumentLink(*, index: int, name: str, compressed_edges: list[tuple[ArgumentLink, traced[ArgumentLinkNodeType]]] = [], shadow_collection: traced | None = None, is_compressed_edge: bool = False)#