Lifecycle
Once you define a flow, it can be in any of the following states:
Defined: the initial state of a flow, right after you define a function decorated with @flow.
Interpreted: the flow is attached to a platform and is ready to be converted into an actual task on it.
Deployed (Prepared): the flow has been successfully converted into a task on the platform and is ready to accept and process input data.
Stopped (Suspended): the underlying task has been stopped and can no longer be executed.
Once a task is deployed, it can be run multiple times with different input data.
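The lifecycle above can be pictured as a small state machine. The sketch below is purely illustrative: `FlowState`, `TRANSITIONS` and `advance()` are hypothetical names that are not part of Malevich, and the allowed transitions are an assumption read off the state descriptions (running is modeled as a self-loop on the deployed state, because a deployed task can be run many times).

```python
from enum import Enum


class FlowState(Enum):
    # Hypothetical names mirroring the lifecycle states described above
    DEFINED = "defined"
    INTERPRETED = "interpreted"
    DEPLOYED = "deployed"
    STOPPED = "stopped"


# Assumed transitions: action name -> (required state, resulting state)
TRANSITIONS = {
    "interpret": (FlowState.DEFINED, FlowState.INTERPRETED),
    "prepare": (FlowState.INTERPRETED, FlowState.DEPLOYED),
    "run": (FlowState.DEPLOYED, FlowState.DEPLOYED),  # may repeat any number of times
    "stop": (FlowState.DEPLOYED, FlowState.STOPPED),
}


def advance(state: FlowState, action: str) -> FlowState:
    """Return the next state, or raise if the action is not allowed in `state`."""
    required, result = TRANSITIONS[action]
    if state is not required:
        raise ValueError(f"cannot {action!r} a flow in state {state.value!r}")
    return result


state = FlowState.DEFINED
for action in ["interpret", "prepare", "run", "run", "stop"]:
    state = advance(state, action)
print(state)  # FlowState.STOPPED
```

Once the state reaches STOPPED, no transition accepts it as a starting point, which matches the description of a stopped task that can no longer be executed.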
Definition
To define a flow, import the flow decorator from malevich and apply it to an ordinary function:
from malevich import flow

@flow
def my_flow():
    pass

task = my_flow()
Calling my_flow returns a malevich.models.PromisedTask instance. You can attach the flow to any of the supported platforms and run it there.
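Calling a decorated function and getting an object back, rather than having the body execute immediately, is a general decorator pattern: the wrapper records the call and defers the work. The sketch below illustrates that pattern with a hypothetical `deferred` decorator and `Promise` class. It is not how Malevich implements `@flow` or `PromisedTask`; it only shows the idea of a "promised" result.

```python
import functools
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Promise:
    """Hypothetical stand-in for a promised task: a recorded call, not yet executed."""
    func: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

    def execute(self) -> Any:
        # The function body only runs when something explicitly executes the promise
        return self.func(*self.args, **self.kwargs)


def deferred(func: Callable[..., Any]) -> Callable[..., Promise]:
    """Hypothetical decorator: calling the decorated function returns a Promise."""
    @functools.wraps(func)  # keep the original name and docstring on the wrapper
    def wrapper(*args: Any, **kwargs: Any) -> Promise:
        return Promise(func, args, kwargs)
    return wrapper


calls = []


@deferred
def my_flow(x: int) -> int:
    calls.append(x)
    return x * 2


promise = my_flow(21)
print(calls)              # [] since nothing has run yet
print(promise.execute())  # 42
```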
Interpretation
Interpretation is the process of attaching a flow to the specific platform you wish to run it on. A flow is interpreted automatically when it is passed to a deploy assistant such as Core or Space.
from malevich import Core, Space, flow

@flow
def my_flow():
    # Your flow logic here...
    pass

interpreted_on_core = Core(my_flow)
# or Core(my_flow(...)) if my_flow accepts arguments
interpreted_on_space = Space(my_flow)
Here, interpreted_on_core and interpreted_on_space are interpreted tasks. They preserve the flow logic together with the inner state specific to the platform they are attached to. Calling the prepare() method creates an instance of the task on the platform. A ready-to-run task can be invoked with the run() method and stopped with the stop() method. The full list of methods and their signatures is described in the CoreTask and SpaceTask classes.
Deployment
Deployment is the process of acquiring resources on the platform and preparing the task to be run. To deploy a task, call its prepare() method:
interpreted_on_core = Core(my_flow)
interpreted_on_core.prepare() # Creates a task on the platform
Once the task is deployed, you can call its run() and stop() methods.
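The ordering rules of the lifecycle (prepare before run, no runs after stop) can be sketched with a local stand-in object. `FakeTask` below is hypothetical and keeps everything in memory; it only mimics the shape of the prepare(), run() and stop() calls, and assumes that run() returns a fresh run ID on every call, as the Running section describes. It says nothing about how CoreTask or SpaceTask work internally.

```python
import uuid
from typing import List


class FakeTask:
    """Hypothetical in-memory stand-in for an interpreted task."""

    def __init__(self) -> None:
        self.prepared = False
        self.stopped = False
        self.run_ids: List[str] = []

    def prepare(self) -> None:
        self.prepared = True  # stands in for acquiring platform resources

    def run(self) -> str:
        if not self.prepared or self.stopped:
            raise RuntimeError("call prepare() before run(), and do not run after stop()")
        run_id = uuid.uuid4().hex  # each run gets its own identifier
        self.run_ids.append(run_id)
        return run_id

    def stop(self) -> None:
        self.stopped = True  # stands in for releasing platform resources


task = FakeTask()
task.prepare()
first, second = task.run(), task.run()  # a deployed task can run many times
task.stop()
```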
Running
Each task can be thought of as a standalone function that runs on the platform. To run a task, call its run() method. The method accepts input data for collections, assets, and configurations.
from malevich import table
interpreted_on_core.run() # Run with default input data defined in the flow
interpreted_on_core.run(overrides={'my_data': table(...)}) # Run with new data in `my_data` collection
interpreted_on_core.run(config_extensions={'preprocess': {...}}) # Run with configuration extensions
# .run method returns run_id for future reference
run_id = interpreted_on_core.run(
    overrides={'my_data': table(...)},
    config_extensions={'preprocess': {...}}
)
# Also, you may run the task with a run_id
interpreted_on_core.run(run_id=run_id)
The signature and behavior of the run() method differ between interpreters. See the documentation of the specific interpreter for details.
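One way to picture the overrides argument is as a per-name replacement of the flow's default inputs: collections named in overrides get new data, and all others keep their defaults. The hypothetical `resolve_inputs()` below illustrates that reading with plain dictionaries standing in for tables. Whether the platform rejects names the flow does not declare, as this sketch does, is an assumption.

```python
from typing import Any, Dict, Mapping, Optional


def resolve_inputs(
    defaults: Mapping[str, Any],
    overrides: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: replace default inputs by name and keep the rest."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Assumption: overriding a name the flow does not declare is an error
        raise KeyError(f"unknown inputs: {sorted(unknown)}")
    merged = dict(defaults)   # copy so the flow's defaults stay untouched
    merged.update(overrides)  # overridden names win
    return merged


defaults = {"my_data": [1, 2, 3], "other_data": ["a"]}
print(resolve_inputs(defaults))                       # same as the defaults
print(resolve_inputs(defaults, {"my_data": [4, 5]}))  # my_data replaced, other_data kept
```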
Stopping
It is important to release resources on the platform when you no longer need them. To stop a task, call its stop() method:
task.prepare() # Prepare the task
task.run() # Do the job
task.stop() # Release resources
Warning
Tasks that are not stopped manually and remain inactive for a certain period are subject to automatic suspension.
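Since a task that is never stopped keeps holding resources until it is suspended, it can help to tie stop() to a `try`/`finally` block so it is called even when run() raises. The sketch below wraps this in a hypothetical `deployed()` context manager built on the standard `contextlib.contextmanager`. It assumes only that the task object exposes the prepare(), run() and stop() methods described above, and it is demonstrated with a hypothetical `RecordingTask` stub rather than a real platform task.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


class RecordingTask:
    """Hypothetical stub that records which lifecycle methods were called."""

    def __init__(self, fail_on_run: bool = False) -> None:
        self.calls: List[str] = []
        self.fail_on_run = fail_on_run

    def prepare(self) -> None:
        self.calls.append("prepare")

    def run(self) -> None:
        self.calls.append("run")
        if self.fail_on_run:
            raise RuntimeError("simulated failure during run")

    def stop(self) -> None:
        self.calls.append("stop")


@contextmanager
def deployed(task: Any) -> Iterator[Any]:
    """Hypothetical helper: prepare on entry, always stop on exit."""
    task.prepare()  # if this raises, nothing was deployed, so stop() is skipped
    try:
        yield task
    finally:
        task.stop()  # runs on normal exit and when the with-body raises


failing = RecordingTask(fail_on_run=True)
try:
    with deployed(failing) as t:
        t.run()
except RuntimeError:
    pass
print(failing.calls)  # ['prepare', 'run', 'stop']
```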
Results
After a task has run, you may want to retrieve the results of its execution. To do so, call its results() method. It fetches results for the objects you returned from the @flow-decorated function. The results are returned as a list of malevich.models.results.base.BaseResult instances. See the Results section to learn more about working with results.
interpreted_on_core.run()
# list of CoreResult instances
results = interpreted_on_core.results()
interpreted_on_space.run()
# list of SpaceResult instances
results = interpreted_on_space.results()