"""Contains the DensityArchive."""
from __future__ import annotations
from typing import Literal
import numpy as np
from numpy.typing import ArrayLike, DTypeLike
from scipy.spatial.distance import cdist
from sklearn.neighbors import KernelDensity
from ribs._utils import arr_readonly, check_batch_shape, check_finite
from ribs.archives._archive_base import ArchiveBase
from ribs.archives._utils import parse_all_dtypes
from ribs.typing import BatchData, Float, Int
def gkern(x: np.ndarray) -> np.ndarray:
    """Evaluates the standard Gaussian kernel elementwise.

    Computes ``exp(-x**2 / 2) / sqrt(2 * pi)`` for every entry of ``x``.

    Args:
        x: Array of kernel arguments (any shape).

    Returns:
        Array with the same shape as ``x`` holding the kernel values.
    """
    normalizer = np.sqrt(2 * np.pi)
    unnormalized = np.exp(-0.5 * np.square(x))
    return unnormalized / normalizer
def gaussian_kde_measures(
    measures: np.ndarray, buffer: np.ndarray, h: float
) -> np.ndarray:
    """Evaluates a Gaussian kernel density estimate at a batch of points.

    For each query point, the Gaussian kernel is applied to the Euclidean distance
    (scaled by ``1 / h``) to every point in ``buffer``; the kernel values are summed
    and divided by ``buffer_batch_size * h``. When ``buffer`` holds no points, the
    density is defined to be zero everywhere.

    Args:
        measures: (measures_batch_size, measure_dim) array of points at which to
            estimate density.
        buffer: (buffer_batch_size, measure_dim) batch of measures that parameterize
            the KDE.
        h: Kernel bandwidth.

    Returns:
        (measures_batch_size,) array with the KDE evaluated at each row of
        ``measures``.
    """
    n_buffer = buffer.shape[0]
    n_queries = measures.shape[0]

    # Guard clause: an empty buffer yields zero density, in the dtype of the queries.
    if n_buffer == 0:
        return np.zeros(n_queries, dtype=measures.dtype)

    # (measures_batch_size, buffer_batch_size) pairwise distances in bandwidth units.
    scaled_dists = cdist(measures, buffer) / h

    # (measures_batch_size,) total kernel mass contributed by the buffer.
    kernel_sums = np.sum(gkern(scaled_dists), axis=1)

    return kernel_sums / (n_buffer * h)
# Developer Note: The documentation for this class is hacked. To list new methods,
# manually modify the template in docs/_templates/autosummary/class.rst
class DensityArchive(ArchiveBase):
    """An archive that models the density of solutions in measure space.

    This archive originates in Density Descent Search in `Lee 2024
    <https://dl.acm.org/doi/10.1145/3638529.3654001>`_. It maintains a buffer of
    measures, and using that buffer, it builds a density estimator such as a KDE. The
    density estimator indicates which areas of measure space have, for instance, a high
    density of solutions -- to improve exploration, an algorithm would need to target
    areas with a low density of solutions.

    Incoming solutions are added to the buffer with `reservoir sampling
    <https://en.wikipedia.org/wiki/Reservoir_sampling>`_, specifically as described in
    `Li 1994 <https://dl.acm.org/doi/abs/10.1145/198429.198435>`_. Reservoir sampling
    enables sampling uniformly from the incoming stream of solutions generated by the
    emitters.

    Unlike other archives, this archive does not store any elites, and as such, most
    methods from :class:`ArchiveBase` are not implemented. Rather, it is assumed that a
    separate ``result_archive`` (see :class:`~ribs.schedulers.Scheduler`) will store
    solutions when using this archive.

    Args:
        measure_dim: Dimension of the measure space.
        buffer_size: Size of the buffer of measures. Must be at least 1.
        density_method: Method for computing density. Currently supports ``"kde"`` (KDE
            -- kernel density estimator), ``"kde_sklearn"`` (KDE using
            :class:`sklearn.neighbors.KernelDensity`). Note that when ``"kde_sklearn"``
            is used, this archive computes *log density*; see
            :meth:`sklearn.neighbors.KernelDensity.score_samples` for more info.
        bandwidth: Bandwidth when using ``kde`` or ``kde_sklearn`` as the
            ``density_method``.
        sklearn_kwargs: kwargs for :class:`sklearn.neighbors.KernelDensity` when using
            ``"kde_sklearn"`` as the ``density_method``. Note that bandwidth is already
            passed in via the ``bandwidth`` parameter above.
        seed: Value to seed the random number generator. Set to None to avoid a fixed
            seed.
        measures_dtype: Data type of the measures. Defaults to float64 (numpy's default
            floating point type).
        dtype: Alternative for providing data type of the measures. Included for API
            compatibility. Cannot be used at the same time as ``measures_dtype``.

    Raises:
        ValueError: ``buffer_size`` is less than 1.
        ValueError: Unknown ``density_method`` provided.
    """

    def __init__(
        self,
        *,
        measure_dim: Int,
        buffer_size: Int = 10000,
        density_method: Literal["kde", "kde_sklearn"] = "kde",
        bandwidth: Float | None = None,
        sklearn_kwargs: dict | None = None,
        seed: Int | None = None,
        measures_dtype: DTypeLike = None,
        dtype: DTypeLike = None,
    ) -> None:
        # The reservoir sampling state below divides by buffer_size, so an empty
        # buffer would otherwise surface as an obscure OverflowError from int(-inf).
        if buffer_size < 1:
            raise ValueError(f"buffer_size must be at least 1 but got {buffer_size}")

        self._rng = np.random.default_rng(seed)

        _, _, self._measures_dtype = parse_all_dtypes(
            dtype, None, None, measures_dtype, np
        )

        ArchiveBase.__init__(
            self,
            solution_dim=0,
            objective_dim=(),
            measure_dim=measure_dim,
        )

        # Buffer for storing the measures.
        self._buffer = np.empty((buffer_size, measure_dim), dtype=self._measures_dtype)
        # Number of occupied entries in the buffer.
        self._n_occupied = 0
        # Acceptance threshold for the buffer.
        self._w = np.exp(np.log(self._rng.uniform()) / buffer_size)
        # Number of solutions to skip before the next replacement in the buffer.
        self._n_skip = int(np.log(self._rng.uniform()) / np.log(1 - self._w))

        # Set up density estimator.
        self._density_method = density_method
        if self._density_method == "kde":
            # Kernel density estimation
            self._bandwidth = bandwidth
        elif self._density_method == "kde_sklearn":
            self._bandwidth = bandwidth
            # Copy so that later mutation of the caller's dict does not affect us.
            self._sklearn_kwargs = (
                {} if sklearn_kwargs is None else sklearn_kwargs.copy()
            )
        else:
            raise ValueError(f"Unknown density_method '{self._density_method}'")

    ## Properties inherited from ArchiveBase ##

    # Necessary to implement this since `Scheduler` calls it.
    @property
    def empty(self) -> bool:
        """Whether the archive is empty; always ``False``.

        Since the archive does not store elites, we always mark it as not empty.
        """
        return False

    ## Properties that are not in ArchiveBase ##

    @property
    def buffer(self) -> np.ndarray:
        """Buffer of measures considered in the density estimator.

        Only the occupied entries are included. Shape (n, :attr:`measure_dim`).
        """
        return arr_readonly(self._buffer[: self._n_occupied])

    ## Utilities ##

    def compute_density(self, measures: ArrayLike) -> np.ndarray:
        """Computes density at the given points in measure space.

        The density is zero everywhere while the buffer is empty. Note that when
        ``"kde_sklearn"`` is the ``density_method``, the values returned for a
        non-empty buffer are *log densities*.

        Args:
            measures: (batch_size, :attr:`measure_dim`) array with measure space
                coordinates of all the solutions.

        Returns:
            ``(batch_size,)`` array of density values of the input solutions.

        Raises:
            ValueError: The archive holds an unknown ``density_method``.
        """
        measures = np.asarray(measures, dtype=self._measures_dtype)

        if self._density_method == "kde":
            # Use self.buffer instead of self._buffer since self.buffer only contains
            # the valid entries of the buffer.
            return gaussian_kde_measures(
                measures,
                self.buffer,
                self._bandwidth,
            ).astype(self._measures_dtype)
        elif self._density_method == "kde_sklearn":
            # The estimator is only fit when the buffer has entries.
            if self.buffer.shape[0] == 0:
                return np.zeros(measures.shape[0], dtype=measures.dtype)

            # Note that this is log density with some normalization too.
            kde = KernelDensity(
                bandwidth=self._bandwidth,
                **self._sklearn_kwargs,
            ).fit(self.buffer)
            return kde.score_samples(measures).astype(self._measures_dtype)
        else:
            raise ValueError(f"Unknown density_method '{self._density_method}'")

    ## Methods for writing to the archive ##

    def add(
        self,
        solution: ArrayLike | None,
        objective: ArrayLike | None,
        measures: ArrayLike,
        **fields: ArrayLike | None,
    ) -> BatchData:
        """Adds measures to the buffer and updates the density estimator if necessary.

        The measures are added to the buffer with reservoir sampling to enable sampling
        uniformly from the incoming solutions. While the buffer has free slots,
        incoming measures are stored directly. Once it is full, a number of incoming
        measures is skipped, the next one overwrites a uniformly random buffer entry,
        and a new skip count is drawn. Skips that extend past the end of a batch carry
        over to the next call.

        Args:
            solution: Included for API consistency. Any value is ignored.
            objective: Included for API consistency. Any value is ignored.
            measures: (batch_size, :attr:`measure_dim`) array with measure space
                coordinates of all the solutions.
            fields: Included for API consistency. Any value is ignored.

        Returns:
            Information describing the result of the add operation. The dict contains
            the following keys:

            - ``"status"`` (:class:`numpy.ndarray` of :class:`np.int32`): An array of
              integers that represent the "status" obtained when attempting to insert
              each solution in the batch. Since this archive does not store any elites,
              all statuses are set to 2 (which normally indicates the solution
              discovered a new cell in the archive -- see :class:`AddStatus`).
            - ``"density"`` (:class:`numpy.ndarray` of the dtype passed in at init): The
              density values of the measure passed in, before the buffer or density
              estimator was updated. Note that when ``"kde_sklearn"`` is used as the
              ``density_method``, *log density* is computed; see
              :meth:`sklearn.neighbors.KernelDensity.score_samples` for more info.

        Raises:
            ValueError: The array arguments do not match their specified shapes.
            ValueError: ``measures`` has non-finite values (inf or NaN).
        """
        measures = np.asarray(measures, dtype=self._measures_dtype)
        check_batch_shape(measures, "measures", self.measure_dim, "measure_dim", "")
        check_finite(measures, "measures")

        batch_size = len(measures)
        buffer_size = len(self._buffer)

        add_info = {
            # Make all statuses be 2 as a placeholder value.
            "status": np.full(batch_size, 2, dtype=np.int32),
            # Note that density should be computed _before_ updating the buffer or
            # density estimator.
            "density": self.compute_density(measures),
        }

        # Add to the buffer using reservoir sampling as in Li 1994
        # (https://dl.acm.org/doi/pdf/10.1145/198429.198435).

        # First, fill the buffer if there are any slots available.
        n_fill = 0
        if buffer_size > self._n_occupied:
            n_fill = min(buffer_size - self._n_occupied, batch_size)
            self._buffer[self._n_occupied : self._n_occupied + n_fill] = measures[
                :n_fill
            ]
            remaining_measures = measures[n_fill:]
            self._n_occupied += n_fill
        else:
            remaining_measures = measures

        # Replace measures in the buffer using reservoir sampling. `pos` is the index
        # of the next measure in `remaining_measures` that has not been consumed yet
        # (either skipped or inserted); tracking it explicitly ensures each
        # replacement uses the measure that actually follows the skipped ones.
        n_remaining = remaining_measures.shape[0]
        pos = 0
        while pos < n_remaining:
            n_left = n_remaining - pos

            if self._n_skip >= n_left:
                # The pending skip covers the rest of this batch. Consume the batch
                # and carry the leftover skip over to the next call.
                self._n_skip -= n_left
                break

            # Done with skipping, replace a random buffer entry with the next measure.
            pos += self._n_skip
            replace = self._rng.integers(buffer_size)
            self._buffer[replace] = remaining_measures[pos]
            pos += 1  # The inserted measure has been consumed as well.

            # Update the acceptance threshold and draw the next skip count.
            self._w *= np.exp(np.log(self._rng.uniform()) / buffer_size)
            self._n_skip = int(np.log(self._rng.uniform()) / np.log(1 - self._w))

        return add_info