Skip to content

proxystore_ex.connectors.daos

DAOS Container connector implementation.

DAOSKey

Bases: NamedTuple

Key to object stored in a DAOS container.

Attributes:

  • pool (str) –

    DAOS pool the container belongs to.

  • container (str) –

    Name of the DAOS container the dictionary with the object is in.

  • namespace (str) –

    Name of the DAOS dictionary the object is in.

  • dict_key (str) –

    Unique key in the DAOS dictionary.

DAOSConnector

DAOSConnector(
    pool: str,
    container: str,
    namespace: str,
    clear: bool = False,
)

DAOS Container connector.

Learn more about DAOS in Python here.

Example

Assume we have a DAOS pool named "mypool." First, we create a new container in that pool named "kvstore" with type PYTHON.

$ daos cont create mypool --type=PYTHON --label=kvstore

Then we can create a connector.

from proxystore_ex.connectors.daos import DAOSConnector

with DAOSConnector('mypool', 'kvstore', 'mynamespace') as connector:
    key = connector.put('value')
    assert connector.get(key) == 'value'

Parameters:

  • pool (str) –

    DAOS pool label or UUID string.

  • container (str) –

    DAOS container label or UUID string.

  • namespace (str) –

    Name of the DAOS dictionary created in the DAOS container. All operations will be performed on this one dictionary so it can be thought of as a logically namespace separated from other applications interacting with this DAOS container.

  • clear (bool, default: False ) –

    Remove all keys from the DAOS dictionary named namespace when close() is called. This will delete keys regardless of if they were created by ProxyStore or not.

Source code in proxystore_ex/connectors/daos.py
def __init__(
    self,
    pool: str,
    container: str,
    namespace: str,
    clear: bool = False,
) -> None:
    self.pool = pool
    self.container = container
    self.namespace = namespace
    self.clear = clear

    self._container = pydaos.DCont(self.pool, self.container)
    try:
        self._dict = self._container.get(self.namespace)
    except pydaos.DObjNotFound:
        self._dict = self._container.dict(self.namespace)

close()

close(clear: bool | None = None) -> None

Close the connector and clean up.

Warning

Passing clear=True will result in ALL keys in the DAOS Dictionary being deleted regardless of if they were created by ProxyStore or not.

Parameters:

  • clear (bool | None, default: None ) –

    Remove all keys in the DAOS dictionary. Overrides the default value of clear provided when the DAOSConnector was instantiated.

Source code in proxystore_ex/connectors/daos.py
def close(self, clear: bool | None = None) -> None:
    """Close the connector and clean up.

    Warning:
        Passing `clear=True` will result in **ALL** keys in the DAOS
        Dictionary being deleted regardless of if they were created by
        ProxyStore or not.

    Args:
        clear: Remove all keys in the DAOS dictionary. Overrides the
            default value of `clear` provided when the
            [`DAOSConnector`][proxystore_ex.connectors.daos.DAOSConnector]
            was instantiated.
    """
    if self.clear if clear is None else clear:
        for key in list(self._dict):
            del self._dict[key]

config()

config() -> dict[str, Any]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.

Source code in proxystore_ex/connectors/daos.py
def config(self) -> dict[str, Any]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    return {
        'pool': self.pool,
        'container': self.container,
        'namespace': self.namespace,
        'clear': self.clear,
    }

from_config() classmethod

from_config(config: dict[str, Any]) -> DAOSConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, Any]) –

    Configuration returned by .config().

Source code in proxystore_ex/connectors/daos.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> DAOSConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    return cls(**config)

evict()

evict(key: DAOSKey) -> None

Evict the object associated with the key.

Parameters:

  • key (DAOSKey) –

    Key associated with object to evict.

Source code in proxystore_ex/connectors/daos.py
def evict(self, key: DAOSKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    self._validate_key(key)
    self._dict.pop(key.dict_key)

exists()

exists(key: DAOSKey) -> bool

Check if an object associated with the key exists.

Parameters:

  • key (DAOSKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore_ex/connectors/daos.py
def exists(self, key: DAOSKey) -> bool:
    """Check if an object associated with the key exists.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    self._validate_key(key)
    return key.dict_key in self._dict

get()

get(key: DAOSKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (DAOSKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore_ex/connectors/daos.py
def get(self, key: DAOSKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    self._validate_key(key)
    try:
        return self._dict.get(key.dict_key)
    except KeyError:
        return None

get_batch()

get_batch(keys: Sequence[DAOSKey]) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

  • keys (Sequence[DAOSKey]) –

    Sequence of keys associated with objects to retrieve.

Returns:

  • list[bytes | None]

    List with same order as keys with the serialized objects or None if the corresponding key does not have an associated object.

Source code in proxystore_ex/connectors/daos.py
def get_batch(self, keys: Sequence[DAOSKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    # Note: using DDict.bget() would be more efficient, but it will
    # error if any key is missing. So to maintain the semantics of
    # this method, we have to try each key individually.
    objs: list[bytes | None] = []
    for key in keys:
        self._validate_key(key)
        objs.append(self.get(key))
    return objs

put()

put(obj: bytes) -> DAOSKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

Returns:

  • DAOSKey

    Key which can be used to retrieve the object.

Source code in proxystore_ex/connectors/daos.py
def put(self, obj: bytes) -> DAOSKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.

    Returns:
        Key which can be used to retrieve the object.
    """
    key = DAOSKey(
        pool=self.pool,
        container=self.container,
        namespace=self.namespace,
        dict_key=str(uuid.uuid4()),
    )
    self._dict.put(key.dict_key, obj)
    return key

put_batch()

put_batch(objs: Sequence[bytes]) -> list[DAOSKey]

Put a batch of serialized objects in the store.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

Returns:

  • list[DAOSKey]

    List of keys with the same order as objs which can be used to retrieve the objects.

Source code in proxystore_ex/connectors/daos.py
def put_batch(self, objs: Sequence[bytes]) -> list[DAOSKey]:
    """Put a batch of serialized objects in the store.

    Args:
        objs: Sequence of serialized objects to put in the store.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.
    """
    keys = [
        DAOSKey(
            pool=self.pool,
            container=self.container,
            namespace=self.namespace,
            dict_key=str(uuid.uuid4()),
        )
        for _ in objs
    ]
    self._dict.bput({key.dict_key: obj for key, obj in zip(keys, objs)})
    return keys