malevich._autoflow#

Malevich Autoflow is a dependency tracking engine allowing us to automatically detect flows and build execution graphs from the order of function calls.

Autoflow is built on following ideas:

  1. The part of code that is graph is built for is called within Flow context manager.

  2. The functions that represent processor nodes are decorated with autotrace or autotrace decorators.

  3. Objects that are passed to the functions are wrapped with traced class.

Usually, the engine is working under the hood and you don’t need to interact with it directly. For example, flow() function that is used to decorated flow definitions, is already doing all the magic using Autoflow engine.

However, if you want to build your own flow definition, you can use Autoflow engine directly.

malevich._autoflow.function.autotrace(func: Callable[[C], R]) Callable[[C], R]#

Function decorator that enables automatic dependency tracking

The result is turned into traced object (if it is not already).

Then, all arguments that are traced are linked to the result by calling traced.calledby() method with results being the ‘caller’.

The link produced is ArgumentLink <malevich.models.argument.ArgumentLink with index set to the argument position and name set to the argument name. If the argument is passed as a keyword argument, and the function accepts the argument as **kwargs, raises a warning and does not link the argument.

malevich._autoflow.function.sinktrace(func: Callable[[C], R]) Callable[[C], R]#

A special form of autotrace to trace functions with *args

This decorator is applied to processors that contains

AutoFlow

The package provides a context manager for providing an access to an execution tree in function contexts.

class malevich._autoflow.flow.Flow(*args, **kwargs)#

Context-manager that holds the execution tree

The class is a singleton and can be used as a context-manager to create a new execution tree. It holds a stack of trees to maintain nested flows. The stack is thread-safe.

Example:

isinflow() bool#

Check if the current execution is inside a flow

flow_ref() ExecutionTree#

Returns the reference to the current flow in the context

static get_stack() tuple[ExecutionTree]#

Returns immutable stack of execution trees

class malevich._autoflow.tracer.root(*, id: str = None)#

A root element of the execution tree

Used as a default value for the owner of the traced objects

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=False, default_factory=<lambda>)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class malevich._autoflow.tracer.autoflow(tree: ExecutionTree[T, Any])#

Autoflow is a bridge between traced objects and the execution tree

It holds both the reference to the execution tree and the reference to the traced object. Can be reattached to another traced object. Provides interfaces for reporting a new dependency in the execution tree.

attach(component: T) None#

Attach the autoflow to a traced object

calledby(caller: traced, argument: str | None = None) None#

Report a new dependency in the execution tree

class malevich._autoflow.tracer.traced(owner: T = root(id='7d6c40dce1fb4987817facfd4b895b33'))#

Traced objects are wrappers around regular Python objects They hold a reference to the current execution tree and provides an access to autoflow to enable dependency tracking

These objects CAN ONLY BE CREATED inside a flow.

claim(owner: T) None#

Claim the ownership of the tracer

class malevich._autoflow.tracer.tracedLike(owner: T = root(id='75ae416d42754bf89b339fd3a4ed4f86'))#

This class is used to immitate the traced object without actually tracing it

It is used to provide homogeneous interface for tree nodes without attaching them to a particular tree.

These objects CAN BE CREATED from outside of a flow.

claim(owner: T) None#

Claim the ownership of the tracer

exception malevich._autoflow.tree.BadEdgeError(message: str, edge: tuple[T, T, LinkType])#
class malevich._autoflow.tree.ExecutionTree(tree: list[tuple[T, T, LinkType]] | None = None)#

An utility class holding a low-level representation of the execution graph

The class provides a set of basic operations on the graph, such as adding edges, pruning, and traversing.

The tree is stored both as set of nodes and as a list of edges. The nodes can be any hashable and comparable class, while the edges are triples of the form (callee, caller, link) where caller and callee are nodes and link is an arbitrary object.

The connection is directed from callee to caller. This is because caller(callee) notation requires the callee to be evaluated first.

graph LR Callee --> Caller
put_edge(callee: T, caller: T, link: LinkType = None) None#

Add an edge to the execution tree

Parameters:
  • callee (T) – The source node (a.k.a. Callee)

  • caller (T) – The target node (a.k.a. caller)

  • link (LinkType, optional) – An arbitrary object that represents the edge payload (e.g. argument name). Defaults to None.

Raises:

BadEdgeError – If the edge already exists, or if the edge is a self-edge

prune(outer_nodes: list[T]) None#

Removes specified nodes from the execution tree

Parameters:

outer_nodes (list[T]) – List of nodes to remove

edges_from(node: T) None#

Returns all edges starting from the specified node

traverse() Iterator[tuple[T, T, LinkType]]#

Traverse the execution tree

The traversal is performed in a multi-source breadth-first manner.

Returns:

Generator of nodes

Return type:

Generator[T]

leaves() Iterable[T]#

Returns all leaves of the execution tree

static connected(a: ExecutionTree, b: ExecutionTree) bool#

Checks if two execution trees are connected

nodes() Iterable[T]#

Returns all nodes of the execution tree