malevich.square.utils#
- class malevich.square.utils.Context#
- Context contains all the necessary information about the run and the environment in which it is executed. It also provides a number of auxiliary functions for interacting with the environment, such as working with shared storage (share(), get_share_path(), delete_share()), dealing with common objects (common), and accessing the key-value storage (dag_key_value) and the object storage (object_storage).
- Usage#
- The Context object is used implicitly in apps. It may be requested by including an argument with an explicit type annotation in the function signature:
    from malevich.square import Context, processor

    @processor()
    def my_app(ctx: Context):
        pass
- Example usage#
- Here are some frequently used examples of context usage.
- class Context._ObjectStorage#
- A storage for binary objects common to the user.
- Works with cloud S3 storage and the app shared file system. Provides access to the objects shared with the Context.share() method and accessible with the Context.get_share_path() method.
- Several methods of the object storage take a local parameter. If it is set to True, the operation will be performed on the local file system. Otherwise, the operation will be performed on the remote object storage.
- The information might be unsynchronized between apps. To ensure that the information is synchronized, use the all_apps parameter. If it is set to True, the operation will be performed on all apps. Otherwise, the operation will be performed only on apps the method is called from.
- get_keys(local: bool = False, all_apps: bool = False) List[str]#
- Gets keys from the local mount or the remote object storage.
- Parameters:
- local (bool, optional) – Whether to use the local mount or the remote object storage. If set to True, the operation will be performed on the local file system. Otherwise, the operation will be performed on the remote object storage. Defaults to False.
- all_apps (bool, optional) – Whether to synchronize the operation between all apps. If set to True, the operation will be performed on all apps. Otherwise, the operation will be performed only on apps the method is called from. Defaults to False.
 
- Returns:
- Keys from local mount or remote object storage. 
- Return type:
- List[str] 
 
 - async async_get_keys(local: bool = False, all_apps: bool = False) List[str]#
- Gets keys from the local mount or the remote object storage.
- Parameters:
- local (bool, optional) – Whether to use the local mount or the remote object storage. If set to True, the operation will be performed on the local file system. Otherwise, the operation will be performed on the remote object storage. Defaults to False.
- all_apps (bool, optional) – Whether to synchronize the operation between all apps. If set to True, the operation will be performed on all apps. Otherwise, the operation will be performed only on apps the method is called from. Defaults to False.
 
- Returns:
- Keys from local mount or remote object storage. 
- Return type:
- List[str] 
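The async variant makes it possible to list the local mount and the remote storage concurrently. The following is a minimal sketch, assuming only a storage object that exposes async_get_keys with the signature documented above. FakeStorage and missing_locally are hypothetical names introduced so the example runs without a Malevich environment; the fake does not model the real storage.

```python
import asyncio
from typing import List


class FakeStorage:
    """Hypothetical in-memory stand-in for the object storage (illustration only)."""

    def __init__(self, local_keys: List[str], remote_keys: List[str]):
        self._local = list(local_keys)
        self._remote = list(remote_keys)

    async def async_get_keys(self, local: bool = False, all_apps: bool = False) -> List[str]:
        # The stand-in models a single app, so all_apps is ignored here.
        return list(self._local if local else self._remote)


async def missing_locally(storage) -> List[str]:
    """Return the remote keys that are not present in the local mount."""
    # Both listings are requested concurrently.
    local_keys, remote_keys = await asyncio.gather(
        storage.async_get_keys(local=True),
        storage.async_get_keys(local=False),
    )
    present = set(local_keys)
    return sorted(key for key in remote_keys if key not in present)


demo_missing = asyncio.run(
    missing_locally(FakeStorage(["a.bin"], ["a.bin", "b.bin", "c.bin"]))
)
```

Inside an app, the same helper could presumably be awaited with the context's object storage, and its result passed on to async_get().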
 
 - get(keys: List[str], force: bool = False, all_apps: bool = True) List[str]#
- Updates the mount for this app (or for all apps) and returns the keys for which the update succeeded.
- Parameters:
- keys (List[str]) – Keys by which values are obtained. A key whose value cannot be obtained is omitted from the result.
- force (bool, optional) – If set, ignores what is stored locally and loads the data from the remote object storage. Otherwise, loads only the keys that do not exist locally. Defaults to False.
- all_apps (bool, optional) – If set to true, the operation will be performed in all apps. Otherwise only for apps with associated mount. Defaults to True. 
 
- Returns:
- Keys by which it was possible to obtain the value and load it into the mount 
- Return type:
- List[str] 
 
 - async async_get(keys: List[str], force: bool = False, all_apps: bool = True) List[str]#
- Updates the mount for this app (or for all apps) and returns the keys for which the update succeeded.
- Parameters:
- keys (List[str]) – Keys by which values are obtained. A key whose value cannot be obtained is omitted from the result.
- force (bool, optional) – If set, ignores what is stored locally and loads the data from the remote object storage. Otherwise, loads only the keys that do not exist locally. Defaults to False.
- all_apps (bool, optional) – If set to true, the operation will be performed in all apps. Otherwise only for apps with associated mount. Defaults to True. 
 
- Returns:
- Keys by which it was possible to obtain the value and load it into the mount 
- Return type:
- List[str] 
 
 - get_all(local: bool = False, force: bool = False, all_apps: bool = True) List[str]#
- Updates the mount and returns all keys in it. If local is True, returns the result only for the mount (or for the mounts of all apps if all_apps is True); otherwise, loads everything from the remote object storage.
- Parameters:
- local (bool, optional) – Whether to use the local mount or the remote object storage. If set to True, the operation will be performed on the local file system. Otherwise, the operation will be performed on the remote object storage. Defaults to False.
- force (bool, optional) – If set, ignores what is stored locally and loads the data from the remote object storage. Otherwise, loads only the keys that do not exist locally. Defaults to False.
- all_apps (bool, optional) – If set to true, the operation will be performed in all apps. Otherwise only for apps with associated mount. Defaults to True. 
 
- Returns:
- If local is True, all keys in the mount (or in the mounts of all apps if all_apps is True); otherwise, all keys loaded from the remote object storage.
- Return type:
- List[str] 
 
 - async async_get_all(local: bool = False, force: bool = False, all_apps: bool = True) List[str]#
- Updates the mount and returns all keys in it. If local is True, returns the result only for the mount (or for the mounts of all apps if all_apps is True); otherwise, loads everything from the remote object storage.
- Parameters:
- local (bool, optional) – Whether to use the local mount or the remote object storage. If set to True, the operation will be performed on the local file system. Otherwise, the operation will be performed on the remote object storage. Defaults to False.
- force (bool, optional) – If set, ignores what is stored locally and loads the data from the remote object storage. Otherwise, loads only the keys that do not exist locally. Defaults to False.
- all_apps (bool, optional) – If set to true, the operation will be performed in all apps. Otherwise only for apps with associated mount. Defaults to True. 
 
- Returns:
- If local is True, all keys in the mount (or in the mounts of all apps if all_apps is True); otherwise, all keys loaded from the remote object storage.
- Return type:
- List[str] 
 
 - update(keys: List[str], presigned_expire: int | None = -1) Dict[str, str]#
- Updates objects in remote storage.
- Retrieves objects from the local mount and updates the remote object storage. If presigned_expire is set to a positive value, creates and returns presigned urls for the objects.
- If presigned_expire is None, the default timeout is used.
- If presigned_expire is negative, returns an empty dictionary.
- Parameters:
- keys (List[str]) – Keys to update
- presigned_expire (int, optional) – If positive, life span of presigned urls in seconds. If None, the default timeout is used. Defaults to -1.
 
- Returns:
- Mapping of keys to presigned urls 
- Return type:
- Dict[str, str] 
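The three documented cases of presigned_expire can be summarized in a few lines. This is only a sketch that restates the description above as a function; presigned_expire_mode is a hypothetical helper, not part of the library, and the behaviour for zero is not documented, so it is reported as such.

```python
from typing import Optional


def presigned_expire_mode(presigned_expire: Optional[int] = -1) -> str:
    """Describe what update() is documented to return for a given value."""
    if presigned_expire is None:
        # Documented: the default timeout is used.
        return "urls with the default timeout"
    if presigned_expire > 0:
        # Documented: life span of the presigned urls in seconds.
        return "urls with the given lifetime"
    if presigned_expire < 0:
        # Documented: an empty dictionary is returned (this is the default).
        return "no urls"
    # Zero is not covered by the description above.
    return "undocumented"
```

With the default of -1, update() therefore only synchronizes the objects and returns no urls.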
 
 - async async_update(keys: List[str], presigned_expire: int | None = -1) Dict[str, str]#
- Updates objects in remote storage.
- Retrieves objects from the local mount and updates the remote object storage. If presigned_expire is set to a positive value, creates and returns presigned urls for the objects.
- If presigned_expire is None, the default timeout is used.
- If presigned_expire is negative, returns an empty dictionary.
- Parameters:
- keys (List[str]) – Keys to update
- presigned_expire (int, optional) – If positive, life span of presigned urls in seconds. If None, the default timeout is used. Defaults to -1.
 
- Returns:
- Mapping of keys to presigned urls
- Return type:
- Dict[str, str] 
 
 - presigned(keys: List[str], expire: int | None = None) Dict[str, str]#
- Creates presigned urls for the specified keys.
- Parameters:
- keys (List[str]) – Keys to create presigned urls for
- expire (int, optional) – Life span of presigned urls in seconds (must be positive). If None, the default timeout is used. Defaults to None.
 
- Returns:
- Mapping of keys to presigned urls 
- Return type:
- Dict[str, str] 
 
 - async async_presigned(keys: List[str], expire: int | None = None) Dict[str, str]#
- Creates presigned urls for the specified keys.
- Parameters:
- keys (List[str]) – Keys to create presigned urls for
- expire (int, optional) – Life span of presigned urls in seconds (must be positive). If None, the default timeout is used. Defaults to None.
 
- Returns:
- Mapping of keys to presigned urls 
- Return type:
- Dict[str, str] 
 
 - delete(keys: List[str]) None#
- Deletes values in the mount and in remote storage.
- Parameters:
- keys (List[str]) – Keys to delete 
 
 - async async_delete(keys: List[str]) None#
- Deletes values in the mount and in remote storage.
- Parameters:
- keys (List[str]) – Keys to delete 
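The methods above are typically combined: load an object into the mount, produce a derived object, then push it to remote storage and obtain a link. The following is a sketch under stated assumptions. FakeObjectStorage, its mount and remote dictionaries, the example.invalid urls, and publish_uppercase are all hypothetical; in a real app the derived file would be written into the mounted directory, which this sketch does not model.

```python
from typing import Dict, List, Optional


class FakeObjectStorage:
    """Hypothetical stand-in that mimics the documented method signatures."""

    def __init__(self, remote: Dict[str, bytes]):
        self.remote = dict(remote)          # pretend remote object storage
        self.mount: Dict[str, bytes] = {}   # pretend local mount

    def get(self, keys: List[str], force: bool = False, all_apps: bool = True) -> List[str]:
        loaded = []
        for key in keys:
            if key not in self.remote:
                continue  # unobtainable keys are left out of the result
            if force or key not in self.mount:
                self.mount[key] = self.remote[key]
            loaded.append(key)
        return loaded

    def update(self, keys: List[str], presigned_expire: Optional[int] = -1) -> Dict[str, str]:
        for key in keys:
            self.remote[key] = self.mount[key]
        if presigned_expire is not None and presigned_expire < 0:
            return {}
        # Placeholder urls; real presigned urls come from the storage backend.
        return {key: f"https://example.invalid/{key}" for key in keys}


def publish_uppercase(storage, source_key: str, target_key: str) -> Dict[str, str]:
    """Load source_key, store an upper-cased copy as target_key, return its url."""
    if source_key not in storage.get([source_key]):
        return {}
    # Stand-in for writing a derived file into the mount.
    storage.mount[target_key] = storage.mount[source_key].upper()
    return storage.update([target_key], presigned_expire=600)
```

Checking the list returned by get() before using a key matters, because keys that could not be loaded are silently omitted.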
 
 
- class Context._DagKeyValue(run_id: str | None = None)#
- Simple key-value storage, shared by all apps of one run. Values must be bytes, string, int or float; dictionary order is not guaranteed.
- get_bytes(key: str) bytes#
- Gets a binary value by key more optimally.
- Consider using this to retrieve binary data in a more efficient way, but keep in mind that get() can also be used.
- Parameters:
- key (str) – key in storage 
- Returns:
- Value stored by key 
- Return type:
- bytes 
 
 - async async_get_bytes(key: str) bytes#
- Gets a binary value by key more optimally.
- Consider using this to retrieve binary data in a more efficient way, but keep in mind that get() can also be used.
- Parameters:
- key (str) – key in storage 
- Returns:
- Value stored by key 
- Return type:
- bytes 
 
 - get(keys: List[str]) Dict[str, Any]#
- Gets values by keys.
- Retrieves a slice of storage by keys. If a key is not found, its value will be set to None.
- Parameters:
- keys (list[str]) – list of keys 
- Returns:
- A dictionary of key-value pairs. 
- Return type:
- Dict[str, Any] 
 
 - async async_get(keys: List[str]) Dict[str, Any]#
- Gets values by keys.
- Retrieves a slice of storage by keys. If a key is not found, its value will be set to None.
- Parameters:
- keys (list[str]) – list of keys 
- Returns:
- A dictionary of key-value pairs. 
- Return type:
- Dict[str, Any] 
 
 - get_all() Dict[str, Any]#
- Gets all values.
- Retrieves the whole storage as a dictionary.
- Returns:
- A dictionary of key-value pairs. 
- Return type:
- Dict[str, Any] 
 
 - async async_get_all() Dict[str, Any]#
- Gets all values.
- Retrieves the whole storage as a dictionary.
- Returns:
- A dictionary of key-value pairs. 
- Return type:
- Dict[str, Any] 
 
 - update(keys_values: Dict[str, Any]) None#
- Sets values by keys.
- Accepts a dictionary of key-value pairs and sets them in storage. If a key already exists, it will be overwritten.
- Parameters:
- keys_values (dict) – A dictionary of key-value pairs. 
 
 - async async_update(keys_values: Dict[str, Any]) None#
- Sets values by keys.
- Accepts a dictionary of key-value pairs and sets them in storage. If a key already exists, it will be overwritten.
- Parameters:
- keys_values (dict) – A dictionary of key-value pairs. 
 
 - clear() None#
- Purges the storage 
 - async async_clear() None#
- Purges the storage 
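A common use of such a storage is a counter shared between the apps of a run. The sketch below relies only on the documented behaviour that get() returns None for a missing key and that update() overwrites existing keys. FakeKeyValue and bump_counter are hypothetical names, and the read-then-write sequence is not atomic, so concurrent apps could race.

```python
from typing import Any, Dict, List


class FakeKeyValue:
    """Hypothetical dict-backed stand-in with the documented method names."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, keys: List[str]) -> Dict[str, Any]:
        # Missing keys map to None, as described above.
        return {key: self._data.get(key) for key in keys}

    def get_all(self) -> Dict[str, Any]:
        return dict(self._data)

    def update(self, keys_values: Dict[str, Any]) -> None:
        self._data.update(keys_values)

    def clear(self) -> None:
        self._data.clear()


def bump_counter(kv, name: str) -> int:
    """Increment the integer stored under name, starting from zero."""
    current = kv.get([name])[name]
    new_value = (current or 0) + 1
    kv.update({name: new_value})
    return new_value
```

In an app, the same function could presumably be called with the context's dag_key_value attribute in place of the stand-in.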
 
- class malevich.square.utils.S3Helper(client: Any, s3_bucket: str)#
- Ready-made auxiliary wrapper for interacting with custom S3.
- get_object(key: str, bucket: str | None = None) StreamingBody | None#
- Uses get_object from the client by bucket and key.
- Parameters:
- key (str) – object storage key
- bucket (Optional[str], optional) – object storage bucket; if set, overrides the default. Defaults to None.
 
- Returns:
- get_object response body 
- Return type:
- Optional[StreamingBody] 
 
 - get_df(key: str, bucket: str | None = None) DataFrame#
- Uses the get_object result and casts it to a data frame.
- Parameters:
- key (str) – object storage key
- bucket (Optional[str], optional) – object storage bucket; if set, overrides the default. Defaults to None.
 
- Returns:
- The result as a data frame
- Return type:
- pd.DataFrame 
 
 - save_object(body: Any, key: str, bucket: str | None = None) None#
- Uses put_object from the client by bucket, key and body.
- Parameters:
- body (Any) – data to save
- key (str) – object storage key
- bucket (Optional[str], optional) – object storage bucket; if set, overrides the default. Defaults to None.
 
 
 - save_df(df: DataFrame, key: str, bucket: str | None = None) None#
- Uses save_object to save the data frame by bucket and key.
- Parameters:
- df (pd.DataFrame) – data frame to save
- key (str) – object storage key
- bucket (Optional[str], optional) – object storage bucket; if set, overrides the default. Defaults to None.
 
 
 - delete_object(key: str, bucket: str | None = None) None#
- Deletes an object by bucket and key.
- Parameters:
- key (str) – object storage key
- bucket (Optional[str], optional) – object storage bucket; if set, overrides the default. Defaults to None.
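Every method of the helper takes an optional bucket that overrides the default given at construction. The pattern can be sketched as follows. This is an illustrative re-creation, not the library's implementation: BucketDefaultingWrapper and RecordingClient are hypothetical, and the client is assumed to accept boto3-style keyword arguments (Bucket, Key, Body).

```python
from typing import Any, Dict, Optional, Tuple


class RecordingClient:
    """Hypothetical fake client using boto3-style keyword arguments."""

    def __init__(self) -> None:
        self.objects: Dict[Tuple[str, str], Any] = {}

    def put_object(self, Bucket: str, Key: str, Body: Any) -> None:
        self.objects[(Bucket, Key)] = Body

    def get_object(self, Bucket: str, Key: str) -> Dict[str, Any]:
        return {"Body": self.objects[(Bucket, Key)]}

    def delete_object(self, Bucket: str, Key: str) -> None:
        self.objects.pop((Bucket, Key), None)


class BucketDefaultingWrapper:
    """Sketch of the default-bucket pattern; not Malevich's S3Helper."""

    def __init__(self, client: Any, s3_bucket: str):
        self._client = client
        self._bucket = s3_bucket

    def _resolve(self, bucket: Optional[str]) -> str:
        # An explicit bucket wins; otherwise fall back to the default.
        return self._bucket if bucket is None else bucket

    def save_object(self, body: Any, key: str, bucket: Optional[str] = None) -> None:
        self._client.put_object(Bucket=self._resolve(bucket), Key=key, Body=body)

    def get_object(self, key: str, bucket: Optional[str] = None) -> Any:
        return self._client.get_object(Bucket=self._resolve(bucket), Key=key)["Body"]

    def delete_object(self, key: str, bucket: Optional[str] = None) -> None:
        self._client.delete_object(Bucket=self._resolve(bucket), Key=key)
```

Keeping the fallback in one private method means each public method stays a thin delegation to the client.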
 
 
 
- class malevich.square.utils.SmtpSender(login: str, password: str, smtp_server: str = 'smtp.gmail.com', smtp_port: int = 465)#
- Ready-made auxiliary wrapper for interacting with SMTP.
- Parameters:
- login (str) – login
- password (str) – password
- smtp_server (str, optional) – smtp server. Defaults to 'smtp.gmail.com'.
- smtp_port (int, optional) – smtp port. Defaults to 465.
- send(receivers: list[str], subject: str, message: str) None#
- Sends an email.
- Parameters:
- receivers (list[str]) – list of emails 
- subject (str) – message subject 
- message (str) – message text 
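For readers who want to see what such a wrapper roughly does, here is a sketch built on the standard library only. It is an assumption that an implicit-TLS connection (smtplib.SMTP_SSL) is appropriate, based on the default port 465; build_message and send_over_ssl are hypothetical helpers and do not describe how SmtpSender is implemented.

```python
import smtplib
from email.message import EmailMessage
from typing import List


def build_message(sender: str, receivers: List[str], subject: str, message: str) -> EmailMessage:
    """Assemble a plain-text email addressed to all receivers."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    msg["Subject"] = subject
    msg.set_content(message)
    return msg


def send_over_ssl(
    login: str,
    password: str,
    msg: EmailMessage,
    smtp_server: str = "smtp.gmail.com",
    smtp_port: int = 465,
) -> None:
    """Send a prepared message over an implicit-TLS SMTP connection."""
    with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
        server.login(login, password)
        server.send_message(msg)
```

Separating message construction from delivery keeps the first half testable without network access.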
 
 
 
- malevich.square.utils.to_df(x: Any, force: bool = False) DataFrame#
- Creates a data frame from an arbitrary object.
- torch.Tensor: Tensor is serialized using torch.save and then encoded using base64. Autograd information is preserved.
- numpy, list, tuple, range, bytearray: Data is serialized using pickle and stored as is in the data column.
- set, frozenset: Data is converted to a list and stored as is in the data column.
- dict: Data is serialized using json and stored as is in the data column.
- int, float, complex, str, bytes, bool: Data is stored as is in the data column.
- Parameters:
- x (Any) – Object to convert to data frame 
- force (bool, optional) – If set, it will ignore the type of the object and serialize it using pickle. Defaults to False. 
 
- Returns:
- Data frame with a single column, data
- Return type:
- pd.DataFrame 
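The per-type rules listed for to_df can be read as a dispatch on the input type. The sketch below restates them with the standard library only, producing the value that would go into the data column. encode_for_data_column is a hypothetical function, the torch and numpy cases are left out, and nothing here reflects what the library does beyond the description above.

```python
import json
import pickle
from typing import Any


def encode_for_data_column(x: Any, force: bool = False) -> Any:
    """Return the value to store in the data column for a supported object."""
    if force:
        # force ignores the type and always pickles.
        return pickle.dumps(x)
    if isinstance(x, (bool, int, float, complex, str, bytes)):
        return x  # stored as is
    if isinstance(x, dict):
        return json.dumps(x)
    if isinstance(x, (set, frozenset)):
        return list(x)
    if isinstance(x, (list, tuple, range, bytearray)):
        return pickle.dumps(x)
    raise TypeError(f"type {type(x).__name__} is not covered by this sketch")
```

A matching decoder would need to know which branch was taken, which is presumably why from_df accepts a type_name argument.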
 
- malevich.square.utils.from_df(x: DataFrame, type_name: str | None = None, force: bool = False) Any#
- Converts a data frame obtained by running to_df() back to an object.
- Parameters:
- x (pd.DataFrame) – Data frame to convert 
- type_name (Optional[str], optional) – Type of the object to convert to. If not specified, the type is inferred from the data frame. Defaults to None. 
- force (bool, optional) – If set, it will ignore the type of the object and deserialize it using pickle. Defaults to False. 
 
- Returns:
- Object of type type_name or of the inferred type
- Return type:
- Any 
 
- malevich.square.utils.to_binary(smth: Any) bytes#
- Converts an object to binary.
- Parameters:
- smth (Any) – object to convert
- malevich.square.utils.from_binary(smth: bytes) Any#
- Converts binary to an object.
- Parameters:
- smth (bytes) – binary to convert
- malevich.square.utils.APP_DIR = '/julius/apps'#
- Working directory from which the app is run. Equivalent to os.getcwd() from within the app.
- malevich.square.utils.WORKDIR = '/julius'#
- Directory into which the user code is copied during app construction.
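These constants are convenient for building absolute paths instead of relying on relative ones. The sketch below copies the documented values into local variables so it runs anywhere; in an app they would be imported from malevich.square.utils. The helper app_file and the file name used with it are hypothetical.

```python
import posixpath

# Values copied from the documentation above for illustration.
APP_DIR = "/julius/apps"
WORKDIR = "/julius"


def app_file(relative_path: str) -> str:
    """Build an absolute path to a file located under APP_DIR."""
    return posixpath.join(APP_DIR, relative_path)


example_path = app_file("config/settings.json")
```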