malevich._autoflow
Malevich Autoflow is a dependency-tracking engine that automatically detects flows and builds execution graphs from the order of function calls.
Autoflow is built on the following ideas:
- The code for which the graph is built is called within a Flow context manager.
- The functions that represent processor nodes are decorated with the autotrace or sinktrace decorators.
- Objects that are passed to the functions are wrapped with the traced class.
Usually, the engine works under the hood and you don't need to interact with it directly. For example, the flow() function, which is used to decorate flow definitions, already does all of this through the Autoflow engine.
However, if you want to build your own flow definition, you can use the Autoflow engine directly.
- malevich._autoflow.function.autotrace(func: Callable[[C], R]) -> Callable[[C], R]
Function decorator that enables automatic dependency tracking
The result is turned into a traced object (if it is not one already). Then, all arguments that are traced are linked to the result by calling the traced.calledby() method, with the result being the 'caller'.
The link produced is an ArgumentLink (malevich.models.argument.ArgumentLink) with index set to the argument position and name set to the argument name. If the argument is passed as a keyword argument and the function accepts it only through **kwargs, a warning is raised and the argument is not linked.
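The linking rules above can be illustrated with a minimal sketch. It does not use the real autotrace or traced implementations: Traced, autotrace_sketch, and join are hypothetical stand-ins, and a plain (caller, index, name) tuple takes the place of ArgumentLink. Functions that take *args are deliberately out of scope here.

```python
import inspect
import warnings
from functools import wraps


class Traced:
    """Hypothetical stand-in for `traced`; not the library class."""

    def __init__(self, owner):
        self.owner = owner
        self.links = []  # (caller, index, name) tuples stand in for ArgumentLink

    def calledby(self, caller, index, name):
        # Record that `caller` consumes this object as argument `name`.
        self.links.append((caller, index, name))


def autotrace_sketch(func):
    """Assumed behaviour: wrap the result, then link traced arguments to it."""
    params = inspect.signature(func).parameters.items()
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    positional = [n for n, p in params if p.kind in positional_kinds]
    keyword_only = [n for n, p in params if p.kind is inspect.Parameter.KEYWORD_ONLY]
    # Explicitly declared names, i.e. everything except *args and **kwargs.
    declared = positional + keyword_only

    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if not isinstance(result, Traced):
            result = Traced(result)
        for index, arg in enumerate(args):
            if isinstance(arg, Traced) and index < len(positional):
                arg.calledby(result, index, positional[index])
        for name, arg in kwargs.items():
            if not isinstance(arg, Traced):
                continue
            if name in declared:
                arg.calledby(result, declared.index(name), name)
            else:
                # The argument only fits **kwargs: warn and skip the link.
                warnings.warn(f"argument {name!r} is not linked")
        return result

    return wrapper


@autotrace_sketch
def join(left, right, **options):
    return f"{left.owner}+{right.owner}"


a, b, c = Traced("a"), Traced("b"), Traced("c")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    out = join(a, right=b, extra=c)
```

After the call, a and b each hold one link pointing at out, while c, which reached the function only through **options, holds none and triggers a single warning.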
- malevich._autoflow.function.sinktrace(func: Callable[[C], R]) -> Callable[[C], R]
A special form of autotrace for tracing functions that accept *args
This decorator is applied to processors that contains
AutoFlow
The package provides a context manager that gives functions access to an execution tree.
- class malevich._autoflow.flow.Flow(*args, **kwargs)
Context-manager that holds the execution tree
The class is a singleton and can be used as a context-manager to create a new execution tree. It holds a stack of trees to maintain nested flows. The stack is thread-safe.
Example:
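A minimal sketch of the nested-flow stack described above. It does not import the real Flow: FlowSketch is a hypothetical stand-in, a plain list stands in for ExecutionTree, and returning the tree from the with statement is an assumption of this sketch. Unlike Flow, the stand-in is not a singleton. A ContextVar is used as one possible way to keep the stack separate per thread; the library may do this differently.

```python
from contextvars import ContextVar

# Active trees, innermost last. A tuple is immutable, so callers cannot
# modify the stack they receive from get_stack().
_stack = ContextVar("flow_stack", default=())


class FlowSketch:
    """Hypothetical stand-in for Flow; unlike Flow, it is not a singleton."""

    def __enter__(self):
        tree = []  # placeholder for an ExecutionTree
        _stack.set(_stack.get() + (tree,))
        return tree

    def __exit__(self, exc_type, exc, traceback):
        _stack.set(_stack.get()[:-1])  # pop the innermost tree
        return False  # do not swallow exceptions

    @staticmethod
    def isinflow():
        return len(_stack.get()) > 0

    @staticmethod
    def flow_ref():
        return _stack.get()[-1]

    @staticmethod
    def get_stack():
        return _stack.get()


with FlowSketch() as outer:
    with FlowSketch() as inner:
        depth_inside = len(FlowSketch.get_stack())
        innermost_is_inner = FlowSketch.flow_ref() is inner
    innermost_is_outer = FlowSketch.flow_ref() is outer
in_flow_after = FlowSketch.isinflow()
```

Entering a nested block pushes a second tree, so flow_ref() resolves to the innermost one; leaving the block restores the outer tree, and the stack is empty again once both blocks have exited.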
- isinflow() -> bool
Check if the current execution is inside a flow
- flow_ref() -> ExecutionTree
Returns the reference to the current flow in the context
- static get_stack() -> tuple[ExecutionTree]
Returns an immutable stack of execution trees
- class malevich._autoflow.tracer.root(*, id: str = None)
A root element of the execution tree
Used as a default value for the owner of the traced objects
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=False, default_factory=<lambda>)}
Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.
This replaces Model.__fields__ from Pydantic V1.
- class malevich._autoflow.tracer.autoflow(tree: ExecutionTree[T, Any])
Autoflow is a bridge between traced objects and the execution tree
It holds both the reference to the execution tree and the reference to the traced object. It can be reattached to another traced object, and it provides interfaces for reporting a new dependency in the execution tree.
- attach(component: T) -> None
Attach the autoflow to a traced object
- class malevich._autoflow.tracer.traced(owner: T = root(id='7d6c40dce1fb4987817facfd4b895b33'))
Traced objects are wrappers around regular Python objects. They hold a reference to the current execution tree and provide access to autoflow to enable dependency tracking.
These objects CAN ONLY BE CREATED inside a flow.
- claim(owner: T) -> None
Claim the ownership of the tracer
- class malevich._autoflow.tracer.tracedLike(owner: T = root(id='75ae416d42754bf89b339fd3a4ed4f86'))
This class is used to imitate the traced object without actually tracing it
It is used to provide a homogeneous interface for tree nodes without attaching them to a particular tree.
These objects CAN BE CREATED from outside of a flow.
- claim(owner: T) -> None
Claim the ownership of the tracer
- exception malevich._autoflow.tree.BadEdgeError(message: str, edge: tuple[T, T, LinkType])
- class malevich._autoflow.tree.ExecutionTree(tree: list[tuple[T, T, LinkType]] | None = None)
A utility class holding a low-level representation of the execution graph
The class provides a set of basic operations on the graph, such as adding edges, pruning, and traversing.
The tree is stored both as a set of nodes and as a list of edges. The nodes can be any hashable and comparable objects, while the edges are triples of the form (callee, caller, link), where caller and callee are nodes and link is an arbitrary object.
The connection is directed from callee to caller. This is because the caller(callee) notation requires the callee to be evaluated first.
Callee --> Caller
- put_edge(callee: T, caller: T, link: LinkType = None) -> None
Add an edge to the execution tree
- Parameters:
callee (T) – The source node (a.k.a. callee)
caller (T) – The target node (a.k.a. caller)
link (LinkType, optional) – An arbitrary object that represents the edge payload (e.g. argument name). Defaults to None.
- Raises:
BadEdgeError – If the edge already exists, or if the edge is a self-edge
- prune(outer_nodes: list[T]) -> None
Removes specified nodes from the execution tree
- Parameters:
outer_nodes (list[T]) – List of nodes to remove
- edges_from(node: T) -> None
Returns all edges starting from the specified node
- traverse() -> Iterator[tuple[T, T, LinkType]]
Traverse the execution tree
The traversal is performed in a multi-source breadth-first manner.
- Returns:
Generator of nodes
- Return type:
Generator[T]
- leaves() -> Iterable[T]
Returns all leaves of the execution tree
- static connected(a: ExecutionTree, b: ExecutionTree) -> bool
Checks if two execution trees are connected
- nodes() -> Iterable[T]
Returns all nodes of the execution tree
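The edge direction and the breadth-first traversal can be illustrated with a small sketch. TreeSketch and BadEdge are hypothetical stand-ins, not the library's ExecutionTree and BadEdgeError. Several details are assumptions of this sketch: an edge counts as a duplicate only when callee, caller, and link all match; a leaf is a node that is never a callee; and traversal starts from every node that is never a caller.

```python
from collections import deque


class BadEdge(Exception):
    """Hypothetical stand-in for BadEdgeError."""


class TreeSketch:
    """Hypothetical stand-in for ExecutionTree; stores only an edge list."""

    def __init__(self):
        self.edges = []  # (callee, caller, link) triples

    def put_edge(self, callee, caller, link=None):
        if callee == caller:
            raise BadEdge("self-edge")
        if (callee, caller, link) in self.edges:
            raise BadEdge("edge already exists")
        self.edges.append((callee, caller, link))

    def nodes(self):
        # dict.fromkeys de-duplicates while keeping first-seen order.
        ends = (n for callee, caller, _ in self.edges for n in (callee, caller))
        return list(dict.fromkeys(ends))

    def edges_from(self, node):
        return [edge for edge in self.edges if edge[0] == node]

    def leaves(self):
        # Assumption: a leaf is a node that is never a callee.
        callees = {callee for callee, _, _ in self.edges}
        return [node for node in self.nodes() if node not in callees]

    def traverse(self):
        # Multi-source BFS: start from every node that is never a caller.
        callers = {caller for _, caller, _ in self.edges}
        sources = [node for node in self.nodes() if node not in callers]
        queue, visited = deque(sources), set(sources)
        while queue:
            node = queue.popleft()
            for edge in self.edges_from(node):
                yield edge
                if edge[1] not in visited:
                    visited.add(edge[1])
                    queue.append(edge[1])


tree = TreeSketch()
tree.put_edge("load", "clean", "df")     # clean(load)
tree.put_edge("clean", "train", "data")  # train(clean)
tree.put_edge("load", "train", "raw")    # train(load)
order = list(tree.traverse())

rejected = []
for bad in [("load", "load", None), ("load", "clean", "df")]:
    try:
        tree.put_edge(*bad)
    except BadEdge as error:
        rejected.append(str(error))
```

Here "load" is the only source, so both of its outgoing edges are yielded before the edge from "clean" to "train", and "train" is the single leaf.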