Source code for ribs.emitters._gradient_operator_emitter

"""Provides the GradientOperatorEmitter."""

from __future__ import annotations

from collections.abc import Collection
from typing import Literal

import numpy as np
from numpy.typing import ArrayLike

from ribs._utils import check_batch_shape, check_shape, validate_batch
from ribs.archives import ArchiveBase
from ribs.emitters._emitter_base import EmitterBase
from ribs.typing import BatchData, Float, Int


class GradientOperatorEmitter(EmitterBase):
    r"""Generates solutions with a genetic operator and gradient arborescence.

    This emitter is from `Fontaine 2021 <https://arxiv.org/abs/2106.03894>`_. It
    proceeds in two stages.

    The first stage samples a batch of intermediate solutions from the archive and
    (optionally) applies Gaussian perturbation with zero mean and fixed standard
    deviation ``sigma``. If the archive is empty and no initial solutions are
    provided, the sampled solutions will be from a Gaussian centered at ``x0``.

    Optionally, an additional operator, Iso+LineDD (`Vassiliades 2018
    <https://arxiv.org/abs/1804.03906>`_), can be applied to the intermediate
    solutions in the first stage by setting ``operator_type='iso_line_dd'``. The
    operator samples an additional batch of archive solutions to form a line in
    parameter space starting from the intermediate solutions. A zero-mean Gaussian
    interpolation along the line is then applied to the intermediate solutions,
    with standard deviation ``line_sigma``. While the archive is empty there are
    no elites to form a line with, so only the isotropic perturbation is applied.

    The second stage creates new solutions by branching from each of the
    intermediate solutions. It leverages the gradient information of the objective
    and measure functions, generating a new solution from each *solution point*
    :math:`\boldsymbol{\theta_i}` using *gradient arborescence*. The gradient
    coefficients :math:`\boldsymbol{c_i}` are drawn from a zero-centered Gaussian
    distribution with standard deviation ``sigma_g``. Note that the objective
    gradient coefficient is forced to be non-negative by taking its absolute value
    :math:`|c_{i,0}|`.

    Essentially, this means that the emitter samples coefficients
    :math:`\boldsymbol{c_i} \sim \mathcal{N}(\boldsymbol{0},
    \boldsymbol{\sigma_g}I)` and creates new solutions
    :math:`\boldsymbol{\theta'_i}` by updating the intermediate solutions
    :math:`\boldsymbol{\theta_i}` from the first stage according to:

    .. math::

        \boldsymbol{\theta'_i} \gets \boldsymbol{\theta_i} +
            |c_{i,0}| \boldsymbol{\nabla} f(\boldsymbol{\theta_i}) +
            \sum_{j=1}^k c_{i,j}\boldsymbol{\nabla}m_j(\boldsymbol{\theta_i})

    Where :math:`k` is the number of measures, and
    :math:`\boldsymbol{\nabla} f(\boldsymbol{\theta})` and
    :math:`\boldsymbol{\nabla} m_j(\boldsymbol{\theta})` are the objective and
    measure gradients of the solution point :math:`\boldsymbol{\theta}`,
    respectively.

    Args:
        archive: Archive of solutions, e.g., :class:`ribs.archives.GridArchive`.
        sigma: Standard deviation of the Gaussian perturbation used to generate
            new solutions in ask_dqd(). Note we assume the Gaussian is diagonal,
            so if this argument is an array, it must be 1D.
        sigma_g: Step size used for gradient arborescence in ask(), branching from
            the parents generated by ask_dqd(). If measure gradients are used,
            this acts as the standard deviation of the Gaussian from which to
            sample the step size. Otherwise, this acts as the step size itself.
        initial_solutions: An (n, solution_dim) array of solutions to be used when
            the archive is empty. If this argument is None, then solutions will be
            sampled from a Gaussian distribution centered at ``x0`` with standard
            deviation ``sigma``. Exactly one of ``initial_solutions`` and ``x0``
            must be given.
        x0: Center of the Gaussian distribution from which to sample solutions
            when the archive is empty. Exactly one of ``x0`` and
            ``initial_solutions`` must be given.
        line_sigma: the standard deviation of the line Gaussian for Iso+LineDD
            operator.
        measure_gradients: Signals if measure gradients will be used.
        normalize_grad: Whether gradients should be normalized before steps.
        epsilon: For numerical stability, we add a small epsilon when normalizing
            gradients in :meth:`tell_dqd` -- refer to the implementation `here
            <../_modules/ribs/emitters/_gradient_operator_emitter.html#GradientOperatorEmitter.tell_dqd>`_.
            Pass this parameter to configure that epsilon.
        operator_type: Either 'isotropic' or 'iso_line_dd' to mark the operator
            type for intermediate operations. Defaults to 'isotropic'.
        bounds: Bounds of the solution space. Pass None to indicate there are no
            bounds. Alternatively, pass an array-like to specify the bounds for
            each dim. Each element in this array-like can be None to indicate no
            bound, or a tuple of ``(lower_bound, upper_bound)``, where
            ``lower_bound`` or ``upper_bound`` may be None to indicate no bound.
            Unbounded upper bounds are set to +inf, and unbounded lower bounds are
            set to -inf.
        lower_bounds: Instead of specifying ``bounds``, ``lower_bounds`` and
            ``upper_bounds`` may be specified. This is useful if, for instance,
            solutions are multi-dimensional. Here, pass None to indicate there are
            no bounds (i.e., bounds are set to -inf), or pass an array specifying
            the lower bounds of the solution space.
        upper_bounds: Upper bounds of the solution space; see ``lower_bounds``
            above. Pass None to indicate there are no bounds (i.e., bounds are set
            to inf).
        batch_size: Number of solutions to return in :meth:`ask`.
        seed: Value to seed the random number generator. Set to None to avoid a
            fixed seed.

    Raises:
        ValueError: There is an error in the bounds configuration.
        ValueError: Neither ``x0`` nor ``initial_solutions`` was provided, or both
            were provided.
    """

    def __init__(
        self,
        archive: ArchiveBase,
        *,
        sigma: Float | ArrayLike,
        sigma_g: Float,
        initial_solutions: ArrayLike | None = None,
        x0: ArrayLike | None = None,
        line_sigma: Float = 0.0,
        measure_gradients: bool = False,
        normalize_grad: bool = False,
        epsilon: Float = 1e-8,
        operator_type: Literal["isotropic", "iso_line_dd"] = "isotropic",
        bounds: Collection[tuple[None | Float, None | Float]] | None = None,
        lower_bounds: ArrayLike | None = None,
        upper_bounds: ArrayLike | None = None,
        batch_size: Int = 64,
        seed: Int | None = None,
    ) -> None:
        EmitterBase.__init__(
            self,
            archive=archive,
            solution_dim=archive.solution_dim,
            bounds=bounds,
            lower_bounds=lower_bounds,
            upper_bounds=upper_bounds,
        )

        # Exactly one of `x0` / `initial_solutions` seeds the emitter while the
        # archive is still empty.
        self._x0 = None
        self._initial_solutions = None
        if x0 is None and initial_solutions is None:
            raise ValueError("Either x0 or initial_solutions must be provided.")
        if x0 is not None and initial_solutions is not None:
            raise ValueError("x0 and initial_solutions cannot both be provided.")

        if x0 is not None:
            self._x0 = np.array(x0, dtype=archive.dtypes["solution"])
            check_shape(self._x0, "x0", archive.solution_dim, "archive.solution_dim")
        elif initial_solutions is not None:
            self._initial_solutions = np.asarray(
                initial_solutions, dtype=archive.dtypes["solution"]
            )
            check_batch_shape(
                self._initial_solutions,
                "initial_solutions",
                archive.solution_dim,
                "archive.solution_dim",
            )

        self._rng = np.random.default_rng(seed)

        # `sigma` can either be a scalar or a 1D array.
        self._sigma = np.asarray(sigma, dtype=archive.dtypes["solution"])
        self._sigma_g = np.asarray(sigma_g, archive.dtypes["solution"])
        self._line_sigma = line_sigma

        # Any value other than "isotropic" selects the Iso+LineDD operator.
        self._use_isolinedd = operator_type != "isotropic"
        self._measure_gradients = measure_gradients
        self._normalize_grad = normalize_grad
        self._epsilon = epsilon
        self._batch_size = batch_size

        # Filled in by tell_dqd() and ask_dqd() respectively; ask() needs both.
        self._jacobian_batch = None
        self._parents = None

    @property
    def x0(self) -> np.ndarray | None:
        """Initial Gaussian distribution center.

        Solutions are sampled from this distribution when the archive is empty (if
        :attr:`initial_solutions` is not set).
        """
        return self._x0

    @property
    def initial_solutions(self) -> np.ndarray | None:
        """Returned when the archive is empty (if :attr:`x0` is not set)."""
        return self._initial_solutions

    @property
    def sigma(self) -> Float | np.ndarray:
        """Standard deviation of the (diagonal) Gaussian distribution."""
        return self._sigma

    @property
    def batch_size(self) -> Int:
        """Number of solutions to return in :meth:`ask`."""
        return self._batch_size

    @property
    def batch_size_dqd(self) -> Int:
        """Number of solutions to return in :meth:`ask_dqd`."""
        return self._batch_size

    @property
    def epsilon(self) -> Float:
        """Added for numerical stability when normalizing gradients in
        :meth:`tell_dqd`."""
        return self._epsilon

    def ask_dqd(self) -> np.ndarray:
        """Creates new solutions by sampling elites from the archive.

        If the archive is empty and initial_solutions is given, this method
        returns no solutions. If the archive is empty and ``x0`` is given, the
        solutions are sampled from a Gaussian centered at ``x0``. Otherwise, this
        method will sample elites from the archive and perturb them.

        **Call :meth:`ask_dqd` and :meth:`tell_dqd` (in this order) before calling
        :meth:`ask` and :meth:`tell`.**

        Returns:
            ``(batch_size, solution_dim)`` array -- contains ``batch_size`` new
            solutions to evaluate. The array has zero rows when the archive is
            empty and ``initial_solutions`` was provided.
        """
        archive_is_empty = self.archive.empty
        solution_dtype = self.archive.dtypes["solution"]

        if archive_is_empty and self._initial_solutions is not None:
            # Nothing to differentiate yet; ask() hands out the initial solutions.
            return np.empty((0, self.archive.solution_dim), dtype=solution_dtype)

        if archive_is_empty:
            # A single parent at x0; broadcasting against the noise below yields
            # batch_size solutions.
            parents = np.expand_dims(self.x0, axis=0)
        else:
            parents = self.archive.sample_elites(self.batch_size)["solution"]

        # The isotropic perturbation is shared by both operator types.
        noise = self._rng.normal(
            loc=0.0,
            scale=self.sigma,
            size=(self.batch_size, self.solution_dim),
        ).astype(solution_dtype)

        if self._use_isolinedd and not archive_is_empty:
            # Iso+LineDD: also move along the line towards a second elite. This
            # is skipped while the archive is empty because there are no elites
            # to sample directions from.
            directions = (
                self.archive.sample_elites(self._batch_size)["solution"] - parents
            )
            line_gaussian = self._rng.normal(
                loc=0.0,
                scale=self._line_sigma,
                size=(self._batch_size, 1),
            ).astype(solution_dtype)
            sol = parents + line_gaussian * directions + noise
        else:
            sol = parents + noise

        sol = np.clip(sol, self.lower_bounds, self.upper_bounds)

        # Remembered so that ask() can branch from these points.
        self._parents = sol
        return self._parents

    def ask(self) -> np.ndarray:
        """Samples new solutions from a gradient arborescence.

        The gradient arborescence is parameterized by a multivariate Gaussian
        distribution. If measure_gradients is used, the multivariate Gaussian is
        parameterized by sigma_g, and the arborescence coefficient is sampled from
        the multivariate Gaussian, with the objective coefficient always being
        non-negative. If measure_gradients is not used, the arborescence
        coefficient is just sigma_g itself.

        This method returns ``batch_size`` solutions by branching with gradient
        arborescence based on the solutions returned by :meth:`ask_dqd`. The
        stored Jacobian is not modified, so this method may be called repeatedly
        after a single :meth:`tell_dqd`.

        Returns:
            (:attr:`batch_size`, :attr:`solution_dim`) array -- a batch of new
            solutions to evaluate. If the archive is empty and
            :attr:`initial_solutions` is set, the initial solutions are returned
            instead.

        Raises:
            RuntimeError: This method was called without first passing gradients
                with calls to ask_dqd() and tell_dqd().
        """
        if self.archive.empty and self._initial_solutions is not None:
            return self._initial_solutions

        if self._jacobian_batch is None:
            raise RuntimeError(
                "Please call ask_dqd() and tell_dqd() before calling ask()."
            )

        if self._measure_gradients:
            # One coefficient per (solution, gradient) pair.
            noise = self._rng.normal(
                loc=0.0,
                scale=self._sigma_g,
                size=self._jacobian_batch.shape[:2],
            )
            # The objective coefficient is forced to be non-negative.
            noise[:, 0] = np.abs(noise[:, 0])
            noise = np.expand_dims(noise, axis=2)
            # Weighted sum of the objective and measure gradients.
            offsets = np.sum(self._jacobian_batch * noise, axis=1)
            sols = offsets + self._parents
        else:
            # Only the objective gradient (row 0 of each Jacobian) is used. It is
            # sliced into a local so that the stored Jacobian keeps its 3D shape.
            objective_grads = self._jacobian_batch[:, 0, :]
            sols = self._parents + objective_grads * self._sigma_g

        return sols

    def tell_dqd(
        self,
        solution: ArrayLike,
        objective: ArrayLike,
        measures: ArrayLike,
        jacobian: ArrayLike,
        add_info: BatchData,
        **fields: ArrayLike,
    ) -> None:
        """Gives the emitter results of evaluating solutions from ask_dqd().

        Args:
            solution: (batch_size, :attr:`solution_dim`) array of solutions
                generated by this emitter's :meth:`ask_dqd()` method.
            objective: 1D array containing the objective function value of each
                solution.
            measures: (batch_size, measure space dimension) array with the measure
                space coordinates of each solution.
            jacobian: (batch_size, 1 + measure_dim, solution_dim) array consisting
                of Jacobian matrices of the solutions obtained from
                :meth:`ask_dqd`. Each matrix should consist of the objective
                gradient of the solution followed by the measure gradients. This
                array is not modified.
            add_info: Data returned from the archive
                :meth:`~ribs.archives.ArchiveBase.add` method.
            fields: Additional data for each solution. Each argument should be an
                array with batch_size as the first dimension.
        """
        _, add_info, jacobian = validate_batch(
            self.archive,
            {
                "solution": solution,
                "objective": objective,
                "measures": measures,
                **fields,
            },
            add_info,
            jacobian,
        )

        # The Jacobian comes from evaluating the solutions of ask_dqd().
        if self._normalize_grad:
            # Each gradient is scaled to (roughly) unit length; epsilon guards
            # against division by zero for vanishing gradients.
            norms = np.linalg.norm(jacobian, axis=2, keepdims=True) + self._epsilon
            # Divide out of place: the validated array may be the very object the
            # caller passed in, and in-place division would also fail for
            # integer-typed Jacobians.
            jacobian = jacobian / norms

        self._jacobian_batch = jacobian