Source code for ribs.archives._cvt_archive

"""Contains the CVTArchive class."""
import numbers

import numpy as np
from scipy.spatial import cKDTree  # pylint: disable=no-name-in-module
from scipy.stats.qmc import Halton, Sobol
from sklearn.cluster import k_means

from ribs._utils import check_batch_shape, check_finite
from ribs.archives._archive_base import ArchiveBase

class CVTArchive(ArchiveBase):
    """An archive that divides the entire measure space into a fixed number of
    cells.

    This archive originates in `Vassiliades 2018 <>`_. It uses Centroidal
    Voronoi Tessellation (CVT) to divide an n-dimensional measure space into k
    cells. The CVT is created by sampling points uniformly from the
    n-dimensional measure space and using k-means clustering to identify k
    centroids. When items are inserted into the archive, we identify their
    cell by finding the closest centroid in measure space (using Euclidean
    distance). For k-means clustering, we use :func:`sklearn.cluster.k_means`.

    By default, finding the closest centroid is done in roughly
    O(log(number of cells)) time using :class:`scipy.spatial.cKDTree`. To
    switch to brute force, which takes O(number of cells) time, pass
    ``use_kd_tree=False``.

    To compare the performance of the k-D tree vs brute force, we ran
    benchmarks where we inserted 1k batches of 100 solutions into a 2D archive
    with varying numbers of cells. We took the minimum over 5 runs for each
    data point, as recommended in the docs for :meth:`timeit.Timer.repeat`.
    Note the logarithmic scales. This plot was generated on a reasonably
    modern laptop.

    .. image:: ../_static/imgs/cvt_add_plot.png
        :alt: Runtime to insert 100k entries into CVTArchive

    Across almost all numbers of cells, using the k-D tree is faster than
    using brute force. Thus, **we recommend always using the k-D tree.** See
    `benchmarks/ <>`_ in the project repo for more information about how this
    plot was generated.

    Finally, if running multiple experiments, it may be beneficial to use the
    same centroids across each experiment. Doing so can keep experiments
    consistent and reduce execution time.
    To do this, either (1) construct custom centroids and pass them in via the
    ``custom_centroids`` argument, or (2) access the centroids created in the
    first archive with :attr:`centroids` and pass them into
    ``custom_centroids`` when constructing archives for subsequent
    experiments.

    .. note:: The idea of archive thresholds was introduced in `Fontaine 2022
        <>`_. For more info on thresholds, including the ``learning_rate`` and
        ``threshold_min`` parameters, refer to our tutorial
        :doc:`/tutorials/cma_mae`.

    .. note:: For more information on our choice of k-D tree implementation,
        see :pr:`38`.

    Args:
        solution_dim (int): Dimension of the solution space.
        cells (int): The number of cells to use in the archive, equivalent to
            the number of centroids/areas in the CVT.
        ranges (array-like of (float, float)): Upper and lower bound of each
            dimension of the measure space, e.g. ``[(-1, 1), (-2, 2)]``
            indicates the first dimension should have bounds :math:`[-1,1]`
            (inclusive), and the second dimension should have bounds
            :math:`[-2,2]` (inclusive). The length of ``ranges`` determines
            the dimensionality of the measure space.
        learning_rate (float): The learning rate for threshold updates.
            Defaults to 1.0.
        threshold_min (float): The initial threshold value for all the cells.
            Defaults to ``-np.inf``.
        qd_score_offset (float): Archives often contain negative objective
            values, and if the QD score were computed with these negative
            objectives, the algorithm would be penalized for adding new cells
            with negative objectives. Thus, a standard practice is to
            normalize all the objectives so that they are non-negative by
            introducing an offset. This QD score offset will be *subtracted*
            from all objectives in the archive, e.g., if your objectives go as
            low as -300, pass in -300 so that each objective will be
            transformed as ``objective - (-300)``.
        seed (int): Value to seed the random number generator as well as
            :func:`~sklearn.cluster.k_means`. Set to None to avoid a fixed
            seed.
        dtype (str or data-type): Data type of the solutions, objectives, and
            measures. We only support ``"f"`` / ``np.float32`` and ``"d"`` /
            ``np.float64``.
        extra_fields (dict): Description of extra fields of data that are
            stored next to elite data like solutions and objectives. The
            description is a dict mapping from a field name (str) to a tuple
            of ``(shape, dtype)``. For instance, ``{"foo": ((), np.float32),
            "bar": ((10,), np.float32)}`` will create a "foo" field that
            contains scalar values and a "bar" field that contains 10D values.
            Note that field names must be valid Python identifiers, and names
            already used in the archive are not allowed.
        custom_centroids (array-like): If passed in, this (cells, measure_dim)
            array will be used as the centroids of the CVT instead of
            generating new ones. In this case, ``samples`` will be ignored,
            and ``archive.samples`` will be None. This can be useful when one
            wishes to use the same CVT across experiments for fair comparison.
        centroid_method (str): Method for generating centroids; one of
            "kmeans" (the default), "random", "sobol", "scrambled_sobol", or
            "halton". The non-kmeans methods are derived from Mouret 2023.
            Note that ``samples`` is only used when the method is "kmeans".
        samples (int or array-like): If it is an int, this specifies the
            number of samples to generate when creating the CVT. Otherwise,
            this must be a (num_samples, measure_dim) array where samples[i]
            is a sample to use when creating the CVT. It can be useful to pass
            in custom samples when there are restrictions on what samples in
            the measure space are (physically) possible.
        k_means_kwargs (dict): kwargs for :func:`~sklearn.cluster.k_means`. By
            default, we pass in ``n_init=1``, ``init="random"``,
            ``algorithm="lloyd"``, and ``random_state=seed``.
        use_kd_tree (bool): If True, use a k-D tree for finding the closest
            centroid when inserting into the archive. If False, brute force
            will be used instead.
        ckdtree_kwargs (dict): kwargs for :class:`~scipy.spatial.cKDTree`.
            By default, we do not pass in any kwargs.
        chunk_size (int): If passed, brute-forcing the closest centroid search
            will chunk the distance calculations to compute chunk_size inputs
            at a time.

    Raises:
        ValueError: The ``samples`` array or the ``custom_centroids`` array
            has the wrong shape.
    """

    def __init__(self,
                 *,
                 solution_dim,
                 cells,
                 ranges,
                 learning_rate=None,
                 threshold_min=-np.inf,
                 qd_score_offset=0.0,
                 seed=None,
                 dtype=np.float64,
                 extra_fields=None,
                 custom_centroids=None,
                 centroid_method="kmeans",
                 samples=100_000,
                 k_means_kwargs=None,
                 use_kd_tree=True,
                 ckdtree_kwargs=None,
                 chunk_size=None):
        ArchiveBase.__init__(
            self,
            solution_dim=solution_dim,
            cells=cells,
            measure_dim=len(ranges),
            learning_rate=learning_rate,
            threshold_min=threshold_min,
            qd_score_offset=qd_score_offset,
            seed=seed,
            dtype=dtype,
            extra_fields=extra_fields,
        )

        ranges = list(zip(*ranges))
        self._lower_bounds = np.array(ranges[0], dtype=self.dtype)
        self._upper_bounds = np.array(ranges[1], dtype=self.dtype)
        self._interval_size = self._upper_bounds - self._lower_bounds

        # Apply default args for k-means. Users can easily override these,
        # particularly if they want higher quality clusters.
        self._k_means_kwargs = ({} if k_means_kwargs is None else
                                k_means_kwargs.copy())
        # Only run one iter to be fast.
        self._k_means_kwargs.setdefault("n_init", 1)
        # The default "k-means++" takes very long to init.
        self._k_means_kwargs.setdefault("init", "random")
        self._k_means_kwargs.setdefault("algorithm", "lloyd")
        self._k_means_kwargs.setdefault("random_state", seed)

        if custom_centroids is None:
            self._samples = None
            if centroid_method == "kmeans":
                if not isinstance(samples, numbers.Integral):
                    # Validate shape of custom samples.
                    samples = np.asarray(samples, dtype=self.dtype)
                    if samples.shape[1] != self._measure_dim:
                        raise ValueError(
                            f"Samples has shape {samples.shape} but must be "
                            f"of shape (n_samples, len(ranges)="
                            f"{self._measure_dim})")
                    self._samples = samples
                else:
                    self._samples = self._rng.uniform(
                        self._lower_bounds,
                        self._upper_bounds,
                        size=(samples, self._measure_dim),
                    ).astype(self.dtype)

                self._centroids = k_means(self._samples, self._cells,
                                          **self._k_means_kwargs)[0]

                if self._centroids.shape[0] < self._cells:
                    raise RuntimeError(
                        "While generating the CVT, k-means clustering found "
                        f"{self._centroids.shape[0]} centroids, but this "
                        f"archive needs {self._cells} cells. This most "
                        "likely happened because there are too few samples "
                        "and/or too many cells.")
            elif centroid_method == "random":
                # Generate random centroids.
                self._centroids = self._rng.uniform(
                    self._lower_bounds,
                    self._upper_bounds,
                    size=(self._cells, self._measure_dim))
            elif centroid_method == "sobol":
                # Generate centroids from a Sobol sequence.
                sampler = Sobol(d=self._measure_dim, scramble=False)
                sobol_nums = sampler.random(n=self._cells)
                self._centroids = (self._lower_bounds + sobol_nums *
                                   (self._upper_bounds - self._lower_bounds))
            elif centroid_method == "scrambled_sobol":
                # Generate centroids from a scrambled Sobol sequence.
                sampler = Sobol(d=self._measure_dim, scramble=True)
                sobol_nums = sampler.random(n=self._cells)
                self._centroids = (self._lower_bounds + sobol_nums *
                                   (self._upper_bounds - self._lower_bounds))
            elif centroid_method == "halton":
                # Generate centroids from a Halton sequence.
                sampler = Halton(d=self._measure_dim)
                halton_nums = sampler.random(n=self._cells)
                self._centroids = (self._lower_bounds + halton_nums *
                                   (self._upper_bounds - self._lower_bounds))
        else:
            # Validate shape of `custom_centroids` when they are provided.
            custom_centroids = np.asarray(custom_centroids, dtype=self.dtype)
            if custom_centroids.shape != (cells, self._measure_dim):
                raise ValueError(
                    f"custom_centroids has shape {custom_centroids.shape} "
                    f"but must be of shape (cells={cells}, len(ranges)="
                    f"{self._measure_dim})")
            self._centroids = custom_centroids
            self._samples = None

        self._use_kd_tree = use_kd_tree
        self._centroid_kd_tree = None
        self._ckdtree_kwargs = ({} if ckdtree_kwargs is None else
                                ckdtree_kwargs.copy())
        self._chunk_size = chunk_size
        if self._use_kd_tree:
            self._centroid_kd_tree = cKDTree(self._centroids,
                                             **self._ckdtree_kwargs)

    @property
    def lower_bounds(self):
        """(measure_dim,) numpy.ndarray: Lower bound of each dimension."""
        return self._lower_bounds

    @property
    def upper_bounds(self):
        """(measure_dim,) numpy.ndarray: Upper bound of each dimension."""
        return self._upper_bounds

    @property
    def interval_size(self):
        """(measure_dim,) numpy.ndarray: The size of each dim (upper_bounds -
        lower_bounds)."""
        return self._interval_size

    @property
    def samples(self):
        """(num_samples, measure_dim) numpy.ndarray: The samples used in
        creating the CVT.

        Will be None if custom centroids were passed in to the archive.
        """
        return self._samples

    @property
    def centroids(self):
        """(n_centroids, measure_dim) numpy.ndarray: The centroids used in
        the CVT.
        """
        return self._centroids
    def index_of(self, measures):
        """Finds the indices of the centroids closest to the given
        coordinates in measure space.

        If ``index_batch`` is the batch of indices returned by this method,
        then ``archive.centroids[index_batch[i]]`` holds the coordinates of
        the centroid closest to ``measures[i]``. See :attr:`centroids` for
        more info.

        The centroid indices are located using either the k-D tree or brute
        force, depending on the value of ``use_kd_tree`` in the constructor.

        Args:
            measures (array-like): (batch_size, :attr:`measure_dim`) array of
                coordinates in measure space.

        Returns:
            numpy.ndarray: (batch_size,) array of centroid indices
            corresponding to each measure space coordinate.

        Raises:
            ValueError: ``measures`` is not of shape (batch_size,
                :attr:`measure_dim`).
            ValueError: ``measures`` has non-finite values (inf or NaN).
        """
        measures = np.asarray(measures)
        check_batch_shape(measures, "measures", self.measure_dim,
                          "measure_dim")
        check_finite(measures, "measures")

        if self._use_kd_tree:
            _, indices = self._centroid_kd_tree.query(measures)
            return indices.astype(np.int32)
        else:
            expanded_measures = np.expand_dims(measures, axis=1)
            # Compute indices a chunk at a time.
            if (self._chunk_size is not None and
                    self._chunk_size < measures.shape[0]):
                indices = []
                chunks = np.array_split(
                    expanded_measures,
                    np.ceil(len(expanded_measures) / self._chunk_size))
                for chunk in chunks:
                    distances = chunk - self.centroids
                    distances = np.sum(np.square(distances), axis=2)
                    current_res = np.argmin(distances,
                                            axis=1).astype(np.int32)
                    indices.append(current_res)
                return np.concatenate(tuple(indices))
            else:
                # Brute force distance calculation -- start by taking the
                # difference between each measure i and all the centroids.
                distances = expanded_measures - self.centroids
                # Compute the total squared distance -- no need to compute
                # actual distance with a sqrt.
                distances = np.sum(np.square(distances), axis=2)
                return np.argmin(distances, axis=1).astype(np.int32)
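The nearest-centroid lookup that ``index_of`` performs can be sketched outside the archive. This standalone example (not part of the module above; the random ``centroids`` and ``measures`` arrays are stand-ins for a real CVT and a real batch) shows that the k-D tree path and the brute-force squared-distance path agree:

```python
import numpy as np
from scipy.spatial import cKDTree

rng = np.random.default_rng(0)
centroids = rng.uniform(-1.0, 1.0, size=(100, 2))  # (cells, measure_dim)
measures = rng.uniform(-1.0, 1.0, size=(10, 2))    # (batch_size, measure_dim)

# k-D tree path: roughly O(log cells) per query.
_, kd_indices = cKDTree(centroids).query(measures)

# Brute-force path: O(cells) per query. Broadcasting gives a
# (batch_size, cells, measure_dim) difference array; squared distances
# suffice for argmin, so no sqrt is needed.
diffs = measures[:, None, :] - centroids[None, :, :]
bf_indices = np.argmin(np.sum(np.square(diffs), axis=2), axis=1)

assert np.array_equal(kd_indices, bf_indices)
```

The chunked branch in ``index_of`` applies the same brute-force computation to slices of the batch, trading peak memory for a small amount of Python-loop overhead.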
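For the non-kmeans ``centroid_method`` options, centroid generation reduces to drawing low-discrepancy points in the unit hypercube and rescaling them to the archive bounds. A minimal sketch of the ``"halton"`` branch (standalone; the bounds and cell count here are arbitrary examples):

```python
import numpy as np
from scipy.stats.qmc import Halton

lower = np.array([-1.0, -2.0])  # per-dimension lower bounds
upper = np.array([1.0, 2.0])    # per-dimension upper bounds
cells = 50

# Halton points lie in [0, 1)^d; scale them into [lower, upper).
sampler = Halton(d=2, seed=42)
unit = sampler.random(n=cells)
centroids = lower + unit * (upper - lower)

assert centroids.shape == (cells, 2)
assert np.all(centroids >= lower) and np.all(centroids < upper)
```

The Sobol branches work the same way, with :class:`scipy.stats.qmc.Sobol` (and ``scramble=True`` for ``"scrambled_sobol"``) in place of the Halton sampler.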