Integration#

A flow can be easily integrated into your application. To do this, you should install the flow using malevich install <flow_reverse_id> command. The command will generate a stub for the flow in the malevich.flows module. The stub can be imported and used as a regular function.

Let’s assume you developed a flow for inferencing a vision transformer model. The flow is defined as follows:

from malevich import collection, flow, table
from malevich.vit import prepare_images, run, last_hidden_states

@flow
def vision_transformer():
    data = collection(
        'images',
        df=table(...),
        alias='images'
    )

    # Converts image links to tensors
    images = prepare_images(data)
    # Run the inference obtaining the results
    hidden_states = run(images)
    # Extracts a tensor of the last hidden states
    return last_hidden_states(hidden_states)

To install the flow, you should upload it to the platform:

from malevich import Space

Space(vision_transformer).upload()

And then install it:

malevich install vision_transformer

After the flow is installed, you can use it in your application:

from malevich.flows import vision_transformer

results = vision_transformer(images=table(...))

If no data is supplied, the flow will run with the default data defined in the flow. You may run the flow asynchronously:

run_id = vision_transformer(images=table(...), wait_for_results=False)

You can also specify the version and branch of the flow to run:

results = vision_transformer(images=table(...), version='1.0.0', branch='dev')

Also, you may run the particular deployment:

results = vision_transformer(images=table(...), deployment_id='...', wait_for_results=True)

# or asynchronously
run_id = vision_transformer(images=table(...), deployment_id='...', wait_for_results=False)

If you want to have more control over the task, you can specify get_task=True and obtain SpaceTask instance:

task = vision_transformer(images=table(...), get_task=True)
task.run(...) # Run the task with all available options
results = task.results() # Wait for the results