Source code for ribs.archives._proximity_archive

"""Contains the ProximityArchive."""

from __future__ import annotations

from collections.abc import Collection, Iterator
from typing import Literal, overload

import numpy as np
from numpy.typing import ArrayLike, DTypeLike
from numpy_groupies import aggregate_nb as aggregate
from scipy.spatial import KDTree

from ribs._utils import (
    check_batch_shape,
    check_finite,
    check_shape,
    validate_batch,
    validate_single,
)
from ribs.archives._archive_base import ArchiveBase
from ribs.archives._archive_data_frame import ArchiveDataFrame
from ribs.archives._archive_stats import ArchiveStats
from ribs.archives._array_store import ArrayStore
from ribs.archives._utils import fill_sentinel_values, parse_all_dtypes
from ribs.typing import BatchData, FieldDesc, Float, Int, SingleData


[docs] class ProximityArchive(ArchiveBase): r"""An archive that adds new solutions based on novelty. This archive originates in Novelty Search and is described in `Lehman 2011 <https://web.archive.org/web/20220707041732/https://eplex.cs.ucf.edu/papers/lehman_ecj11.pdf>`_. Solutions are added to the archive if their `novelty` exceeds a certain threshold. `Novelty` :math:`\rho` is defined as the average (Euclidean) distance in measure space to the :math:`k`-nearest neighbors of the solution in the archive: .. math:: \rho(x) = \frac{1}{k}\sum_{i=1}^{k}\text{dist}(x, \mu_i) Where :math:`x` is the measure value of some solution, and :math:`\mu_{1..k}` are the measure values of the :math:`k`-nearest neighbors in measure space. This archive also supports the local competition computation from Novelty Search with Local Competition, described in `Lehman 2011b <https://web.archive.org/web/20111206122453/http://eplex.cs.ucf.edu/papers/lehman_gecco11.pdf>`_. .. note:: When used for diversity optimization, this archive does not require any objectives, and ``objective=None`` can be passed into :meth:`add`. For consistency with the rest of pyribs, ``objective=None`` will result in a default objective value of 0, which will also cause stats like QD score and best objective to be 0. Alternatively, it is possible to associate objectives with the solutions by passing ``objective`` to :meth:`add` just like in other archives. .. note:: Some statistics will behave differently than in other archives: - If this archive has any solutions in it, the coverage (``archive.stats.coverage``) will always be reported as 1. This is because the archive is unbounded, so there is no predefined number of cells to fill. As such, ``archive.stats.num_elites`` may provide a more meaningful coverage metric. It is also common to create a :class:`~ribs.archives.GridArchive` or :class:`~ribs.archives.CVTArchive` as a result archive, from which a meaningful coverage can be computed. - Since the number of archive cells equals the number of elites in the archive, the normalized QD score (``archive.stats.norm_qd_score``) will always equal the mean objective (``archive.stats.obj_mean``). By default, this archive stores the following data fields: ``solution``, ``objective``, ``measures``, and ``index``. The integer ``index`` uniquely identifies each cell. Args: solution_dim: Dimensionality of the solution space. Scalar or multi-dimensional solution shapes are allowed by passing an empty tuple or tuple of integers, respectively. measure_dim: Dimensionality of the measure space. k_neighbors: The maximum number of nearest neighbors for computing novelty (`maximum` here is indicated since there may be fewer than ``k_neighbors`` solutions in the archive). novelty_threshold: The level of novelty required to add a solution to the archive. local_competition: Whether to turn on local competition behavior. If turned on, the archive will require objectives to be passed in during :meth:`add`. Furthermore, the ``add_info`` returned by :meth:`add` will include local competition information. Finally, solutions can be replaced in the archive. Specifically, if a candidate solution's novelty is below the novelty threshold, its objective will be compared to that of its nearest neighbor. If the candidate's objective is higher, it will replace the nearest neighbor. initial_capacity: Since this archive is unstructured, it does not have a fixed size, and it will grow as solutions are added. In the implementation, we store solutions in fixed-size arrays, and every time the capacity of these arrays is reached, we double their sizes (similar to the vector in C++). This parameter determines the initial capacity of the archive's arrays. It may be useful when it is known in advance how large the archive will grow. qd_score_offset: Archives often contain negative objective values, and if the QD score were to be computed with these negative objectives, the algorithm would be penalized for adding new cells with negative objectives. Thus, a standard practice is to normalize all the objectives so that they are non-negative by introducing an offset. This QD score offset will be *subtracted* from all objectives in the archive, e.g., if your objectives go as low as -300, pass in -300 so that each objective will be transformed as ``objective - (-300)``. seed: Value to seed the random number generator. Set to None to avoid a fixed seed. solution_dtype: Data type of the solutions. Defaults to float64 (numpy's default floating point type). objective_dtype: Data type of the objectives. Defaults to float64 (numpy's default floating point type). measures_dtype: Data type of the measures. Defaults to float64 (numpy's default floating point type). dtype: Shortcut for providing data type of the solutions, objectives, and measures. Defaults to float64 (numpy's default floating point type). This parameter sets all the dtypes simultaneously. To set individual dtypes, pass ``solution_dtype``, ``objective_dtype``, and ``measures_dtype``. Note that ``dtype`` cannot be used at the same time as those parameters. extra_fields: Description of extra fields of data that are stored next to elite data like solutions and objectives. The description is a dict mapping from a field name (str) to a tuple of ``(shape, dtype)``. For instance, ``{"foo": ((), np.float32), "bar": ((10,), np.float32)}`` will create a "foo" field that contains scalar values and a "bar" field that contains 10D values. Note that field names must be valid Python identifiers, and names already used in the archive are not allowed. kdtree_kwargs: When computing nearest neighbors, we construct a :class:`~scipy.spatial.KDTree`. This parameter will pass additional kwargs when constructing the tree. By default, we do not pass in any kwargs. Raises: ValueError: ``initial_capacity`` must be at least 1. """ def __init__( self, *, solution_dim: Int | tuple[Int, ...], measure_dim: Int, k_neighbors: Int, novelty_threshold: Float, local_competition: bool = False, initial_capacity: Int = 128, qd_score_offset: Float = 0.0, seed: Int | None = None, solution_dtype: DTypeLike = None, objective_dtype: DTypeLike = None, measures_dtype: DTypeLike = None, dtype: DTypeLike = None, extra_fields: FieldDesc | None = None, kdtree_kwargs: dict | None = None, # Deprecated parameters. ckdtree_kwargs: None = None, ) -> None: if ckdtree_kwargs is not None: raise ValueError( "`ckdtree_kwargs` is deprecated in pyribs 0.9.0. " "Please use `kdtree_kwargs` instead." ) self._rng = np.random.default_rng(seed) ArchiveBase.__init__( self, solution_dim=solution_dim, objective_dim=(), measure_dim=measure_dim, ) # Set up the ArrayStore, which is a data structure that stores all the elites' # data in arrays sharing a common index. extra_fields = extra_fields or {} reserved_fields = {"solution", "objective", "measures", "index"} if reserved_fields & extra_fields.keys(): raise ValueError( "The following names are not allowed in " f"extra_fields: {reserved_fields}" ) if initial_capacity < 1: raise ValueError("initial_capacity must be at least 1.") solution_dtype, objective_dtype, measures_dtype = parse_all_dtypes( dtype, solution_dtype, objective_dtype, measures_dtype, np ) self._store = ArrayStore( field_desc={ "solution": (self.solution_dim, solution_dtype), "objective": ((), objective_dtype), "measures": (self.measure_dim, measures_dtype), **extra_fields, }, capacity=initial_capacity, ) # Set up constant properties. self._k_neighbors = int(k_neighbors) self._novelty_threshold = np.asarray( novelty_threshold, dtype=self.dtypes["measures"] ) self._local_competition = local_competition self._kdtree_kwargs = {} if kdtree_kwargs is None else kdtree_kwargs.copy() self._qd_score_offset = np.asarray( qd_score_offset, dtype=self.dtypes["objective"] ) # Set up k-D tree with current measures in the archive. Updated on add(). self._cur_kd_tree = KDTree(self._store.data("measures"), **self._kdtree_kwargs) # Set up statistics -- objective_sum is the sum of all objective values in the # archive; it is useful for computing qd_score and obj_mean. self._best_elite = None self._objective_sum = None self._stats = None self._stats_reset() ## Properties inherited from ArchiveBase ## @property def field_list(self) -> list[str]: return self._store.field_list_with_index @property def dtypes(self) -> dict[str, np.dtype]: return self._store.dtypes_with_index @property def stats(self) -> ArchiveStats: return self._stats @property def empty(self) -> bool: return len(self._store) == 0 ## Properties that are not in ArchiveBase ## ## Roughly ordered by the parameter list in the constructor. ## @property def best_elite(self) -> SingleData: """The elite with the highest objective in the archive. None if there are no elites in the archive. """ return self._best_elite @property def k_neighbors(self) -> int: """The number of nearest neighbors for computing novelty.""" return self._k_neighbors @property def novelty_threshold(self) -> float: """The degree of novelty required add a solution to the archive.""" return self._novelty_threshold @property def local_competition(self) -> bool: """Whether local competition behavior is turned on.""" return self._local_competition @property def capacity(self) -> int: """Number of solutions that can currently be stored in this archive. The capacity doubles every time the archive fills up. """ return self._store.capacity @property def cells(self) -> int: """Included for API compatibility; equivalent to :meth:`__len__`. Strictly speaking, this archive does not have "cells" since it does not have a tessellation like other archives. However, for API compatibility, we set the number of cells as equal to the number of solutions currently in the archive. """ return len(self) @property def qd_score_offset(self) -> float: """Subtracted from objective values when computing the QD score.""" return self._qd_score_offset ## dunder methods ## def __len__(self) -> int: return len(self._store) def __iter__(self) -> Iterator[SingleData]: return iter(self._store) ## Utilities ## def _stats_reset(self) -> None: """Resets the archive stats.""" self._best_elite = None self._objective_sum = np.asarray(0.0, dtype=self.dtypes["objective"]) self._stats = ArchiveStats( num_elites=0, coverage=np.asarray(0.0, dtype=self.dtypes["objective"]), qd_score=np.asarray(0.0, dtype=self.dtypes["objective"]), norm_qd_score=np.asarray(0.0, dtype=self.dtypes["objective"]), obj_max=None, obj_mean=None, ) def _stats_update(self, new_objective_sum: Float, new_best_index: Float) -> None: """Updates statistics. Update is based on a new sum of objective values (new_objective_sum) and the index of a potential new best elite (new_best_index). """ _, new_best_elite = self._store.retrieve([new_best_index]) new_best_elite = {k: v[0] for k, v in new_best_elite.items()} if ( self._stats.obj_max is None or new_best_elite["objective"] > self._stats.obj_max ): self._best_elite = new_best_elite new_obj_max = new_best_elite["objective"] else: new_obj_max = self._stats.obj_max self._objective_sum = new_objective_sum new_qd_score = ( self._objective_sum - np.asarray(len(self), dtype=self.dtypes["objective"]) * self._qd_score_offset ) self._stats = ArchiveStats( num_elites=len(self), coverage=np.asarray(len(self) / self.cells, dtype=self.dtypes["objective"]), qd_score=new_qd_score, norm_qd_score=np.asarray( new_qd_score / self.cells, dtype=self.dtypes["objective"] ), obj_max=new_obj_max, obj_mean=np.asarray( self._objective_sum / len(self), dtype=self.dtypes["objective"] ), )
[docs] def index_of(self, measures: ArrayLike) -> np.ndarray: """Returns the index of the closest solution to the given measures. Unlike the structured archives like :class:`~ribs.archives.GridArchive`, this archive does not have indexed cells where each measure "belongs." Thus, this method instead returns the index of the solution with the closest measure to each solution passed in. This means that :meth:`retrieve` will return the solution with the closest measure to each measure passed into that method. Args: measures: (batch_size, :attr:`measure_dim`) array of coordinates in measure space. Returns: (batch_size,) array of integer indices representing the location of the solution in the archive. Raises: RuntimeError: There were no entries in the archive. ValueError: ``measures`` is not of shape (batch_size, :attr:`measure_dim`). ValueError: ``measures`` has non-finite values (inf or NaN). """ measures = np.asarray(measures, dtype=self.dtypes["measures"]) check_batch_shape(measures, "measures", self.measure_dim, "measure_dim") check_finite(measures, "measures") if self.empty: raise RuntimeError( "There were no solutions in the archive. " "`ProximityArchive.index_of` computes the nearest " "neighbor to the input measures, so there must be at least one " "solution present in the archive." ) _, indices = self._cur_kd_tree.query(measures) return indices.astype(np.int32)
[docs] def index_of_single(self, measures: ArrayLike) -> Int: """Returns the index of the measures for one solution. See :meth:`index_of`. Args: measures: (:attr:`measure_dim`,) array of measures for a single solution. Returns: Integer index of the measures in the archive's storage arrays. Raises: ValueError: ``measures`` is not of shape (:attr:`measure_dim`,). ValueError: ``measures`` has non-finite values (inf or NaN). """ measures = np.asarray(measures, dtype=self.dtypes["measures"]) check_shape(measures, "measures", self.measure_dim, "measure_dim") check_finite(measures, "measures") return self.index_of(measures[None])[0]
[docs] def compute_novelty( self, measures: ArrayLike, local_competition: ArrayLike | None = None ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: """Computes the novelty and local competition of the given measures. Args: measures: (batch_size, :attr:`measure_dim`) array of coordinates in measure space. local_competition: This can be None to indicate not to compute local competition. Otherwise, it can be a (batch_size,) array of objective values to use as references for computing objective values. Returns: Either one array or a tuple of two arrays: - (batch_size,) array holding the novelty score of each measure. If the archive is empty, the novelty is set to the :attr:`novelty_threshold`. - If ``local_competition`` is passed in, a (batch_size,) array holding the local competition of each solution will also be returned. If the archive is empty, the local competition will be set to 0. """ measures = np.asarray(measures) batch_size = len(measures) use_local_competition = local_competition is not None if use_local_competition: objectives = np.asarray(local_competition) if self.empty: # Set default values for novelty and local competition when archive is # empty. novelty = np.full( batch_size, self.novelty_threshold, dtype=self.dtypes["measures"] ) if use_local_competition: local_competition_scores = np.zeros(len(novelty), dtype=np.int32) else: # Compute nearest neighbors. k_neighbors = min(len(self), self.k_neighbors) dists, indices = self._cur_kd_tree.query(measures, k=k_neighbors) # Expand since query() automatically squeezes the last dim when k=1. dists = dists[:, None] if k_neighbors == 1 else dists novelty = np.mean(dists, axis=1) if use_local_competition: indices = indices[:, None] if k_neighbors == 1 else indices # The first item returned by `retrieve` is `occupied` -- all these # indices are occupied since they are indices of solutions in the # archive. neighbor_objectives = self._store.retrieve( indices.ravel(), "objective" )[1] neighbor_objectives = neighbor_objectives.reshape(indices.shape) # Local competition is the number of neighbors who have a lower # objective. local_competition_scores = np.sum( neighbor_objectives < objectives[:, None], axis=1, dtype=np.int32, ) if use_local_competition: return novelty, local_competition_scores # pylint: disable = used-before-assignment else: return novelty
## Methods for writing to the archive ## def _maybe_resize(self, new_size: int) -> None: """Resizes the store by doubling its capacity. We may need to double the capacity multiple times. The log2 below indicates how many times we would need to double the capacity, and we obtain the final multiplier by raising to a power of 2. """ if new_size > self.capacity: multiplier = 2 ** int(np.ceil(np.log2(new_size / self.capacity))) self._store.resize(multiplier * self.capacity)
[docs] def add( self, solution: ArrayLike, objective: ArrayLike | None, measures: ArrayLike, **fields: ArrayLike, ) -> BatchData: """Inserts a batch of solutions into the archive. Solutions are inserted if they have a high enough novelty score as discussed in the documentation for this class. The novelty is determined by comparing to solutions currently in the archive. If :attr:`local_competition` is turned on, solutions can also replace existing solutions in the archive. Namely, if the solution was not novel enough to be added, it will be compared to its nearest neighbor, and if it exceeds the objective value of its nearest neighbor, it will replace the nearest neighbor. If there are conflicts where multiple solutions may replace a single solution, the highest-performing is chosen. .. note:: The indices of all arguments should "correspond" to each other, i.e. ``solution[i]``, ``objective[i]``, ``measures[i]``, and should be the solution parameters, objective, and measures for solution ``i``. Args: solution: (batch_size, :attr:`solution_dim`) array of solution parameters. objective: A value of None will cause the objective values to default to 0. However, if the user wishes to associate an objective with each solution, this can be a (batch_size,) array with objective function evaluations of the solutions. If :attr:`local_competition` is turned on, this argument must be provided. measures: (batch_size, :attr:`measure_dim`) array with measure space coordinates of all the solutions. fields: Additional data for each solution. Each argument should be an array with batch_size as the first dimension. Returns: Information describing the result of the add operation. The dict contains the following keys: - ``"status"`` (:class:`numpy.ndarray` of :class:`numpy.int32`): An array of integers that represent the "status" obtained when attempting to insert each solution in the batch. Each item has the following possible values: - ``0``: The solution was not added to the archive. - ``1``: The solution replaced an existing solution in the archive due to having a higher objective (only applies if :attr:`local_competition` is turned on). - ``2``: The solution was added to the archive due to being sufficiently novel. To convert statuses to a more semantic format, cast all statuses to :class:`AddStatus` e.g. with ``[AddStatus(s) for s in add_info["status"]]``. - ``"novelty"`` (:class:`numpy.ndarray` of :attr:`dtypes` ["measures"]): The computed novelty of the solutions passed in. If there were no solutions to compute novelty with respect to (i.e., the archive was empty), the novelty is set to the :attr:`novelty_threshold`. - ``"local_competition"`` (:class:`numpy.ndarray` of :class:`int`): Only available if :attr:`local_competition` is turned on. Indicates, for each solution, how many of the nearest neighbors had lower objective values. Maximum value is :attr:`k_neighbors`. If there were no solutions to compute novelty with respect to, (i.e., the archive was empty), the local competition is set to 0. - ``"value"`` (:class:`numpy.ndarray` of :attr:`dtypes` ["objective"]): Only available if :attr:`local_competition` is turned on. The meaning of each value depends on the corresponding ``status`` and is inspired by the values in CMA-ME (`Fontaine 2020 <https://arxiv.org/abs/1912.02400>`_): - ``0`` (not added): The value is the "negative improvement," i.e., the objective of the solution passed in minus the objective of the nearest neighbor (this value is negative because the solution did not have a high enough objective to be added to the archive). - ``1`` (replace/improve existing solution): The value is the "improvement," i.e., the objective of the solution passed in minus the objective of the elite that was replaced. - ``2`` (new solution): The value is just the objective of the solution. Raises: ValueError: The array arguments do not match their specified shapes. ValueError: ``objective`` or ``measures`` has non-finite values (inf or NaN). ValueError: ``local_competition`` is turned on but objective was not passed in. """ if objective is None: if self.local_competition: raise ValueError( "If local competition is turned on, objective " "must be passed in to add()." ) else: objective = np.zeros(len(solution), dtype=self.dtypes["objective"]) data = validate_batch( self, { "solution": solution, "objective": objective, "measures": measures, **fields, }, ) # Delete these so that we only use the clean, validated data in `data`. del solution, objective, measures, fields if not self.local_competition: # Regular addition -- add solutions that are novel enough. novelty = self.compute_novelty(measures=data["measures"]) novel_enough = novelty >= self.novelty_threshold n_novel_enough = np.sum(novel_enough) new_size = len(self) + n_novel_enough self._maybe_resize(new_size) add_info = {} add_info["status"] = np.zeros(len(data["measures"]), dtype=np.int32) add_info["status"][novel_enough] = 2 # New solution. add_info["novelty"] = novelty if n_novel_enough > 0: # Filter the data to solutions that were novel enough. data = {key: val[novel_enough] for key, val in data.items()} # These are the new indices where novel solutions will be placed. We # append to the current collection of solutions by getting the next # `new_size` indices. indices = np.arange(len(self), new_size) # Add to archive. self._store.add(indices, data) # Compute statistics. best_index = indices[np.argmax(data["objective"])] objective_sum = self._objective_sum + np.sum(data["objective"]) self._stats_update(objective_sum, best_index) # Make a new tree with the updated solutions. self._cur_kd_tree = KDTree( self._store.data("measures"), **self._kdtree_kwargs ) return add_info else: batch_size = len(data["measures"]) # Addition with local competition. The key difference from above is that # solutions that are not novel enough have the potential to replace their # nearest neighbors in the archive. As such, similar to GridArchive.add, we # need to handle batch additions. novelty, local_competition = self.compute_novelty( measures=data["measures"], local_competition=data["objective"], ) novel_enough = novelty >= self.novelty_threshold not_novel_enough = ~novel_enough n_novel_enough = np.sum(novel_enough) n_not_novel_enough = batch_size - n_novel_enough new_size = len(self) + n_novel_enough self._maybe_resize(new_size) # Separate out the novel data for the final addition. New solutions are # assigned indices such that they append to the current store. novel_data = {name: arr[novel_enough] for name, arr in data.items()} novel_indices = np.arange(len(self), new_size) # Separate out the non-novel data for further processing. Solutions that # were not novel enough have the potential to replace their nearest # neighbors in the archive. data = {name: arr[not_novel_enough] for name, arr in data.items()} indices = ( self.index_of(data["measures"]) if n_not_novel_enough > 0 else np.array([], dtype=np.int32) ) # All entries are occupied since these solutions were not novel, and their # index from `index_of` is the index of their nearest neighbor. _, cur_data = self._store.retrieve(indices) cur_objective = cur_data["objective"] # Can only be used to index `data` and `indices`. improve_existing = data["objective"] > cur_objective # Information to return about the addition. add_info = {} add_info["status"] = np.zeros(batch_size, dtype=np.int32) add_info["status"][novel_enough] = 2 # Sets to 1 if improves over the neighbor. add_info["status"][not_novel_enough] = improve_existing add_info["value"] = np.empty(batch_size, dtype=self.dtypes["objective"]) add_info["value"][novel_enough] = novel_data["objective"] add_info["value"][not_novel_enough] = data["objective"] - cur_objective add_info["novelty"] = novelty add_info["local_competition"] = local_competition if np.any(improve_existing): # Select all solutions that can be inserted due to beating their # neighbors -- at this point, there are still conflicts in the # insertions, e.g., multiple solutions can map to index 0. indices = indices[improve_existing] data = {name: arr[improve_existing] for name, arr in data.items()} cur_objective = cur_objective[improve_existing] # Retrieve indices of solutions that _should_ be inserted into the # archive. Currently, multiple solutions may be inserted at each archive # index, but we only want to insert the maximum among these solutions. # Thus, we obtain the argmax for each archive index. # # We use a fill_value of -1 to indicate archive indices that were not # covered in the batch. Note that the length of archive_argmax is only # max(indices), rather than the total number of grid cells. However, # this is okay because we only need the indices of the solutions, which # we store in should_insert. # # aggregate() always chooses the first item if there are ties, so the # first elite will be inserted if there is a tie. See their default # numpy implementation for more info: # https://github.com/ml31415/numpy-groupies/blob/master/numpy_groupies/aggregate_numpy.py#L107 archive_argmax = aggregate( indices, data["objective"], func="argmax", fill_value=-1 ) should_insert = archive_argmax[archive_argmax != -1] # Select only solutions that will be inserted into the archive. indices = indices[should_insert] data = {name: arr[should_insert] for name, arr in data.items()} cur_objective = cur_objective[should_insert] if np.any(improve_existing) or n_novel_enough > 0: combined_indices = np.concatenate((indices, novel_indices), axis=0) combined_data = { name: np.concatenate((data[name], novel_data[name]), axis=0) for name in data } # Insert the solutions that improved over their neighbors, as well as # the solutions that are novel. self._store.add(combined_indices, combined_data) # Compute statistics. objective_sum = ( self._objective_sum + np.sum(novel_data["objective"]) + np.sum(data["objective"] - cur_objective) ) best_index = combined_indices[np.argmax(combined_data["objective"])] self._stats_update(objective_sum, best_index) # Make a new tree with the updated solutions. self._cur_kd_tree = KDTree( self._store.data("measures"), **self._kdtree_kwargs ) return add_info
[docs] def add_single( self, solution: ArrayLike, objective: ArrayLike | None, measures: ArrayLike, **fields: ArrayLike, ) -> SingleData: """Inserts a single solution into the archive. Args: solution: Parameters of the solution. objective: Set to None to get the default value of 0; otherwise, a valid objective value is also acceptable. measures: Coordinates in measure space of the solution. fields: Additional data for the solution. Returns: Information describing the result of the add operation. The dict contains ``status`` and ``novelty`` keys; refer to :meth:`add` for the meaning of status and novelty. Raises: ValueError: The array arguments do not match their specified shapes. ValueError: ``objective`` is non-finite (inf or NaN) or ``measures`` has non-finite values. ValueError: ``local_competition`` is turned on but objective was not passed in. """ if objective is None: if self.local_competition: raise ValueError( "If local competition is turned on, objective " "must be passed in to add_single()." ) else: objective = 0.0 data = validate_single( self, { "solution": solution, "objective": objective, "measures": measures, **fields, }, ) return self.add(**{key: [val] for key, val in data.items()})
[docs] def clear(self) -> None: """Removes all elites in the archive.""" self._store.clear() self._stats_reset()
## Methods for reading from the archive ## ## Refer to ArchiveBase for documentation of these methods. ##
[docs] def retrieve(self, measures: ArrayLike) -> tuple[np.ndarray, BatchData]: measures = np.asarray(measures, dtype=self.dtypes["measures"]) check_batch_shape(measures, "measures", self.measure_dim, "measure_dim") check_finite(measures, "measures") occupied, data = self._store.retrieve(self.index_of(measures)) fill_sentinel_values(occupied, data) return occupied, data
[docs] def retrieve_single(self, measures: ArrayLike) -> tuple[bool, SingleData]: measures = np.asarray(measures, dtype=self.dtypes["measures"]) check_shape(measures, "measures", self.measure_dim, "measure_dim") check_finite(measures, "measures") occupied, data = self.retrieve(measures[None]) return occupied[0], {field: arr[0] for field, arr in data.items()}
@overload def data( self, fields: str, return_type: Literal["dict", "tuple", "pandas"] = "dict", ) -> np.ndarray: ... @overload def data( self, fields: None | Collection[str] = None, return_type: Literal["dict"] = "dict", ) -> BatchData: ... @overload def data( self, fields: None | Collection[str] = None, return_type: Literal["tuple"] = "tuple", ) -> tuple[np.ndarray]: ... @overload def data( self, fields: None | Collection[str] = None, return_type: Literal["pandas"] = "pandas", ) -> ArchiveDataFrame: ...
[docs] def data( self, fields: None | Collection[str] | str = None, return_type: Literal["dict", "tuple", "pandas"] = "dict", ) -> np.ndarray | BatchData | tuple[np.ndarray] | ArchiveDataFrame: return self._store.data(fields, return_type)
[docs] def sample_elites(self, n: Int, replace: bool = True) -> BatchData: if self.empty: raise IndexError("No elements in archive.") if not replace and n > len(self._store): raise ValueError( "Cannot take a larger sample than the number of elites " "in the archive when 'replace=False'" ) random_indices = self._rng.choice(len(self._store), size=n, replace=replace) selected_indices = self._store.occupied_list[random_indices] _, elites = self._store.retrieve(selected_indices) return elites