"""Contains the CVTArchive."""
import numbers
import numpy as np
from numpy_groupies import aggregate_nb as aggregate
from scipy.spatial import cKDTree # pylint: disable=no-name-in-module
from scipy.stats.qmc import Halton, Sobol
from sklearn.cluster import k_means
from ribs._utils import (check_batch_shape, check_finite, check_shape,
validate_batch, validate_single)
from ribs.archives._archive_base import ArchiveBase
from ribs.archives._archive_stats import ArchiveStats
from ribs.archives._array_store import ArrayStore
from ribs.archives._utils import (fill_sentinel_values, parse_dtype,
validate_cma_mae_settings)
class CVTArchive(ArchiveBase):
# pylint: disable = too-many-public-methods
"""An archive that tessellates the measure space with centroids.
This archive originates in `Vassiliades 2018
<https://ieeexplore.ieee.org/document/8000667>`_. It uses Centroidal Voronoi
Tessellation (CVT) to divide an n-dimensional measure space into k cells.
The CVT is created by sampling points uniformly from the n-dimensional
measure space and using k-means clustering to identify k centroids. When
items are inserted into the archive, we identify their cell by identifying
the closest centroid in measure space (using Euclidean distance). For
k-means clustering, we use :func:`sklearn.cluster.k_means`.
By default, finding the closest centroid is done in roughly
O(log(number of cells)) time using :class:`scipy.spatial.cKDTree`. To switch
to brute force, which takes O(number of cells) time, pass
``use_kd_tree=False``.
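As a minimal sketch (the argument values here are illustrative, not
recommendations):

.. code-block:: python

    archive = CVTArchive(
        solution_dim=10,
        cells=512,
        ranges=[(-1, 1), (-1, 1)],
        use_kd_tree=False,  # Brute-force nearest-centroid search.
        chunk_size=1024,  # Compute distances for 1024 inputs at a time.
    )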
To compare the performance of using the k-D tree vs brute force, we ran
benchmarks where we inserted 1k batches of 100 solutions into a 2D archive
with varying numbers of cells. We took the minimum over 5 runs for each data
point, as recommended in the docs for :meth:`timeit.Timer.repeat`. Note the
logarithmic scales. This plot was generated on a reasonably modern laptop.
.. image:: ../_static/imgs/cvt_add_plot.png
:alt: Runtime to insert 100k entries into CVTArchive
Across almost all numbers of cells, using the k-D tree is faster than using
brute force. Thus, **we recommend always using the k-D tree.** See
`benchmarks/cvt_add.py
<https://github.com/icaros-usc/pyribs/tree/master/benchmarks/cvt_add.py>`_
in the project repo for more information about how this plot was generated.
Finally, if running multiple experiments, it may be beneficial to use the
same centroids across each experiment. Doing so can keep experiments
consistent and reduce execution time. To do this, either (1) construct
custom centroids and pass them in via the ``custom_centroids`` argument, or
(2) access the centroids created in the first archive with :attr:`centroids`
and pass them into ``custom_centroids`` when constructing archives for
subsequent experiments.
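For instance, a rough sketch of reusing centroids across two archives (the
other constructor arguments are illustrative):

.. code-block:: python

    archive1 = CVTArchive(
        solution_dim=10, cells=1000, ranges=[(-1, 1), (-1, 1)], seed=42)
    # Reuse archive1's centroids, skipping CVT generation entirely.
    archive2 = CVTArchive(
        solution_dim=10, cells=1000, ranges=[(-1, 1), (-1, 1)],
        custom_centroids=archive1.centroids)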
.. note:: The idea of archive thresholds was introduced in `Fontaine 2023
<https://arxiv.org/abs/2205.10752>`_. For more info on thresholds,
including the ``learning_rate`` and ``threshold_min`` parameters, refer
to our tutorial :doc:`/tutorials/cma_mae`.
.. note:: For more information on our choice of k-D tree implementation, see
:pr:`38`.
Args:
solution_dim (int or tuple of int): Dimensionality of the solution
space. Scalar or multi-dimensional solution shapes are allowed by
passing an empty tuple or tuple of integers, respectively.
cells (int): The number of cells to use in the archive, equivalent to
the number of centroids/areas in the CVT.
ranges (array-like of (float, float)): Upper and lower bound of each
dimension of the measure space, e.g. ``[(-1, 1), (-2, 2)]``
indicates the first dimension should have bounds :math:`[-1,1]`
(inclusive), and the second dimension should have bounds
:math:`[-2,2]` (inclusive). The length of ``ranges`` determines the
dimensionality of the measure space.
learning_rate (float): The learning rate for threshold updates. Defaults
to 1.0.
threshold_min (float): The initial threshold value for all the cells.
Defaults to ``-np.inf``.
qd_score_offset (float): Archives often contain negative objective
values, and if the QD score were to be computed with these negative
objectives, the algorithm would be penalized for adding new cells
with negative objectives. Thus, a standard practice is to normalize
all the objectives so that they are non-negative by introducing an
offset. This QD score offset will be *subtracted* from all
objectives in the archive, e.g., if your objectives go as low as
-300, pass in -300 so that each objective will be transformed as
``objective - (-300)``.
seed (int): Value to seed the random number generator as well as
:func:`~sklearn.cluster.k_means`. Set to None to avoid a fixed seed.
dtype (str or data-type or dict): Data type of the solutions,
objectives, and measures. This can be ``"f"`` / ``np.float32``,
``"d"`` / ``np.float64``, or a dict specifying separate dtypes, of
the form ``{"solution": <dtype>, "objective": <dtype>, "measures":
<dtype>}``.
extra_fields (dict): Description of extra fields of data that is stored
next to elite data like solutions and objectives. The description is
a dict mapping from a field name (str) to a tuple of ``(shape,
dtype)``. For instance, ``{"foo": ((), np.float32), "bar": ((10,),
np.float32)}`` will create a "foo" field that contains scalar values
and a "bar" field that contains 10D values. Note that field names
must be valid Python identifiers, and names already used in the
archive are not allowed.
custom_centroids (array-like): If passed in, this (cells, measure_dim)
array will be used as the centroids of the CVT instead of generating
new ones. In this case, ``samples`` will be ignored, and
``archive.samples`` will be None. This can be useful when one wishes
to use the same CVT across experiments for fair comparison.
centroid_method (str): Method for generating centroids. Must be one of
"kmeans" (the default), "random", "sobol", "scrambled_sobol", or
"halton". The non-default methods are derived from `Mouret 2023
<https://dl.acm.org/doi/pdf/10.1145/3583133.3590726>`_. Note that
``samples`` is only used when the method is "kmeans".
samples (int or array-like): If it is an int, this specifies the number
of samples to generate when creating the CVT. Otherwise, this must
be a (num_samples, measure_dim) array where samples[i] is a sample
to use when creating the CVT. It can be useful to pass in custom
samples when there are restrictions on what samples in the measure
space are (physically) possible.
k_means_kwargs (dict): kwargs for :func:`~sklearn.cluster.k_means`. By
default, we pass in ``n_init=1``, ``init="random"``,
``algorithm="lloyd"``, and ``random_state=seed``.
use_kd_tree (bool): If True, use a k-D tree for finding the closest
centroid when inserting into the archive. If False, brute force will
be used instead.
ckdtree_kwargs (dict): kwargs for :class:`~scipy.spatial.cKDTree`. By
default, we do not pass in any kwargs.
chunk_size (int): If passed, the brute-force closest-centroid search
will chunk the distance calculations to compute ``chunk_size`` inputs
at a time.
Raises:
ValueError: Invalid values for learning_rate and threshold_min.
ValueError: Invalid names in extra_fields.
ValueError: The ``samples`` array or the ``custom_centroids`` array has
the wrong shape.
RuntimeError: k-means clustering found fewer centroids than the
requested number of cells.
"""
def __init__(
self,
*,
solution_dim,
cells,
ranges,
learning_rate=None,
threshold_min=-np.inf,
qd_score_offset=0.0,
seed=None,
dtype=np.float64,
extra_fields=None,
custom_centroids=None,
centroid_method="kmeans",
samples=100_000,
k_means_kwargs=None,
use_kd_tree=True,
ckdtree_kwargs=None,
chunk_size=None,
):
self._rng = np.random.default_rng(seed)
ArchiveBase.__init__(
self,
solution_dim=solution_dim,
objective_dim=(),
measure_dim=len(ranges),
)
# Set up the ArrayStore, which is a data structure that stores all the
# elites' data in arrays sharing a common index.
extra_fields = extra_fields or {}
reserved_fields = {
"solution", "objective", "measures", "threshold", "index"
}
if reserved_fields & extra_fields.keys():
raise ValueError("The following names are not allowed in "
f"extra_fields: {reserved_fields}")
dtype = parse_dtype(dtype)
self._store = ArrayStore(
field_desc={
"solution": (self.solution_dim, dtype["solution"]),
"objective": ((), dtype["objective"]),
"measures": (self.measure_dim, dtype["measures"]),
# Must be same dtype as the objective since they share
# calculations.
"threshold": ((), dtype["objective"]),
**extra_fields,
},
capacity=cells,
)
# Set up constant properties.
ranges = list(zip(*ranges))
self._lower_bounds = np.array(ranges[0], dtype=self.dtypes["measures"])
self._upper_bounds = np.array(ranges[1], dtype=self.dtypes["measures"])
self._interval_size = self._upper_bounds - self._lower_bounds
self._learning_rate, self._threshold_min = validate_cma_mae_settings(
learning_rate, threshold_min, self.dtypes["threshold"])
self._qd_score_offset = self.dtypes["objective"](qd_score_offset)
# Set up statistics -- objective_sum is the sum of all objective values
# in the archive; it is useful for computing qd_score and obj_mean.
self._best_elite = None
self._objective_sum = None
self._stats = None
self._stats_reset()
# Apply default args for k-means. Users can easily override these,
# particularly if they want higher quality clusters.
self._k_means_kwargs = ({} if k_means_kwargs is None else
k_means_kwargs.copy())
self._k_means_kwargs.setdefault(
# Run k-means with only a single initialization to be fast.
"n_init",
1)
self._k_means_kwargs.setdefault(
# The default "k-means++" takes very long to init.
"init",
"random")
self._k_means_kwargs.setdefault("algorithm", "lloyd")
self._k_means_kwargs.setdefault("random_state", seed)
if custom_centroids is None:
self._samples = None
if centroid_method == "kmeans":
if not isinstance(samples, numbers.Integral):
# Validate shape of custom samples.
samples = np.asarray(samples, dtype=self.dtypes["measures"])
if samples.shape[1] != self._measure_dim:
raise ValueError(
f"Samples has shape {samples.shape} but must be of "
f"shape (n_samples, len(ranges)="
f"{self._measure_dim})")
self._samples = samples
else:
self._samples = self._rng.uniform(
self._lower_bounds,
self._upper_bounds,
size=(samples, self._measure_dim),
).astype(self.dtypes["measures"])
self._centroids = k_means(self._samples, self.cells,
**self._k_means_kwargs)[0]
if self._centroids.shape[0] < self.cells:
raise RuntimeError(
"While generating the CVT, k-means clustering found "
f"{self._centroids.shape[0]} centroids, but this "
f"archive needs {self.cells} cells. This most "
"likely happened because there are too few samples "
"and/or too many cells.")
elif centroid_method == "random":
# Generates random centroids.
self._centroids = self._rng.uniform(self._lower_bounds,
self._upper_bounds,
size=(self.cells,
self._measure_dim))
elif centroid_method == "sobol":
# Generates centroids as a Sobol sequence.
sampler = Sobol(d=self._measure_dim, scramble=False)
sobol_nums = sampler.random(n=self.cells)
self._centroids = (self._lower_bounds + sobol_nums *
(self._upper_bounds - self._lower_bounds))
elif centroid_method == "scrambled_sobol":
# Generates centroids as a scrambled Sobol sequence.
sampler = Sobol(d=self._measure_dim, scramble=True)
sobol_nums = sampler.random(n=self.cells)
self._centroids = (self._lower_bounds + sobol_nums *
(self._upper_bounds - self._lower_bounds))
elif centroid_method == "halton":
# Generates centroids with a Halton sequence.
sampler = Halton(d=self._measure_dim)
halton_nums = sampler.random(n=self.cells)
self._centroids = (self._lower_bounds + halton_nums *
(self._upper_bounds - self._lower_bounds))
else:
# Validate shape of `custom_centroids` when they are provided.
custom_centroids = np.asarray(custom_centroids,
dtype=self.dtypes["measures"])
if custom_centroids.shape != (cells, self._measure_dim):
raise ValueError(
f"custom_centroids has shape {custom_centroids.shape} but "
f"must be of shape (cells={cells}, len(ranges)="
f"{self._measure_dim})")
self._centroids = custom_centroids
self._samples = None
self._use_kd_tree = use_kd_tree
self._centroid_kd_tree = None
self._ckdtree_kwargs = ({} if ckdtree_kwargs is None else
ckdtree_kwargs.copy())
self._chunk_size = chunk_size
if self._use_kd_tree:
self._centroid_kd_tree = cKDTree(self._centroids,
**self._ckdtree_kwargs)
## Properties inherited from ArchiveBase ##
@property
def field_list(self):
return self._store.field_list_with_index
@property
def dtypes(self):
return self._store.dtypes_with_index
@property
def stats(self):
return self._stats
@property
def empty(self):
return len(self._store) == 0
## Properties that are not in ArchiveBase ##
## Roughly ordered by the parameter list in the constructor. ##
@property
def best_elite(self):
"""dict: The elite with the highest objective in the archive.
None if there are no elites in the archive.
.. note::
If the archive is non-elitist (this occurs when using the archive
with a learning rate which is not 1.0, as in CMA-MAE), then this
best elite may no longer exist in the archive because it was
replaced with an elite with a lower objective value. This can happen
because in non-elitist archives, new solutions only need to exceed
the *threshold* of the cell they are being inserted into, not the
*objective* of the elite currently in the cell. See :pr:`314` for
more info.
.. note::
The best elite will contain a "threshold" key. This threshold is the
threshold of the best elite's cell after the best elite was inserted
into the archive.
"""
return self._best_elite
@property
def cells(self):
"""int: Total number of cells in the archive."""
return self._store.capacity
@property
def lower_bounds(self):
"""(measure_dim,) numpy.ndarray: Lower bound of each dimension."""
return self._lower_bounds
@property
def upper_bounds(self):
"""(measure_dim,) numpy.ndarray: Upper bound of each dimension."""
return self._upper_bounds
@property
def interval_size(self):
"""(measure_dim,) numpy.ndarray: The size of each dim (upper_bounds -
lower_bounds)."""
return self._interval_size
@property
def learning_rate(self):
"""float: The learning rate for threshold updates."""
return self._learning_rate
@property
def threshold_min(self):
"""float: The initial threshold value for all the cells."""
return self._threshold_min
@property
def qd_score_offset(self):
"""float: The offset which is subtracted from objective values when
computing the QD score."""
return self._qd_score_offset
@property
def centroids(self):
"""(n_centroids, measure_dim) numpy.ndarray: The centroids used in the
CVT.
"""
return self._centroids
@property
def samples(self):
"""(num_samples, measure_dim) numpy.ndarray: The samples used in
creating the CVT.
Will be None if custom centroids were passed in to the archive.
"""
return self._samples
## dunder methods ##
def __len__(self):
return len(self._store)
def __iter__(self):
return iter(self._store)
## Utilities ##
def _stats_reset(self):
"""Resets the archive stats."""
self._best_elite = None
self._objective_sum = self.dtypes["objective"](0.0)
self._stats = ArchiveStats(
num_elites=0,
coverage=self.dtypes["objective"](0.0),
qd_score=self.dtypes["objective"](0.0),
norm_qd_score=self.dtypes["objective"](0.0),
obj_max=None,
obj_mean=None,
)
def _stats_update(self, new_objective_sum, new_best_index):
"""Updates statistics based on a new sum of objective values
(new_objective_sum) and the index of a potential new best elite
(new_best_index)."""
_, new_best_elite = self._store.retrieve([new_best_index])
new_best_elite = {k: v[0] for k, v in new_best_elite.items()}
if (self._stats.obj_max is None or
new_best_elite["objective"] > self._stats.obj_max):
self._best_elite = new_best_elite
new_obj_max = new_best_elite["objective"]
else:
new_obj_max = self._stats.obj_max
self._objective_sum = new_objective_sum
new_qd_score = (
self._objective_sum -
self.dtypes["objective"](len(self)) * self._qd_score_offset)
self._stats = ArchiveStats(
num_elites=len(self),
coverage=self.dtypes["objective"](len(self) / self.cells),
qd_score=new_qd_score,
norm_qd_score=self.dtypes["objective"](new_qd_score / self.cells),
obj_max=new_obj_max,
obj_mean=self.dtypes["objective"](self._objective_sum / len(self)),
)
def index_of(self, measures):
"""Finds the index of the closest centroid for each of the given
coordinates in measure space.
If ``index_batch`` is the batch of indices returned by this method, then
``archive.centroids[index_batch[i]]`` holds the coordinates of the
centroid closest to ``measures[i]``. See :attr:`centroids` for more
info.
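For example (a small sketch; the archive and measure values are
hypothetical):

.. code-block:: python

    # measures: (batch_size, measure_dim) array, here with batch_size=2
    # and measure_dim=2.
    index_batch = archive.index_of([[0.1, 0.2], [0.5, -0.5]])
    # archive.centroids[index_batch[0]] is the centroid closest to
    # [0.1, 0.2] in measure space.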
The centroid indices are located using either the k-D tree or brute
force, depending on the value of ``use_kd_tree`` in the constructor.
Args:
measures (array-like): (batch_size, :attr:`measure_dim`) array of
coordinates in measure space.
Returns:
numpy.ndarray: (batch_size,) array of centroid indices
corresponding to each measure space coordinate.
Raises:
ValueError: ``measures`` is not of shape (batch_size,
:attr:`measure_dim`).
ValueError: ``measures`` has non-finite values (inf or NaN).
"""
measures = np.asarray(measures)
check_batch_shape(measures, "measures", self.measure_dim, "measure_dim")
check_finite(measures, "measures")
if self._use_kd_tree:
_, indices = self._centroid_kd_tree.query(measures)
return indices.astype(np.int32)
else:
expanded_measures = np.expand_dims(measures, axis=1)
# Compute indices one chunk at a time.
if self._chunk_size is not None and \
self._chunk_size < measures.shape[0]:
indices = []
chunks = np.array_split(
expanded_measures,
np.ceil(len(expanded_measures) / self._chunk_size))
for chunk in chunks:
distances = chunk - self.centroids
distances = np.sum(np.square(distances), axis=2)
current_res = np.argmin(distances, axis=1).astype(np.int32)
indices.append(current_res)
return np.concatenate(tuple(indices))
else:
# Brute force distance calculation -- start by taking the
# difference between each measure i and all the centroids.
distances = expanded_measures - self.centroids
# Compute the total squared distance -- no need to compute
# actual distance with a sqrt.
distances = np.sum(np.square(distances), axis=2)
return np.argmin(distances, axis=1).astype(np.int32)
def index_of_single(self, measures):
"""Returns the index of the closest centroid for the measures of a
single solution.
See :meth:`index_of`.
Args:
measures (array-like): (:attr:`measure_dim`,) array of measures for
a single solution.
Returns:
int or numpy.integer: Integer index of the measures in the archive's
storage arrays.
Raises:
ValueError: ``measures`` is not of shape (:attr:`measure_dim`,).
ValueError: ``measures`` has non-finite values (inf or NaN).
"""
measures = np.asarray(measures)
check_shape(measures, "measures", self.measure_dim, "measure_dim")
check_finite(measures, "measures")
return self.index_of(measures[None])[0]
## Methods for writing to the archive ##
@staticmethod
def _compute_thresholds(indices, objective, cur_threshold, learning_rate,
dtype):
"""Computes new thresholds with the CMA-MAE batch threshold update rule.
If entries in `indices` are duplicated, they receive the same threshold.
"""
if len(indices) == 0:
return np.array([], dtype=dtype)
# Compute the number of objectives inserted into each cell. Note that we
# index with `indices` to place the counts at all relevant indices. For
# instance, if we had an array [1,2,3,1,5], we would end up with
# [2,1,1,2,1] (there are 2 1's, 1 2, 1 3, 2 1's, and 1 5).
#
# All objective_sizes should be > 0 since we only retrieve counts for
# indices in `indices`.
objective_sizes = aggregate(indices, 1, func="len",
fill_value=0)[indices]
# Compute the sum of the objectives inserted into each cell -- again, we
# index with `indices`.
objective_sums = aggregate(indices,
objective,
func="sum",
fill_value=np.nan)[indices]
# Update the threshold with the batch update rule from Fontaine 2023
# (https://arxiv.org/abs/2205.10752).
#
# Unlike in single_entry_with_threshold, we do not need to worry about
# cur_threshold having -np.inf here as a result of threshold_min being
# -np.inf. This is because the case with threshold_min = -np.inf is
# handled separately since we compute the new threshold based on the max
# objective in each cell in that case.
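#
# In closed form: with learning rate a, current threshold t, and k
# objectives inserted into a cell summing to S, the update below is
#   t' = (1 - a)^k * t + (S / k) * (1 - (1 - a)^k)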
ratio = dtype(1.0 - learning_rate)**objective_sizes
new_threshold = (ratio * cur_threshold +
(objective_sums / objective_sizes) * (1 - ratio))
return new_threshold
def add(self, solution, objective, measures, **fields):
"""Inserts a batch of solutions into the archive.
Each solution is only inserted if it has a higher ``objective`` than the
threshold of the corresponding cell. For the default values of
``learning_rate`` and ``threshold_min``, this threshold is simply the
objective value of the elite previously in the cell. If multiple
solutions in the batch end up in the same cell, we only insert the
solution with the highest objective. If multiple solutions end up in the
same cell and tie for the highest objective, we insert the solution that
appears first in the batch.
For the default values of ``learning_rate`` and ``threshold_min``, the
threshold for each cell is updated by taking the maximum objective value
among all the solutions that landed in the cell, resulting in the same
behavior as in the vanilla MAP-Elites archive. However, for other
settings, the threshold is updated with the batch update rule described
in the appendix of `Fontaine 2023 <https://arxiv.org/abs/2205.10752>`_.
.. note:: The indices of all arguments should "correspond" to each
other, i.e., ``solution[i]``, ``objective[i]``, and ``measures[i]``
should be the solution parameters, objective, and measures for
solution ``i``.
Args:
solution (array-like): (batch_size, :attr:`solution_dim`) array of
solution parameters.
objective (array-like): (batch_size,) array with objective function
evaluations of the solutions.
measures (array-like): (batch_size, :attr:`measure_dim`) array with
measure space coordinates of all the solutions.
fields (keyword arguments): Additional data for each solution. Each
argument should be an array with batch_size as the first
dimension.
Returns:
dict: Information describing the result of the add operation. The
dict contains the following keys:
- ``"status"`` (:class:`numpy.ndarray` of :class:`numpy.int32`): An
array of integers that represent the "status" obtained when
attempting to insert each solution in the batch. Each item has the
following possible values:
- ``0``: The solution was not added to the archive.
- ``1``: The solution improved the objective value of a cell
which was already in the archive.
- ``2``: The solution discovered a new cell in the archive.
All statuses (and values, below) are computed with respect to the
*current* archive. For example, if two solutions both introduce
the same new archive cell, then both will be marked with ``2``.
The alternative is to depend on the order of the solutions in the
batch -- for example, if we have two solutions ``a`` and ``b``
that introduce the same new cell in the archive, ``a`` could be
inserted first with status ``2``, and ``b`` could be inserted
second with status ``1`` because it improves upon ``a``. However,
our implementation does **not** do this.
To convert statuses to a more semantic format, cast all statuses
to :class:`AddStatus`, e.g., with ``[AddStatus(s) for s in
add_info["status"]]``.
- ``"value"`` (:class:`numpy.ndarray` of
:attr:`dtypes` ["objective"]): An array with values for each
solution in the batch. With the default values of ``learning_rate
= 1.0`` and ``threshold_min = -np.inf``, the meaning of each value
depends on the corresponding ``status`` and is identical to that
in CMA-ME (`Fontaine 2020 <https://arxiv.org/abs/1912.02400>`_):
- ``0`` (not added): The value is the "negative improvement,"
i.e., the objective of the solution passed in minus the
objective of the elite still in the archive (this value is
negative because the solution did not have a high enough
objective to be added to the archive).
- ``1`` (improve existing cell): The value is the "improvement,"
i.e., the objective of the solution passed in minus the
objective of the elite previously in the archive.
- ``2`` (new cell): The value is just the objective of the
solution.
In contrast, for other values of ``learning_rate`` and
``threshold_min``, each value is equivalent to the objective value
of the solution minus the threshold of its corresponding cell in
the archive.
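As a rough example of consuming this dict (the batch variables are
hypothetical, and ``numpy`` is assumed to be imported as ``np``):

.. code-block:: python

    add_info = archive.add(solution_batch, objective_batch,
                           measures_batch)
    num_new_cells = np.sum(add_info["status"] == 2)
    improvements = add_info["value"][add_info["status"] == 1]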
Raises:
ValueError: The array arguments do not match their specified shapes.
ValueError: ``objective`` or ``measures`` has non-finite values (inf
or NaN).
"""
data = validate_batch(
self,
{
"solution": solution,
"objective": objective,
"measures": measures,
**fields,
},
)
# Delete these so that we only use the clean, validated data in `data`.
del solution, objective, measures, fields
# Information to return about the addition.
add_info = {}
# Retrieve indices of the archive cells.
indices = self.index_of(data["measures"])
batch_size = len(indices)
# Retrieve current data and thresholds. Unoccupied cells default to
# threshold_min.
cur_occupied, cur_data = self._store.retrieve(indices)
cur_threshold = cur_data["threshold"]
cur_threshold[~cur_occupied] = self.threshold_min
# Compute status -- arrays below are all boolean arrays of length
# batch_size.
#
# When we want CMA-ME behavior, the threshold defaults to -inf for new
# cells, which satisfies the condition for can_insert.
can_insert = data["objective"] > cur_threshold
is_new = can_insert & ~cur_occupied
improve_existing = can_insert & cur_occupied
add_info["status"] = np.zeros(batch_size, dtype=np.int32)
add_info["status"][is_new] = 2
add_info["status"][improve_existing] = 1
# If threshold_min is -inf, then we want CMA-ME behavior, which computes
# the improvement value of new solutions w.r.t zero. Otherwise, we
# compute improvement with respect to threshold_min.
cur_threshold[is_new] = (self.dtypes["threshold"](0.0)
if self.threshold_min == -np.inf else
self.threshold_min)
add_info["value"] = data["objective"] - cur_threshold
# Return early if we cannot insert anything -- continuing would throw a
# ValueError in aggregate() since indices[can_insert] would be empty.
if not np.any(can_insert):
return add_info
# Select all solutions that _can_ be inserted -- at this point, there
# are still conflicts in the insertions, e.g., multiple solutions can
# map to index 0.
indices = indices[can_insert]
data = {name: arr[can_insert] for name, arr in data.items()}
cur_threshold = cur_threshold[can_insert]
# Compute the new threshold associated with each entry.
if self.threshold_min == -np.inf:
# Regular archive behavior: thresholds are just the objectives.
new_threshold = data["objective"]
else:
# Batch threshold update described in Fontaine 2023
# (https://arxiv.org/abs/2205.10752). This computation is based on
# the mean objective of all solutions in the batch that could have
# been inserted into each cell.
new_threshold = self._compute_thresholds(indices, data["objective"],
cur_threshold,
self.learning_rate,
self.dtypes["threshold"])
# Retrieve indices of solutions that _should_ be inserted into the
# archive. Currently, multiple solutions may be inserted at each archive
# index, but we only want to insert the maximum among these solutions.
# Thus, we obtain the argmax for each archive index.
#
# We use a fill_value of -1 to indicate archive indices that were not
# covered in the batch. Note that the length of archive_argmax is only
# max(indices) + 1, rather than the total number of cells. However,
# this is okay because we only need the indices of the solutions, which
# we store in should_insert.
#
# aggregate() always chooses the first item if there are ties, so the
# first elite will be inserted if there is a tie. See their default
# numpy implementation for more info:
# https://github.com/ml31415/numpy-groupies/blob/master/numpy_groupies/aggregate_numpy.py#L107
archive_argmax = aggregate(indices,
data["objective"],
func="argmax",
fill_value=-1)
should_insert = archive_argmax[archive_argmax != -1]
# Select only solutions that will be inserted into the archive.
indices = indices[should_insert]
data = {name: arr[should_insert] for name, arr in data.items()}
data["threshold"] = new_threshold[should_insert]
# Insert elites into the store.
self._store.add(indices, data)
# Compute statistics.
cur_objective = cur_data["objective"]
cur_objective[~cur_occupied] = 0.0
cur_objective = cur_objective[can_insert][should_insert]
objective_sum = (self._objective_sum +
np.sum(data["objective"] - cur_objective))
best_index = indices[np.argmax(data["objective"])]
self._stats_update(objective_sum, best_index)
return add_info
def add_single(self, solution, objective, measures, **fields):
"""Inserts a single solution into the archive.
The solution is only inserted if it has a higher ``objective`` than the
threshold of the corresponding cell. For the default values of
``learning_rate`` and ``threshold_min``, this threshold is simply the
objective value of the elite previously in the cell. The threshold is
also updated if the solution was inserted.
.. note::
This method is provided as an easier-to-understand implementation,
though it is slower since it inserts only one solution at a time. For
better performance, see :meth:`add`.
Args:
solution (array-like): Parameters of the solution.
objective (float): Objective function evaluation of the solution.
measures (array-like): Coordinates in measure space of the solution.
fields (keyword arguments): Additional data for the solution.
Returns:
dict: Information describing the result of the add operation. The
dict contains ``status`` and ``value`` keys; refer to :meth:`add`
for the meaning of status and value.
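A minimal usage sketch (assuming a flat solution space and a 2D measure
space; all values here are hypothetical, and ``numpy`` is assumed to be
imported as ``np``):

.. code-block:: python

    add_info = archive.add_single(
        solution=np.zeros(archive.solution_dim),
        objective=1.5,
        measures=[0.25, -0.4])
    if add_info["status"] == 2:  # New cell discovered.
        print("value:", add_info["value"])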
Raises:
ValueError: The array arguments do not match their specified shapes.
ValueError: ``objective`` is non-finite (inf or NaN) or ``measures``
has non-finite values.
"""
data = validate_single(
self,
{
"solution": solution,
"objective": objective,
"measures": measures,
**fields,
},
)
# Delete these so that we only use the clean, validated data in `data`.
del solution, objective, measures, fields
# Information to return about the addition.
add_info = {}
# Identify the archive cell.
index = self.index_of_single(data["measures"])
# Retrieve current data of the cell.
cur_occupied, cur_data = self._store.retrieve([index])
cur_occupied = cur_occupied[0]
if cur_occupied:
# If the cell is currently occupied, the threshold comes from the
# current data of the elite in the cell.
cur_threshold = cur_data["threshold"][0]
else:
# If the cell is not currently occupied, the threshold needs special
# settings.
#
# If threshold_min is -inf, then we want CMA-ME behavior, which
# computes the improvement value with a threshold of zero for new
# solutions. Otherwise, we will set cur_threshold to threshold_min.
cur_threshold = (self.dtypes["threshold"](0.0) if self.threshold_min
== -np.inf else self.threshold_min)
# Retrieve candidate objective.
objective = data["objective"]
# Compute status and threshold.
add_info["status"] = np.int32(0) # NOT_ADDED
# Now we check whether a solution should be added to the archive. We use
# the addition rule from MAP-Elites (Fig. 2 of Mouret 2015
# https://arxiv.org/pdf/1504.04909.pdf), with modifications for CMA-MAE.
# This checks if a new solution is discovered in the archive. Note that
# regular MAP-Elites only checks `not cur_occupied`. CMA-MAE has an
# additional `threshold_min` that the objective must exceed for new
# cells. If CMA-MAE is not being used, then `threshold_min` is -np.inf,
# making this check identical to that of MAP-Elites.
is_new = not cur_occupied and self.threshold_min < objective
# This checks whether the solution improves an existing cell in the
# archive, i.e., whether it performs better than the current elite in
# this cell. Vanilla MAP-Elites compares to the objective of the cell's
# current elite. CMA-MAE compares to a threshold value that updates
# over time (i.e., cur_threshold). When learning_rate is set to 1.0 (the
# default value), we recover the same rule as in MAP-Elites because
# cur_threshold is equivalent to the objective of the solution in the
# cell.
improve_existing = cur_occupied and cur_threshold < objective
if is_new or improve_existing:
if improve_existing:
add_info["status"] = np.int32(1) # IMPROVE_EXISTING
else:
add_info["status"] = np.int32(2) # NEW
# This calculation works in the case where threshold_min is -inf
# because cur_threshold will be set to 0.0 instead.
data["threshold"] = [(cur_threshold * (1.0 - self.learning_rate) +
objective * self.learning_rate)]
# Insert elite into the store.
self._store.add(index[None], {
name: np.expand_dims(arr, axis=0) for name, arr in data.items()
})
# Update stats.
cur_objective = (cur_data["objective"][0]
if cur_occupied else self.dtypes["objective"](0.0))
self._stats_update(self._objective_sum + objective - cur_objective,
index)
# Value is the improvement over the current threshold (can be negative).
add_info["value"] = objective - cur_threshold
return add_info
def clear(self):
"""Removes all elites in the archive."""
self._store.clear()
self._stats_reset()
## Methods for reading from the archive ##
## Refer to ArchiveBase for documentation of these methods. ##
def retrieve(self, measures):
measures = np.asarray(measures)
check_batch_shape(measures, "measures", self.measure_dim, "measure_dim")
check_finite(measures, "measures")
occupied, data = self._store.retrieve(self.index_of(measures))
fill_sentinel_values(occupied, data)
return occupied, data
def retrieve_single(self, measures):
measures = np.asarray(measures)
check_shape(measures, "measures", self.measure_dim, "measure_dim")
check_finite(measures, "measures")
occupied, data = self.retrieve(measures[None])
return occupied[0], {field: arr[0] for field, arr in data.items()}
def data(self, fields=None, return_type="dict"):
return self._store.data(fields, return_type)
def sample_elites(self, n):
if self.empty:
raise IndexError("No elements in archive.")
random_indices = self._rng.integers(len(self._store), size=n)
selected_indices = self._store.occupied_list[random_indices]
_, elites = self._store.retrieve(selected_indices)
return elites