Source code for ribs.archives._density_archive

"""Contains the DensityArchive."""

from __future__ import annotations

from typing import Literal

import numpy as np
from numpy.typing import ArrayLike, DTypeLike
from scipy.spatial.distance import cdist
from sklearn.neighbors import KernelDensity

from ribs._utils import arr_readonly, check_batch_shape, check_finite
from ribs.archives._archive_base import ArchiveBase
from ribs.archives._utils import parse_all_dtypes
from ribs.typing import BatchData, Float, Int

try:
    import torch
    import zuko

    IS_ZUKO_AVAILABLE = True
except ImportError:
    IS_ZUKO_AVAILABLE = False

ZUKO_IMPORT_ERROR = (
    'When density_method="cnf" in DensityArchive, PyTorch and Zuko must be installed.'
)


def gkern(x: np.ndarray) -> np.ndarray:
    """Gaussian kernel."""
    gauss = np.exp(-0.5 * np.square(x))
    return gauss / np.sqrt(2 * np.pi)


def gaussian_kde_measures(
    measures: np.ndarray, buffer: np.ndarray, h: float
) -> np.ndarray:
    """Evaluates kernel density estimation with a Gaussian kernel.

    The density is defined as zero if the buffer is empty.

    Args:
        measures: (measures_batch_size, measure_dim) array of points at which to
            estimate density.
        buffer: (buffer_batch_size, measure_dim) batch of measures that parameterize the
            KDE.
        h: Kernel bandwidth.

    Returns:
        Evaluation of KDE(m).
    """
    if buffer.shape[0] == 0:
        return np.zeros(measures.shape[0], dtype=measures.dtype)

    # (measures_batch_size, buffer_batch_size)
    norms = cdist(measures, buffer) / h

    # (measures_batch_size,)
    t = np.sum(gkern(norms), axis=1)

    return t / (buffer.shape[0] * h)


class _CNFDensityEstimator:
    """Continuous normalizing flow density estimator for DDS-CNF.

    This is a small, self-contained helper that owns a :class:`zuko.flows.CNF` and an
    :class:`torch.optim.Adam` optimizer, and knows how to incrementally fine-tune the
    flow on a feature buffer via maximum likelihood. It is intentionally not exposed in
    the public API -- construct one implicitly by passing ``density_method="cnf"`` to
    :class:`DensityArchive`.

    The flow is built lazily on first training so that importing this module does not
    require torch or zuko to be installed.

    Args:
        measure_dim: Dimensionality of the feature space that the CNF models.
        lr: Learning rate for the Adam optimizer used during fine-tuning.
        train_steps: Number of gradient steps taken every time the estimator is asked to
            refit on a new buffer.
        batch_size: Mini-batch size used for stochastic gradient steps. If the buffer is
            smaller than ``batch_size``, the entire buffer is used as the batch.
        min_buffer_size: Minimum number of points in the buffer before the flow is
            trained at all. Calls before this threshold is reached are no-ops and
            density queries return zeros.
        device: Torch device on which the CNF lives. Strings like ``"cpu"``, ``"cuda"``,
            or an instance of :class:`torch.device` are accepted.
        seed: Seed for the torch random number generator controlling the CNF's weight
            initialization and any stochastic training steps.
        cnf_kwargs: Additional keyword arguments forwarded to :class:`zuko.flows.CNF`.
            ``features`` is set automatically from ``measure_dim`` and cannot be
            overridden. Defaults are chosen to match the small-MLP architecture used in
            the DDS paper experiments (``hidden_features=(64, 64)``).
    """

    def __init__(
        self,
        *,
        measure_dim: int,
        lr: float,
        train_steps: int,
        batch_size: int,
        min_buffer_size: int,
        device: torch.device,
        seed: int | None,
        cnf_kwargs: dict | None,
    ) -> None:
        if not IS_ZUKO_AVAILABLE:
            raise ImportError(ZUKO_IMPORT_ERROR)

        self._measure_dim = int(measure_dim)
        self._lr = float(lr)
        self._train_steps = int(train_steps)
        self._batch_size = int(batch_size)
        self._min_buffer_size = int(min_buffer_size)
        self._device = (
            torch.device(device) if not isinstance(device, torch.device) else device
        )

        # Disallow passing features via cnf_kwargs since it is determined by the
        # archive's measure_dim and would silently produce a shape mismatch.
        user_kwargs = dict(cnf_kwargs or {})
        if "features" in user_kwargs:
            raise ValueError(
                "`features` cannot be provided in cnf_kwargs; it is set "
                "automatically from measure_dim."
            )
        # Paper does not specify a precise CNF architecture. We pick a small
        # MLP that works well across the sphere/arm benchmarks in the DDS
        # experiments and let users override via cnf_kwargs.
        defaults = {"hidden_features": (64, 64)}
        defaults.update(user_kwargs)
        self._cnf_kwargs = defaults

        # zuko seeds its weight init from torch's global generator. Drop a
        # local generator here so that reproducible runs in pyribs don't leak
        # into the global torch state.
        self._generator = torch.Generator(device="cpu")
        if seed is not None:
            self._generator.manual_seed(int(seed))

        # Seed weight init from our local generator to avoid polluting the
        # global torch RNG state used by the rest of the user's program.
        state = torch.random.get_rng_state()
        torch.random.manual_seed(int(self._generator.initial_seed()))
        self._flow = zuko.flows.CNF(features=self._measure_dim, **self._cnf_kwargs).to(
            self._device
        )
        torch.random.set_rng_state(state)
        self._optimizer = torch.optim.Adam(self._flow.parameters(), lr=self._lr)
        self._fitted: bool = False

    @property
    def fitted(self) -> bool:
        """Whether the flow has been trained on any data yet."""
        return self._fitted

    def fit(self, buffer: np.ndarray) -> None:
        """Fine-tune the flow on the current feature buffer.

        This takes ``train_steps`` Adam steps, each on a random mini-batch of
        ``batch_size`` rows drawn (with replacement) from ``buffer``. If the
        buffer is smaller than ``min_buffer_size``, the call is a no-op --
        the flow remains untrained and :meth:`log_density` returns zeros.

        Args:
            buffer: ``(n, measure_dim)`` array of features to fit on.
        """
        if buffer.shape[0] < self._min_buffer_size:
            return

        data = torch.asarray(
            buffer, copy=True, dtype=torch.float32, device=self._device
        )
        n = data.shape[0]
        effective_batch = min(self._batch_size, n)

        self._flow.train()
        for _ in range(self._train_steps):
            # Sample with replacement so that very small buffers still train.
            idx = torch.randint(0, n, (effective_batch,), generator=self._generator).to(
                self._device
            )
            batch = data[idx]
            loss = -self._flow().log_prob(batch).mean()
            self._optimizer.zero_grad()
            loss.backward()
            self._optimizer.step()

        self._fitted = True

    def log_density(self, measures: np.ndarray) -> np.ndarray:
        """Evaluate log-density of the flow at a batch of measure-space points.

        If the flow has never been trained, returns zeros.

        Args:
            measures: ``(batch_size, measure_dim)`` array.

        Returns:
            ``(batch_size,)`` array of log-density values in float64.
        """
        if not self._fitted:
            return np.zeros(measures.shape[0], dtype=np.float64)

        data = torch.as_tensor(measures, dtype=torch.float32, device=self._device)
        self._flow.eval()
        with torch.no_grad():
            log_prob = self._flow().log_prob(data)
        return log_prob.detach().cpu().numpy().astype(np.float64)


# Developer Note: The documentation for this class is hacked. To list new methods,
# manually modify the template in docs/_templates/autosummary/class.rst


[docs] class DensityArchive(ArchiveBase): """An archive that models the density of solutions in measure space. This archive originates in Density Descent Search in `Lee 2024 <https://dl.acm.org/doi/10.1145/3638529.3654001>`_. It maintains a buffer of measures, and using that buffer, it builds a density estimator such as a `KDE <https://en.wikipedia.org/wiki/Kernel_density_estimation>`_. The density estimator indicates which areas of measure space have, for instance, a high density of solutions -- to improve exploration, an algorithm would need to target areas with a low density of solutions. Incoming solutions are added to the buffer with `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_, specifically as described in `Li 1994 <https://dl.acm.org/doi/abs/10.1145/198429.198435>`_. Reservoir sampling enables sampling uniformly from the incoming stream of solutions generated by the emitters. Unlike other archives, this archive does not store any elites, and as such, most methods from :class:`ArchiveBase` are not implemented. Rather, it is assumed that a separate ``result_archive`` (see :class:`~ribs.schedulers.Scheduler`) will store solutions when using this archive. .. note:: When ``density_method="cnf"``, this class requires `PyTorch <https://pytorch.org/>`_ and `Zuko <https://zuko.readthedocs.io/>`_ to be installed, e.g., by running ``pip install torch zuko>=1.0.0``. .. note:: For DDS-CNF, the default ``cnf`` parameters, particularly ``cnf_kwargs``, ``cnf_train_steps``, and ``cnf_batch_size``, are tuned to provide reasonable performance on the Sphere example (:doc:`/examples/sphere`). Further tuning may be needed for new domains. Additionally, these parameters are intended to reduce the computational cost of DDS-CNF when compared to the original parameters in `Lee 2024 <https://dl.acm.org/doi/10.1145/3638529.3654001>`_. See :pr:`707` for more info. Args: measure_dim: Dimension of the measure space. buffer_size: Size of the buffer of measures. density_method: Method for computing density. Supports ``"kde"`` (KDE -- kernel density estimator), ``"kde_sklearn"`` (KDE using :class:`sklearn.neighbors.KernelDensity`), and ``"cnf"`` (continuous normalizing flow, i.e., DDS-CNF from `Lee 2024 <https://arxiv.org/abs/2312.11331>`_). When ``"kde_sklearn"`` is used, this archive computes *log density* rather than density; see :meth:`sklearn.neighbors.KernelDensity.score_samples`. When ``"cnf"`` is used, this archive also returns *log density* since that is what the flow models directly. ``"cnf"`` requires installing torch and zuko. bandwidth: Bandwidth when using ``kde`` or ``kde_sklearn`` as the ``density_method``. sklearn_kwargs: kwargs for :class:`sklearn.neighbors.KernelDensity` when using ``"kde_sklearn"`` as the ``density_method``. Note that bandwidth is already passed in via the ``bandwidth`` parameter above. cnf_kwargs: Additional keyword arguments forwarded to :class:`zuko.flows.continuous.CNF` when ``density_method="cnf"``. ``features`` is set automatically from ``measure_dim`` and cannot be overridden. Defaults to ``{"hidden_features": (64, 64)}``. cnf_lr: Adam learning rate used to fine-tune the CNF during each call to :meth:`add` when ``density_method="cnf"``. cnf_train_steps: Number of Adam steps taken every time the CNF is fine-tuned on the buffer. cnf_batch_size: Mini-batch size used when fine-tuning the CNF. If the buffer has fewer points, the entire buffer is used. cnf_min_buffer_size: Minimum number of points in the buffer before the CNF is trained. Before this threshold, the flow stays untrained and density queries return zeros. cnf_device: Torch device on which the CNF lives when ``density_method="cnf"``. seed: Value to seed the random number generator. Set to None to avoid a fixed seed. measures_dtype: Data type of the measures. Defaults to float64 (numpy's default floating point type). dtype: Alternative for providing data type of the measures. Included for API compatibility. Cannot be used at the same time as ``measures_dtype``. Raises: ValueError: Unknown ``density_method`` provided. ImportError: ``density_method="cnf"`` is requested but torch or zuko is not installed. """ def __init__( self, *, measure_dim: Int, buffer_size: Int = 10000, density_method: Literal["kde", "kde_sklearn", "cnf"] = "kde", bandwidth: Float | None = None, sklearn_kwargs: dict | None = None, cnf_kwargs: dict | None = None, cnf_lr: Float = 1e-3, cnf_train_steps: Int = 5, cnf_batch_size: Int = 32, cnf_min_buffer_size: Int = 128, cnf_device: torch.device = "cpu", seed: Int | None = None, measures_dtype: DTypeLike = None, dtype: DTypeLike = None, ) -> None: self._rng = np.random.default_rng(seed) _, _, self._measures_dtype = parse_all_dtypes( dtype, None, None, measures_dtype, np ) ArchiveBase.__init__( self, solution_dim=0, objective_dim=(), measure_dim=measure_dim, ) # Buffer for storing the measures. self._buffer = np.empty((buffer_size, measure_dim), dtype=self._measures_dtype) # Number of occupied entries in the buffer. self._n_occupied = 0 # Acceptance threshold for the buffer. self._w = np.exp(np.log(self._rng.uniform()) / buffer_size) # Number of solutions to skip. self._n_skip = int(np.log(self._rng.uniform()) / np.log(1 - self._w)) # Set up density estimator. self._density_method = density_method self._cnf_estimator: _CNFDensityEstimator | None = None if self._density_method == "kde": # Kernel density estimation. self._bandwidth = bandwidth elif self._density_method == "kde_sklearn": self._bandwidth = bandwidth self._sklearn_kwargs = ( {} if sklearn_kwargs is None else sklearn_kwargs.copy() ) elif self._density_method == "cnf": # Continuous normalizing flow density estimator (DDS-CNF, Lee et al. 2024). # The estimator begins untrained, and the flow is fitted at the end of each # add() call once the buffer has at least `cnf_min_buffer_size` points. if not IS_ZUKO_AVAILABLE: raise ImportError(ZUKO_IMPORT_ERROR) self._cnf_estimator = _CNFDensityEstimator( measure_dim=self.measure_dim, lr=float(cnf_lr), train_steps=int(cnf_train_steps), batch_size=int(cnf_batch_size), min_buffer_size=int(cnf_min_buffer_size), device=cnf_device, seed=seed, cnf_kwargs=cnf_kwargs, ) else: raise ValueError(f"Unknown density_method '{self._density_method}'") ## Properties inherited from ArchiveBase ## # Necessary to implement this since `Scheduler` calls it. @property def empty(self) -> bool: """Whether the archive is empty; always ``False``. Since the archive does not store elites, we always mark it as not empty. """ return False ## Properties that are not in ArchiveBase ## @property def buffer(self) -> np.ndarray: """Buffer of measures considered in the density estimator. Shape (n, :attr:`measure_dim`). """ return arr_readonly(self._buffer[: self._n_occupied]) ## Utilities ##
[docs] def compute_density(self, measures: ArrayLike) -> np.ndarray: """Computes density at the given points in measure space. Args: measures: (batch_size, :attr:`measure_dim`) array with measure space coordinates of all the solutions. Returns: ``(batch_size,)`` array of density values of the input solutions. """ measures = np.asarray(measures, dtype=self._measures_dtype) if self._density_method == "kde": # Use self.buffer instead of self._buffer since self.buffer only contains # the valid entries of the buffer. return gaussian_kde_measures( measures, self.buffer, self._bandwidth, ).astype(self._measures_dtype) elif self._density_method == "kde_sklearn": if self.buffer.shape[0] == 0: return np.zeros(measures.shape[0], dtype=measures.dtype) # Note that this is log density with some normalization too. kde = KernelDensity( bandwidth=self._bandwidth, **self._sklearn_kwargs, ).fit(self.buffer) return kde.score_samples(measures).astype(self._measures_dtype) elif self._density_method == "cnf": # The CNF returns log-density (matching kde_sklearn's semantics). Before the # estimator has been fit, i.e., during the first add() call or before the # buffer crosses `cnf_min_buffer_size`, this returns zeros, mirroring how # KDE returns zeros on an empty buffer. if self._cnf_estimator is None: raise RuntimeError( 'cnf_estimator should not be None when density_method="cnf"' ) return self._cnf_estimator.log_density(measures).astype( self._measures_dtype ) else: raise ValueError(f"Unknown density_method '{self._density_method}'")
## Methods for writing to the archive ##
[docs] def add( self, solution: ArrayLike | None, objective: ArrayLike | None, measures: ArrayLike, **fields: ArrayLike | None, ) -> BatchData: """Adds measures to the buffer and updates the density estimator if necessary. The measures are added to the buffer with reservoir sampling to enable sampling uniformly from the incoming solutions. Args: solution: Included for API consistency. Any value is ignored. objective: Included for API consistency. Any value is ignored. measures: (batch_size, :attr:`measure_dim`) array with measure space coordinates of all the solutions. fields: Included for API consistency. Any value is ignored. Returns: Information describing the result of the add operation. The dict contains the following keys: - ``"status"`` (:class:`numpy.ndarray` of :class:`np.int32`): An array of integers that represent the "status" obtained when attempting to insert each solution in the batch. Since this archive does not store any elites, all statuses are set to 2 (which normally indicates the solution discovered a new cell in the archive -- see :class:`AddStatus`). - ``"density"`` (:class:`numpy.ndarray` of the dtype passed in at init): The density values of the measure passed in, before the buffer or density estimator was updated. Note that when ``"kde_sklearn"`` or ``"cnf"`` is used as the ``density_method``, *log density* is computed rather than density. For more info, see :meth:`sklearn.neighbors.KernelDensity.score_samples` for ``"kde_sklearn"`` and the class-level docstring for ``"cnf"``. Raises: ValueError: The array arguments do not match their specified shapes. ValueError: ``measures`` has non-finite values (inf or NaN). """ measures = np.asarray(measures, dtype=self._measures_dtype) check_batch_shape(measures, "measures", self.measure_dim, "measure_dim", "") check_finite(measures, "measures") batch_size = len(measures) buffer_size = len(self._buffer) add_info = { # Make all statuses be 2 as a placeholder value. "status": np.full(batch_size, 2, dtype=np.int32), # Note that density should be computed _before_ updating the buffer or # density estimator. "density": self.compute_density(measures), } # Add to the buffer using reservoir sampling as in Li 1994 # (https://dl.acm.org/doi/pdf/10.1145/198429.198435). # First, fill the buffer if there are any slots available. n_fill = 0 if buffer_size > self._n_occupied: n_fill = min(buffer_size - self._n_occupied, batch_size) self._buffer[self._n_occupied : self._n_occupied + n_fill] = measures[ :n_fill ] remaining_measures = measures[n_fill:] self._n_occupied += n_fill else: remaining_measures = measures # Replace measures in the buffer using reservoir sampling. n_remaining = remaining_measures.shape[0] while n_remaining > 0: # Done with skipping, replace measures. if self._n_skip < n_remaining: replace = self._rng.integers(buffer_size) self._buffer[replace] = remaining_measures[self._n_skip] self._w *= np.exp(np.log(self._rng.uniform()) / buffer_size) self._n_skip = int(np.log(self._rng.uniform()) / np.log(1 - self._w)) skip = min(self._n_skip, n_remaining) n_remaining -= skip self._n_skip -= skip # For DDS-CNF, Algorithm 1 line 12 of Lee et al. 2024 calls for the density # estimator to be refit on the updated buffer at the end of every iteration. We # fine-tune the CNF here so that the next compute_density() call uses an # up-to-date flow. KDE methods do not need this step because they are # non-parametric. if self._density_method == "cnf": if self._cnf_estimator is None: raise RuntimeError( 'cnf_estimator should not be None when density_method="cnf"' ) self._cnf_estimator.fit(self.buffer) return add_info