"""Provides the GradientOperatorEmitter."""
from __future__ import annotations
from collections.abc import Collection
from typing import Literal
import numpy as np
from numpy.typing import ArrayLike
from ribs._utils import check_batch_shape, check_shape, validate_batch
from ribs.archives import ArchiveBase
from ribs.emitters._emitter_base import EmitterBase
from ribs.typing import BatchData, Float, Int
[docs]
class GradientOperatorEmitter(EmitterBase):
    r"""Generates solutions with a genetic operator and gradient arborescence.

    This emitter is from `Fontaine 2021 <https://arxiv.org/abs/2106.03894>`_. It
    proceeds in two stages. The first stage samples a batch of intermediate solutions
    from the archive and (optionally) applies Gaussian perturbation with zero mean and
    fixed standard deviation ``sigma``. If the archive is empty and no initial solutions
    are provided, the sampled solutions will be from a Gaussian centered at ``x0``.

    Optionally, an additional operator, Iso+LineDD (`Vassiliades 2018
    <https://arxiv.org/abs/1804.03906>`_), can be applied to the intermediate solutions
    in the first stage by setting ``operator_type='iso_line_dd'``. The operator samples
    an additional batch of archive solutions to form a line in parameter space starting
    from the intermediate solutions. A zero-mean Gaussian interpolation along the line
    is then applied to the intermediate solutions, with standard deviation
    ``line_sigma``.

    The second stage creates new solutions by branching from each of the intermediate
    solutions. It leverages the gradient information of the objective and measure
    functions, generating a new solution from each *solution point*
    :math:`\boldsymbol{\theta_i}` using *gradient arborescence*. The gradient
    coefficients :math:`\boldsymbol{c_i}` are drawn from a zero-centered Gaussian
    distribution with standard deviation ``sigma_g``. Note that the objective gradient
    coefficient is forced to be non-negative by taking its absolute value
    :math:`|c_{i,0}|`.

    Essentially, this means that the emitter samples coefficients
    :math:`\boldsymbol{c_i} \sim \mathcal{N}(\boldsymbol{0}, \sigma_g^2 I)`
    and creates new solutions :math:`\boldsymbol{\theta'_i}` by updating the
    intermediate solutions :math:`\boldsymbol{\theta_i}` from the first stage according
    to:

    .. math::

        \boldsymbol{\theta'_i} \gets \boldsymbol{\theta_i} +
            |c_{i,0}| \boldsymbol{\nabla} f(\boldsymbol{\theta_i}) +
            \sum_{j=1}^k c_{i,j}\boldsymbol{\nabla}m_j(\boldsymbol{\theta_i})

    Where :math:`k` is the number of measures, and :math:`\boldsymbol{\nabla}
    f(\boldsymbol{\theta})` and :math:`\boldsymbol{\nabla} m_j(\boldsymbol{\theta})` are
    the objective and measure gradients of the solution point
    :math:`\boldsymbol{\theta}`, respectively.

    The update above applies when ``measure_gradients`` is True. When it is False, only
    the objective gradient is used and ``sigma_g`` is the step size itself, i.e.,
    :math:`\boldsymbol{\theta'_i} \gets \boldsymbol{\theta_i} +
    \sigma_g \boldsymbol{\nabla} f(\boldsymbol{\theta_i})`.

    Args:
        archive: Archive of solutions, e.g., :class:`ribs.archives.GridArchive`.
        sigma: Standard deviation of the Gaussian perturbation used to generate new
            solutions in ask_dqd(). Note we assume the Gaussian is diagonal, so if this
            argument is an array, it must be 1D.
        sigma_g: Step size used for gradient arborescence in ask(), branching from the
            parents generated by ask_dqd(). If measure gradients are used, this acts as
            the standard deviation of the Gaussian from which to sample the step size.
            Otherwise, this acts as the step size itself.
        initial_solutions: An (n, solution_dim) array of solutions to be used when the
            archive is empty. If this argument is None, then solutions will be sampled
            from a Gaussian distribution centered at ``x0`` with standard deviation
            ``sigma``. Exactly one of ``initial_solutions`` and ``x0`` must be
            provided.
        x0: Center of the Gaussian distribution from which to sample solutions when the
            archive is empty. Exactly one of ``x0`` and ``initial_solutions`` must be
            provided.
        line_sigma: The standard deviation of the line Gaussian for Iso+LineDD operator.
        measure_gradients: Signals if measure gradients will be used.
        normalize_grad: Whether gradients should be normalized before steps.
        epsilon: For numerical stability, we add a small epsilon when normalizing
            gradients in :meth:`tell_dqd` -- refer to the implementation `here
            <../_modules/ribs/emitters/_gradient_operator_emitter.html#GradientOperatorEmitter.tell_dqd>`_.
            Pass this parameter to configure that epsilon.
        operator_type: Either 'isotropic' or 'iso_line_dd' to mark the operator type for
            intermediate operations. Defaults to 'isotropic'. Note that the value is not
            validated; any value other than 'isotropic' enables the Iso+LineDD operator.
        bounds: Bounds of the solution space. Pass None to indicate there are no bounds.
            Alternatively, pass an array-like to specify the bounds for each dim. Each
            element in this array-like can be None to indicate no bound, or a tuple of
            ``(lower_bound, upper_bound)``, where ``lower_bound`` or ``upper_bound`` may
            be None to indicate no bound. Unbounded upper bounds are set to +inf, and
            unbounded lower bounds are set to -inf.
        lower_bounds: Instead of specifying ``bounds``, ``lower_bounds`` and
            ``upper_bounds`` may be specified. This is useful if, for instance,
            solutions are multi-dimensional. Here, pass None to indicate there are no
            bounds (i.e., bounds are set to -inf), or pass an array specifying the lower
            bounds of the solution space.
        upper_bounds: Upper bounds of the solution space; see ``lower_bounds`` above.
            Pass None to indicate there are no bounds (i.e., bounds are set to inf).
        batch_size: Number of solutions to return in :meth:`ask_dqd` and :meth:`ask`.
        seed: Value to seed the random number generator. Set to None to avoid a fixed
            seed.

    Raises:
        ValueError: There is an error in the bounds configuration.
        ValueError: Neither or both of ``x0`` and ``initial_solutions`` were provided.
    """
def __init__(
    self,
    archive: ArchiveBase,
    *,
    sigma: Float | ArrayLike,
    sigma_g: Float,
    initial_solutions: ArrayLike | None = None,
    x0: ArrayLike | None = None,
    line_sigma: Float = 0.0,
    measure_gradients: bool = False,
    normalize_grad: bool = False,
    epsilon: Float = 1e-8,
    operator_type: Literal["isotropic", "iso_line_dd"] = "isotropic",
    bounds: Collection[tuple[None | Float, None | Float]] | None = None,
    lower_bounds: ArrayLike | None = None,
    upper_bounds: ArrayLike | None = None,
    batch_size: Int = 64,
    seed: Int | None = None,
) -> None:
    """Initializes the emitter state from the given configuration.

    Exactly one of ``x0`` and ``initial_solutions`` must be passed in; whichever is
    given is converted to the archive's solution dtype and shape-checked against
    ``archive.solution_dim``.

    Raises:
        ValueError: Neither or both of ``x0`` and ``initial_solutions`` were
            provided.
    """
    # The base class is initialized first, so bounds are processed before any of
    # the checks below run.
    EmitterBase.__init__(
        self,
        archive=archive,
        solution_dim=archive.solution_dim,
        bounds=bounds,
        lower_bounds=lower_bounds,
        upper_bounds=upper_bounds,
    )

    self._x0 = None
    self._initial_solutions = None

    has_x0 = x0 is not None
    has_initial_solutions = initial_solutions is not None
    if not (has_x0 or has_initial_solutions):
        raise ValueError("Either x0 or initial_solutions must be provided.")
    if has_x0 and has_initial_solutions:
        raise ValueError("x0 and initial_solutions cannot both be provided.")

    solution_dtype = archive.dtypes["solution"]

    if has_x0:
        # np.array (not asarray) so the emitter holds its own copy of x0.
        self._x0 = np.array(x0, dtype=solution_dtype)
        check_shape(self._x0, "x0", archive.solution_dim, "archive.solution_dim")
    else:
        self._initial_solutions = np.asarray(initial_solutions, dtype=solution_dtype)
        check_batch_shape(
            self._initial_solutions,
            "initial_solutions",
            archive.solution_dim,
            "archive.solution_dim",
        )

    self._rng = np.random.default_rng(seed)

    # `sigma` may be a scalar or a 1D array; `sigma_g` is a scalar.
    self._sigma = np.asarray(sigma, dtype=solution_dtype)
    self._sigma_g = np.asarray(sigma_g, dtype=solution_dtype)
    self._line_sigma = line_sigma

    # Anything other than "isotropic" turns on the Iso+LineDD operator.
    self._use_isolinedd = operator_type != "isotropic"
    self._measure_gradients = measure_gradients
    self._normalize_grad = normalize_grad
    self._epsilon = epsilon
    self._batch_size = batch_size

    # Populated by tell_dqd() and ask_dqd(), respectively.
    self._jacobian_batch = None
    self._parents = None
@property
def x0(self) -> np.ndarray | None:
    """Center of the initial Gaussian distribution.

    When the archive is empty and :attr:`initial_solutions` is not set, solutions
    are sampled from a Gaussian centered here. None if ``x0`` was not provided.
    """
    center = self._x0
    return center
@property
def initial_solutions(self) -> np.ndarray | None:
    """Solutions returned while the archive is empty (None if :attr:`x0` is used)."""
    solutions = self._initial_solutions
    return solutions
@property
def sigma(self) -> Float | np.ndarray:
    """Standard deviation of the diagonal Gaussian used to perturb solutions."""
    std = self._sigma
    return std
@property
def batch_size(self) -> Int:
    """Size of the solution batch produced by :meth:`ask`."""
    size = self._batch_size
    return size
@property
def batch_size_dqd(self) -> Int:
    """Size of the solution batch produced by :meth:`ask_dqd`.

    This is the same value as :attr:`batch_size`.
    """
    size = self._batch_size
    return size
@property
def epsilon(self) -> Float:
    """Small constant added to gradient norms in :meth:`tell_dqd` for stability."""
    eps = self._epsilon
    return eps
[docs]
def ask_dqd(self) -> np.ndarray:
    """Creates new solutions by sampling elites from the archive.

    The behavior depends on the state of the archive:

    - If the archive is empty and ``initial_solutions`` was given, this method
      returns no solutions (an empty ``(0, solution_dim)`` array).
    - If the archive is empty and ``x0`` was given, ``batch_size`` solutions are
      sampled from a Gaussian centered at ``x0`` with standard deviation ``sigma``.
      The Iso+LineDD line term is skipped in this case since there are no elites
      from which to sample line directions.
    - Otherwise, ``batch_size`` elites are sampled from the archive and perturbed
      with Gaussian noise (plus the Iso+LineDD line term if enabled).

    All returned solutions are clipped to the emitter's bounds, and they are
    remembered as the parents from which :meth:`ask` branches.

    **Call :meth:`ask_dqd` and :meth:`tell_dqd` (in this order) before calling
    :meth:`ask` and :meth:`tell`.**

    Returns:
        ``(batch_size, solution_dim)`` array -- contains ``batch_size`` new
        solutions to evaluate.
    """
    archive_empty = self.archive.empty
    solution_dtype = self.archive.dtypes["solution"]

    if archive_empty and self._initial_solutions is not None:
        # The initial solutions are handed out by ask() instead.
        return np.empty((0, self.archive.solution_dim), dtype=solution_dtype)

    if archive_empty:
        # Shape (1, solution_dim); broadcasts against the noise batch below.
        parents = np.expand_dims(self.x0, axis=0)
    else:
        parents = self.archive.sample_elites(self.batch_size)["solution"]

    # Isotropic Gaussian perturbation, shared by both operator types.
    noise = self._rng.normal(
        loc=0.0,
        scale=self.sigma,
        size=(self.batch_size, self.solution_dim),
    ).astype(solution_dtype)

    # Line directions need a second batch of elites, so the line term can only
    # be applied when the archive has elites; sampling an empty archive fails.
    if self._use_isolinedd and not archive_empty:
        directions = (
            self.archive.sample_elites(self._batch_size)["solution"] - parents
        )
        line_gaussian = self._rng.normal(
            loc=0.0,
            scale=self._line_sigma,
            size=(self._batch_size, 1),
        ).astype(solution_dtype)
        sol = parents + line_gaussian * directions + noise
    else:
        sol = parents + noise

    self._parents = np.clip(sol, self.lower_bounds, self.upper_bounds)
    return self._parents
[docs]
def ask(self) -> np.ndarray:
    """Samples new solutions from a gradient arborescence.

    The gradient arborescence is parameterized by a multivariate Gaussian
    distribution.

    If measure_gradients is used, the multivariate Gaussian is parameterized by
    sigma_g, and the arborescence coefficient is sampled from the multivariate
    Gaussian, with the objective coefficient always being non-negative. If
    measure_gradients is not used, the arborescence coefficient is just sigma_g
    itself and only the objective gradient is followed.

    This method returns ``batch_size`` solutions by branching with gradient
    arborescence based on the solutions returned by :meth:`ask_dqd`. If the archive
    is empty and ``initial_solutions`` was provided, the initial solutions are
    returned instead.

    The stored Jacobian is not modified, so this method may be called repeatedly
    after a single :meth:`tell_dqd`.

    Returns:
        (:attr:`batch_size`, :attr:`solution_dim`) array -- a batch of new solutions
        to evaluate.

    Raises:
        RuntimeError: This method was called without first passing gradients with
            calls to ask_dqd() and tell_dqd().
    """
    if self.archive.empty and self._initial_solutions is not None:
        return self._initial_solutions

    if self._jacobian_batch is None:
        raise RuntimeError(
            "Please call ask_dqd() and tell_dqd() before calling ask()."
        )

    if self._measure_gradients:
        # One coefficient per gradient (objective + each measure) per solution.
        coefficients = self._rng.normal(
            loc=0.0,
            scale=self._sigma_g,
            size=self._jacobian_batch.shape[:2],
        )
        # Objective coefficient is forced to be non-negative.
        coefficients[:, 0] = np.abs(coefficients[:, 0])
        # Weighted sum of the gradients of each solution.
        offsets = np.sum(
            self._jacobian_batch * np.expand_dims(coefficients, axis=2), axis=1
        )
        return offsets + self._parents

    # Only the objective gradient (row 0 of each Jacobian) is used. This is a
    # local view; overwriting self._jacobian_batch here would break later calls.
    objective_gradients = self._jacobian_batch[:, 0, :]
    return self._parents + objective_gradients * self._sigma_g
[docs]
def tell_dqd(
    self,
    solution: ArrayLike,
    objective: ArrayLike,
    measures: ArrayLike,
    jacobian: ArrayLike,
    add_info: BatchData,
    **fields: ArrayLike,
) -> None:
    """Gives the emitter results of evaluating solutions from ask_dqd().

    The (optionally normalized) Jacobians are stored so that :meth:`ask` can branch
    from the solutions returned by :meth:`ask_dqd`. The ``jacobian`` argument itself
    is never modified.

    Args:
        solution: (batch_size, :attr:`solution_dim`) array of solutions generated by
            this emitter's :meth:`ask_dqd` method.
        objective: 1D array containing the objective function value of each
            solution.
        measures: (batch_size, measure space dimension) array with the measure space
            coordinates of each solution.
        jacobian: (batch_size, 1 + measure_dim, solution_dim) array consisting of
            Jacobian matrices of the solutions obtained from :meth:`ask_dqd`. Each
            matrix should consist of the objective gradient of the solution followed
            by the measure gradients.
        add_info: Data returned from the archive
            :meth:`~ribs.archives.ArchiveBase.add` method.
        fields: Additional data for each solution. Each argument should be an array
            with batch_size as the first dimension.
    """
    _, _, jacobian = validate_batch(
        self.archive,
        {
            "solution": solution,
            "objective": objective,
            "measures": measures,
            **fields,
        },
        add_info,
        jacobian,
    )

    # The Jacobian comes from evaluating the solutions of ask_dqd().
    if self._normalize_grad:
        # Normalize each gradient (last axis) to roughly unit length; epsilon
        # guards against division by zero for vanishing gradients.
        norms = np.linalg.norm(jacobian, axis=2, keepdims=True) + self._epsilon
        # Divide out-of-place: the validated array may share memory with the
        # caller's input, which must not be modified.
        jacobian = jacobian / norms

    self._jacobian_batch = jacobian