Execution#
Flow captures the semantic of the execution tree and can be used as a standalone function. As any function, it can be run with input data and produce the output. Collections and assets serve as flow inputs, while the output of last processors is the output of the flow.
Here, collections (Collections #1 and #2) and assets (Asset #1) are the inputs of the flow. Processors #1 and #2 transform the input data, while Processor #3 and Processor #4 are the final processors that produce the output of the flow. In Python, it would look like this:
from malevich import collection, asset
def my_flow(collection_1: collection, collection_2: collection, asset_1: asset):
# ... Flow logic here ...
return processor_3_output, processor_4_output
Overriding Data#
When defining a flow, you specify the initial data in collections
from malevich import collection, flow, table
@flow
def my_flow():
data = collection(
'name_of_my_collection',
df=table(...),
alias='my_data'
)
# ... Flow logic here ...
But on run, you can supply new data to the flow. It is done using the alias
of the collection. The process
of supplying new data is called overriding.
from malevich import Space, table
interpreted_on_space = Space(my_flow)
interpreted_on_space.prepare()
interpreted_on_space.run(overrides={'my_data': table(...)})
If you install the flow, you may supply new data using the keyword argument of the flow stub:
from malevich.flows import my_flow
my_flow(my_data=table(...))
If no data is supplied, the flow will run with the default data defined in the flow.
Warning
The alias of each component in the flow should be unique.
Configuration Extension#
In addition to the data, you may supply configuration extensions to the processors in the flow. It is also done by aliases.
from malevich.utility import rename, add_column # installed processors
from malevich import Core, collection, flow, table
@flow
def my_flow():
data = collection(
'marketplace_cards',
df=table(...),
alias='cards'
)
title = locs(data, column='Card Title')
description = locs(data, column='Card Description')
return (
rename(title, config={"Card Title": "title"}, alias='to_title'),
rename(description, config={"Card Description": "description", alias='to_description'}
)
interpreted_on_core = Core(my_flow)
interpreted_on_core.prepare()
interpreted_on_core.run(config_extensions={'to_title': {'column': 'Title'}})
In this example, we supply a new configuration to the to_title
processor. The configuration is a dictionary
that will be merged with the default configuration of the processor.
The extension does not replace the default configuration, but extends it. Assume the following logic behind it:
default_config = {"column": "Card Title"}
new_config = {"column": "Title"}
final_config = {**default_config, **new_config}
# final_config = {"column": "Title"}