malevich.square.utils#
- class malevich.square.utils.Context#
Context contains all the necessary information about the run and the environment in which it is executed. Context also provides a number of auxiliary functions for interacting with the environment, such as working with shared storage (share(), get_share_path(), delete_share()), dealing with common objects (common), access to the key-value storage (dag_key_value), and object storage (object_storage).
Usage#
The Context object is used implicitly in apps. It may be requested by including an argument with an explicit type annotation in the function signature:
from malevich.square import Context, processor

@processor()
def my_app(ctx: Context):
    pass
Example usage#
Here are some frequently used examples of context usage.
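As one illustration, the sketch below stores a value in the run-wide key-value storage and reads it back through `ctx.dag_key_value`. So that it runs without the Malevich runtime, it uses a hypothetical in-memory stand-in (`FakeDagKeyValue`) that only mimics the `update()` and `get()` signatures documented further down; `remember_row_count` and the `row_count` key are likewise invented for the example. In a real app the function would be decorated with `@processor()` and receive `ctx: Context`.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


class FakeDagKeyValue:
    """Hypothetical in-memory stand-in for ctx.dag_key_value (not the real class)."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def update(self, keys_values: Dict[str, Any]) -> None:
        # Existing keys are overwritten, as the documentation describes.
        self._data.update(keys_values)

    def get(self, keys: List[str]) -> Dict[str, Any]:
        # Keys that are not found map to None.
        return {key: self._data.get(key) for key in keys}


def remember_row_count(ctx, rows: int) -> int:
    """Store a row count for the current run and read it back."""
    ctx.dag_key_value.update({"row_count": rows})
    return ctx.dag_key_value.get(["row_count"])["row_count"]


# Stand-in context exposing only the attribute this sketch needs.
fake_ctx = SimpleNamespace(dag_key_value=FakeDagKeyValue())
stored = remember_row_count(fake_ctx, 42)
print(stored)
```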
- class Context._ObjectStorage#
A storage for binary objects common to the user.
Works with cloud S3 storage and the app shared file system. Provides access to the objects shared with the Context.share() method and accessible with the Context.get_share_path() method.
Some methods of the object storage take a local parameter. If it is set to True, the operation is performed on the local file system. Otherwise, the operation is performed on the remote object storage.
The information might be unsynchronized between apps. To ensure that the information is synchronized, use the all_apps parameter. If it is set to True, the operation is performed on all apps. Otherwise, the operation is performed only on the apps the method is called from.
- get_keys(local: bool = False, all_apps: bool = False) List[str] #
Get keys from local mount or remote object storage.
- Parameters:
local (bool, optional) – whether to use the local mount or the remote object storage. If set to True, the operation is performed on the local file system. Otherwise, it is performed on the remote object storage. Defaults to False.
all_apps (bool, optional) – whether to synchronize the operation between all apps. If set to True, the operation is performed on all apps. Otherwise, it is performed only on the apps the method is called from. Defaults to False.
- Returns:
Keys from local mount or remote object storage.
- Return type:
List[str]
- async async_get_keys(local: bool = False, all_apps: bool = False) List[str] #
Get keys from local mount or remote object storage.
- Parameters:
local (bool, optional) – whether to use the local mount or the remote object storage. If set to True, the operation is performed on the local file system. Otherwise, it is performed on the remote object storage. Defaults to False.
all_apps (bool, optional) – whether to synchronize the operation between all apps. If set to True, the operation is performed on all apps. Otherwise, it is performed only on the apps the method is called from. Defaults to False.
- Returns:
Keys from local mount or remote object storage.
- Return type:
List[str]
- get(keys: List[str], force: bool = False, all_apps: bool = True) List[str] #
Updates the mount for this app (or for all apps) and returns the keys for which the update succeeded.
- Parameters:
keys (List[str]) – Keys whose values are loaded. A key that cannot be obtained is omitted from the result.
force (bool, optional) – If set, local data is ignored and the values are loaded from the remote object storage. Otherwise, only keys that are missing locally are loaded. Defaults to False.
all_apps (bool, optional) – If set to True, the operation is performed in all apps. Otherwise, only in apps with an associated mount. Defaults to True.
- Returns:
Keys whose values were obtained and loaded into the mount
- Return type:
List[str]
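A common pattern that follows from `get_keys()` and `get()` is to load only what the local mount lacks. The sketch below compares local and remote key listings and requests the difference with `force=False`. `FakeObjectStorage` and `sync_missing` are hypothetical names; the stand-in only mimics the documented signatures with two dictionaries and is not the library's implementation. In an app, `ctx.object_storage` would be passed instead.

```python
from typing import Dict, List


class FakeObjectStorage:
    """Hypothetical in-memory stand-in that mimics the documented signatures."""

    def __init__(self, local: Dict[str, bytes], remote: Dict[str, bytes]) -> None:
        self.local = dict(local)
        self.remote = dict(remote)

    def get_keys(self, local: bool = False, all_apps: bool = False) -> List[str]:
        return sorted(self.local if local else self.remote)

    def get(self, keys: List[str], force: bool = False, all_apps: bool = True) -> List[str]:
        loaded = []
        for key in keys:
            if key not in self.remote:
                continue  # keys that cannot be obtained are left out of the result
            if force or key not in self.local:
                self.local[key] = self.remote[key]  # overwrite only when forced or missing
            loaded.append(key)
        return loaded


def sync_missing(storage) -> List[str]:
    """Load keys that exist remotely but are absent from the local mount."""
    local_keys = set(storage.get_keys(local=True))
    remote_keys = set(storage.get_keys(local=False))
    missing = sorted(remote_keys - local_keys)
    return storage.get(missing, force=False) if missing else []


storage = FakeObjectStorage(
    local={"a.bin": b"old"},
    remote={"a.bin": b"new", "b.bin": b"data"},
)
loaded = sync_missing(storage)
print(loaded)
```

Because `force` stays `False`, the local copy of `a.bin` is left untouched; calling `get(["a.bin"], force=True)` would be the way to refresh it from the remote side.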
- async async_get(keys: List[str], force: bool = False, all_apps: bool = True) List[str] #
Updates the mount for this app (or for all apps) and returns the keys for which the update succeeded.
- Parameters:
keys (List[str]) – Keys whose values are loaded. A key that cannot be obtained is omitted from the result.
force (bool, optional) – If set, local data is ignored and the values are loaded from the remote object storage. Otherwise, only keys that are missing locally are loaded. Defaults to False.
all_apps (bool, optional) – If set to True, the operation is performed in all apps. Otherwise, only in apps with an associated mount. Defaults to True.
- Returns:
Keys whose values were obtained and loaded into the mount
- Return type:
List[str]
- get_all(local: bool = False, force: bool = False, all_apps: bool = True) List[str] #
Updates the mount and returns all keys in it. If local is set, the result covers only the mount (or the mounts of all apps if all_apps is set); otherwise, everything is loaded from the remote object storage.
- Parameters:
local (bool, optional) – whether to use the local mount or the remote object storage. If set to True, the operation is performed on the local file system. Otherwise, it is performed on the remote object storage. Defaults to False.
force (bool, optional) – If set, local data is ignored and the values are loaded from the remote object storage. Otherwise, only keys that are missing locally are loaded. Defaults to False.
all_apps (bool, optional) – If set to True, the operation is performed in all apps. Otherwise, only in apps with an associated mount. Defaults to True.
- Returns:
All keys in the mount (or in the mounts of all apps if all_apps is True) when local is set; otherwise, all keys loaded from the remote object storage.
- Return type:
List[str]
- async async_get_all(local: bool = False, force: bool = False, all_apps: bool = True) List[str] #
Updates the mount and returns all keys in it. If local is set, the result covers only the mount (or the mounts of all apps if all_apps is set); otherwise, everything is loaded from the remote object storage.
- Parameters:
local (bool, optional) – whether to use the local mount or the remote object storage. If set to True, the operation is performed on the local file system. Otherwise, it is performed on the remote object storage. Defaults to False.
force (bool, optional) – If set, local data is ignored and the values are loaded from the remote object storage. Otherwise, only keys that are missing locally are loaded. Defaults to False.
all_apps (bool, optional) – If set to True, the operation is performed in all apps. Otherwise, only in apps with an associated mount. Defaults to True.
- Returns:
All keys in the mount (or in the mounts of all apps if all_apps is True) when local is set; otherwise, all keys loaded from the remote object storage.
- Return type:
List[str]
- update(keys: List[str], presigned_expire: int | None = -1) Dict[str, str] #
Updates objects in remote storage
Retrieves objects from the local mount and updates the remote object storage. If presigned_expire is set to a positive value, creates and returns presigned urls for the objects.
If presigned_expire is None, the default timeout is used.
If presigned_expire is negative, returns an empty dictionary.
- Parameters:
keys (List[str]) – Keys to update
presigned_expire (int, optional) – If positive, life span of presigned urls in seconds. If None, the default timeout is used. Defaults to -1.
- Returns:
Mapping of keys to presigned urls
- Return type:
Dict[str, str]
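The three modes of `presigned_expire` can be sketched as follows. `FakeObjectStorage`, its URL format, and the `DEFAULT_EXPIRE` value are invented for the illustration (the real default timeout is not stated here), and `upload_with_links` is a hypothetical helper that rejects non-positive lifetimes before delegating to `update()`. The case of zero is not covered by the description above, so the stand-in arbitrarily treats it like a negative value.

```python
from typing import Dict, List, Optional


class FakeObjectStorage:
    """Hypothetical stand-in; the URL format and default timeout are invented."""

    DEFAULT_EXPIRE = 3600  # assumed value, the real default is not documented here

    def update(self, keys: List[str], presigned_expire: Optional[int] = -1) -> Dict[str, str]:
        if presigned_expire is None:
            presigned_expire = self.DEFAULT_EXPIRE  # None selects the default timeout
        if presigned_expire <= 0:
            return {}  # upload only, no links (zero handling is an assumption)
        return {
            key: f"https://example.invalid/{key}?expires={presigned_expire}"
            for key in keys
        }


def upload_with_links(storage, keys: List[str], expire_seconds: Optional[int]) -> Dict[str, str]:
    """Push keys to remote storage and return presigned urls for them."""
    if expire_seconds is not None and expire_seconds <= 0:
        raise ValueError("expire_seconds must be positive or None")
    return storage.update(keys, presigned_expire=expire_seconds)


storage = FakeObjectStorage()
links = upload_with_links(storage, ["report.csv"], expire_seconds=600)
default_links = upload_with_links(storage, ["report.csv"], expire_seconds=None)
no_links = storage.update(["report.csv"])  # default -1: update without links
print(sorted(links), no_links)
```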
- async async_update(keys: List[str], presigned_expire: int | None = -1) Dict[str, str] #
Updates objects in remote storage
Retrieves objects from the local mount and updates the remote object storage. If presigned_expire is set to a positive value, creates and returns presigned urls for the objects.
If presigned_expire is None, the default timeout is used.
If presigned_expire is negative, returns an empty dictionary.
- Parameters:
keys (List[str]) – Keys to update
presigned_expire (int, optional) – If positive, life span of presigned urls in seconds. If None, the default timeout is used. Defaults to -1.
- Returns:
Mapping of keys to presigned urls
- Return type:
Dict[str, str]
- presigned(keys: List[str], expire: int | None = None) Dict[str, str] #
Creates presigned urls for specified keys
- Parameters:
keys (List[str]) – Keys to create presigned urls for
expire (int, optional) – Life span of presigned urls in seconds (must be positive). If None, set to default timeout. Defaults to None.
- Returns:
Mapping of keys to presigned urls
- Return type:
Dict[str, str]
- async async_presigned(keys: List[str], expire: int | None = None) Dict[str, str] #
Creates presigned urls for specified keys
- Parameters:
keys (List[str]) – Keys to create presigned urls for
expire (int, optional) – Life span of presigned urls in seconds (must be positive). If None, set to default timeout. Defaults to None.
- Returns:
Mapping of keys to presigned urls
- Return type:
Dict[str, str]
- delete(keys: List[str]) None #
Deletes values in mount and remote storage
- Parameters:
keys (List[str]) – Keys to delete
- async async_delete(keys: List[str]) None #
Deletes values in mount and remote storage
- Parameters:
keys (List[str]) – Keys to delete
- class Context._DagKeyValue(run_id: str | None = None)#
Simple key-value storage shared by all apps of one run. Values must be bytes, string, int or float; dictionary order is not guaranteed.
- get_bytes(key: str) bytes #
Gets a binary value by key in a more efficient way.
Consider using this method to retrieve binary data more efficiently; note that get() can also be used.
- Parameters:
key (str) – key in storage
- Returns:
Value stored by key
- Return type:
bytes
- async async_get_bytes(key: str) bytes #
Gets a binary value by key in a more efficient way.
Consider using this method to retrieve binary data more efficiently; note that get() can also be used.
- Parameters:
key (str) – key in storage
- Returns:
Value stored by key
- Return type:
bytes
- get(keys: List[str]) Dict[str, Any] #
Gets values by keys
Retrieves a slice of storage by keys. If a key is not found, the value will be set to None
- Parameters:
keys (list[str]) – list of keys
- Returns:
A dictionary of key-value pairs.
- Return type:
Dict[str, Any]
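Since `get()` maps keys that are not found to None, callers often want to substitute defaults. A minimal sketch of that pattern is shown below; `FakeDagKeyValue` is a hypothetical in-memory stand-in mimicking the documented `update()` and `get()` signatures, and `get_with_defaults` is an invented helper, not part of the library.

```python
from typing import Any, Dict, List


class FakeDagKeyValue:
    """Hypothetical in-memory stand-in for the run-wide key-value storage."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def update(self, keys_values: Dict[str, Any]) -> None:
        self._data.update(keys_values)

    def get(self, keys: List[str]) -> Dict[str, Any]:
        # Missing keys are reported with the value None.
        return {key: self._data.get(key) for key in keys}


def get_with_defaults(kv, defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch the keys of `defaults`, replacing None results with the default."""
    found = kv.get(list(defaults))
    return {
        key: defaults[key] if found.get(key) is None else found[key]
        for key in defaults
    }


kv = FakeDagKeyValue()
kv.update({"threshold": 0.5})
settings = get_with_defaults(kv, {"threshold": 0.9, "retries": 3})
print(settings)
```

One consequence of this design is that a key explicitly absent from storage and a key whose lookup returned None cannot be told apart, so the helper treats both as "use the default".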
- async async_get(keys: List[str]) Dict[str, Any] #
Gets values by keys
Retrieves a slice of storage by keys. If a key is not found, the value will be set to None
- Parameters:
keys (list[str]) – list of keys
- Returns:
A dictionary of key-value pairs.
- Return type:
Dict[str, Any]
- get_all() Dict[str, Any] #
Gets all values
Retrieves the whole storage as a dictionary.
- Returns:
A dictionary of key-value pairs.
- Return type:
Dict[str, Any]
- async async_get_all() Dict[str, Any] #
Gets all values
Retrieves the whole storage as a dictionary.
- Returns:
A dictionary of key-value pairs.
- Return type:
Dict[str, Any]
- update(keys_values: Dict[str, Any]) None #
Sets values by keys
Accepts a dictionary of key-value pairs and sets them in storage. If a key already exists, it will be overwritten.
- Parameters:
keys_values (dict) – A dictionary of key-value pairs.
- async async_update(keys_values: Dict[str, Any]) None #
Sets values by keys
Accepts a dictionary of key-value pairs and sets them in storage. If a key already exists, it will be overwritten.
- Parameters:
keys_values (dict) – A dictionary of key-value pairs.
- clear() None #
Purges the storage
- async async_clear() None #
Purges the storage
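The async variants mirror the synchronous ones and can be combined with standard `asyncio` tools. The sketch below writes two values, awaits two independent reads concurrently with `asyncio.gather`, and then clears the storage. `FakeAsyncDagKeyValue` and `record_and_read` are hypothetical; the stand-in only copies the documented method names and signatures.

```python
import asyncio
from typing import Any, Dict, List


class FakeAsyncDagKeyValue:
    """Hypothetical in-memory stand-in exposing the documented async methods."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def async_update(self, keys_values: Dict[str, Any]) -> None:
        self._data.update(keys_values)

    async def async_get(self, keys: List[str]) -> Dict[str, Any]:
        return {key: self._data.get(key) for key in keys}

    async def async_get_all(self) -> Dict[str, Any]:
        return dict(self._data)

    async def async_clear(self) -> None:
        self._data.clear()


async def record_and_read(kv) -> Dict[str, Any]:
    await kv.async_update({"rows": 128, "status": "done"})
    # The two reads do not depend on each other, so they are awaited together.
    subset, everything = await asyncio.gather(
        kv.async_get(["rows", "missing"]),
        kv.async_get_all(),
    )
    await kv.async_clear()
    remaining = await kv.async_get_all()
    return {"subset": subset, "everything": everything, "remaining": remaining}


result = asyncio.run(record_and_read(FakeAsyncDagKeyValue()))
print(result["subset"])
```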
- class malevich.square.utils.S3Helper(client: Any, s3_bucket: str)#
Ready-made auxiliary wrapper for interacting with a custom S3 storage
- get_object(key: str, bucket: str | None = None) StreamingBody | None #
Uses get_object from the client with the given bucket and key
- Parameters:
key (str) – object storage key
bucket (Optional[str], optional) – object storage bucket, if set - override default. Defaults to None.
- Returns:
get_object response body
- Return type:
Optional[StreamingBody]
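Because the return type is optional, callers should handle None before reading the body. The sketch below shows one way to do that. `FakeS3Helper` is a hypothetical stand-in that returns an `io.BytesIO` in place of a `StreamingBody` (both offer `read()`), and `read_text_or_default` is an invented helper; under which conditions the real method returns None is not specified here.

```python
import io
from typing import Dict, Optional


class FakeS3Helper:
    """Hypothetical stand-in: serves objects from a dict instead of a bucket."""

    def __init__(self, objects: Dict[str, bytes]) -> None:
        self._objects = objects

    def get_object(self, key: str, bucket: Optional[str] = None):
        data = self._objects.get(key)
        # BytesIO stands in for StreamingBody; both expose read().
        return None if data is None else io.BytesIO(data)


def read_text_or_default(helper, key: str, default: str = "") -> str:
    """Return the object decoded as UTF-8, or `default` when no body is returned."""
    body = helper.get_object(key)
    if body is None:
        return default
    return body.read().decode("utf-8")


helper = FakeS3Helper({"greeting.txt": b"hello"})
present = read_text_or_default(helper, "greeting.txt")
absent = read_text_or_default(helper, "missing.txt", default="n/a")
print(present, absent)
```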
- get_df(key: str, bucket: str | None = None) DataFrame #
Uses the get_object result and casts it to a data frame
- Parameters:
key (str) – object storage key
bucket (Optional[str], optional) – object storage bucket, if set - override default. Defaults to None.
- Returns:
The object as a data frame
- Return type:
pd.DataFrame
- save_object(body: Any, key: str, bucket: str | None = None) None #
Uses put_object from the client with the given bucket, key and body
- Parameters:
body (Any) – data to save
key (str) – object storage key
bucket (Optional[str], optional) – object storage bucket, if set - override default. Defaults to None.
- save_df(df: DataFrame, key: str, bucket: str | None = None) None #
Uses save_object to save df by bucket and key
- Parameters:
df (pd.DataFrame) – data frame to save
key (str) – object storage key
bucket (Optional[str], optional) – object storage bucket, if set - override default. Defaults to None.
- delete_object(key: str, bucket: str | None = None) None #
Deletes object by bucket and key
- Parameters:
key (str) – object storage key
bucket (Optional[str], optional) – object storage bucket, if set - override default. Defaults to None.
- class malevich.square.utils.SmtpSender(login: str, password: str, smtp_server: str = 'smtp.gmail.com', smtp_port: int = 465)#
Ready-made auxiliary wrapper for interacting with SMTP
- Parameters:
login (str) – login
password (str) – password
smtp_server (str, optional) – smtp server. Defaults to “smtp.gmail.com”.
smtp_port (int, optional) – smtp port. Defaults to 465.
- send(receivers: list[str], subject: str, message: str) None #
Sends an email
- Parameters:
receivers (list[str]) – list of emails
subject (str) – message subject
message (str) – message text
- malevich.square.utils.to_df(x: Any, force: bool = False) DataFrame #
Creates a data frame from an arbitrary object
- torch.Tensor: Tensor is serialized using torch.save and then encoded using base112. Autograd information is preserved.
- numpy, list, tuple, range, bytearray: Data is serialized using pickle and stored as is in data column.
- set, frozenset: Data is converted to list and stored as is in data column.
- dict: Data is serialized using json and stored as is in data column.
- int, float, complex, str, bytes, bool: Data is stored as is in data column.
- Parameters:
x (Any) – Object to convert to data frame
force (bool, optional) – If set, it will ignore the type of the object and serialize it using pickle. Defaults to False.
- Returns:
Data frame with a single column data
- Return type:
pd.DataFrame
- malevich.square.utils.from_df(x: DataFrame, type_name: str | None = None, force: bool = False) Any #
Converts a data frame obtained by running to_df() back to an object
- Parameters:
x (pd.DataFrame) – Data frame to convert
type_name (Optional[str], optional) – Type of the object to convert to. If not specified, the type is inferred from the data frame. Defaults to None.
force (bool, optional) – If set, it will ignore the type of the object and deserialize it using pickle. Defaults to False.
- Returns:
Object of type type_name or of the inferred type
- Return type:
Any
- malevich.square.utils.to_binary(smth: Any) bytes #
Converts object to binary
- Parameters:
smth (Any) – object to convert
- malevich.square.utils.from_binary(smth: bytes) Any #
Converts binary to object
- Parameters:
smth (bytes) – binary to convert
- malevich.square.utils.APP_DIR = '/julius/apps'#
Working directory from which the app is run. Equivalent to os.getcwd() from within the app.
- malevich.square.utils.WORKDIR = '/julius'#
Directory into which the user code is copied during app construction.