"""Provides the GradientOperatorEmitter."""
import numbers
import numpy as np
from ribs._utils import (check_batch_shape, check_shape, np_scalar,
validate_batch)
from ribs.emitters._emitter_base import EmitterBase
[docs]class GradientOperatorEmitter(EmitterBase):
"""Generates solutions by first applying a genetic operator, then applying a
gradient arborescence with coefficients parameterized by a fixed Gaussian.
This emitter is from `Fontaine 2021 <https://arxiv.org/abs/2106.03894>`_.
It proceeds in two stages. The first stage samples a batch of intermediate
solutions from the archive and (optionally) applies Gaussian perturbation
with zero mean and fixed standard deviation ``sigma``. If the archive is
empty and no initial solutions are provided, the sampled solutions will be
from a Gaussian centered at ``x0``.
Optionally, an additional operator, Iso+LineDD
(`Vassiliades 2018 <https://arxiv.org/abs/1804.03906>`_), can be applied to
the intermediate solutions in the first stage by setting
``operator_type='iso_line_dd'``.
The operator samples an additional batch of archive solutions to form a
line in parameter space starting from the intermediate solutions.
A zero-mean Gaussian interpolation along the line is then applied to the
intermediate solutions, with standard deviation ``line_sigma``.
The second stage creates new solutions by branching from each of the
intermediate solutions. It leverages the gradient information of the
objective and measure functions, generating a new solution from each
*solution point* :math:`\\boldsymbol{\\theta_i}` using *gradient
arborescence*. The gradient coefficients :math:`\\boldsymbol{c_i}` are drawn
from a zero-centered Gaussian distribution with standard deviation
``sigma_g``. Note that the objective gradient coefficient is forced to be
non-negative by taking its absolute value :math:`|c_{i,0}|`.
Essentially, this means that the emitter samples coefficients
:math:`\\boldsymbol{c_i} \\sim
\\mathcal{N}(\\boldsymbol{0}, \\boldsymbol{\\sigma_g}I)`
and creates new solutions :math:`\\boldsymbol{\\theta'_i}` by updating the
intermediate solutions :math:`\\boldsymbol{\\theta_i}` from the first stage
according to:
.. math::
\\boldsymbol{\\theta'_i} \\gets \\boldsymbol{\\theta_i} +
|c_{i,0}| \\boldsymbol{\\nabla} f(\\boldsymbol{\\theta_i}) +
\\sum_{j=1}^k c_{i,j}\\boldsymbol{\\nabla}m_j(\\boldsymbol{\\theta_i})
Where :math:`k` is the number of measures, and
:math:`\\boldsymbol{\\nabla} f(\\boldsymbol{\\theta})` and
:math:`\\boldsymbol{\\nabla} m_j(\\boldsymbol{\\theta})` are the objective
and measure gradients of the solution point :math:`\\boldsymbol{\\theta}`,
respectively.
Args:
archive (ribs.archives.ArchiveBase): An archive to use when creating and
inserting solutions. For instance, this can be
:class:`ribs.archives.GridArchive`.
sigma (float or array-like): Standard deviation of the Gaussian
perturbation used to generate new solutions in ask_dqd().
Note we assume the Gaussian is diagonal, so if this argument is
an array, it must be 1D.
sigma_g (float): Step size used for gradient arborescence in ask(),
branching from the parents generated by ask_dqd(). If measure
gradients are used, this acts as the standard deviation of the
Gaussian from which to sample the step size. Otherwise, this acts as
the step size itself.
initial_solutions (array-like): An (n, solution_dim) array of solutions
to be used when the archive is empty. If this argument is None, then
solutions will be sampled from a Gaussian distribution centered at
``x0`` with standard deviation ``sigma``.
x0 (array-like): Center of the Gaussian distribution from which to
sample solutions when the archive is empty.
line_sigma (float): the standard deviation of the line Gaussian for
Iso+LineDD operator.
measure_gradients (bool): Signals if measure gradients will be used.
normalize_grad (bool): Whether gradients should be normalized
before steps.
epsilon (float): For numerical stability, we add a small epsilon when
normalizing gradients in :meth:`tell_dqd` -- refer to the
implementation `here
<../_modules/ribs/emitters/_gradient_operator_emitter.html#GradientOperatorEmitter.tell_dqd>`_.
Pass this parameter to configure that epsilon.
operator_type (str): Either 'isotropic' or 'iso_line_dd' to mark the
operator type for intermediate operations. Defaults to 'isotropic'.
bounds (None or array-like): Bounds of the solution space. Solutions are
clipped to these bounds. Pass None to indicate there are no bounds.
Alternatively, pass an array-like to specify the bounds for each
dim. Each element in this array-like can be None to indicate no
bound, or a tuple of ``(lower_bound, upper_bound)``, where
``lower_bound`` or ``upper_bound`` may be None to indicate no bound.
batch_size (int): Number of solutions to return in :meth:`ask`.
seed (int): Value to seed the random number generator. Set to None to
avoid a fixed seed.
Raises:
ValueError: There is an error in the bounds configuration.
"""
def __init__(self,
archive,
sigma,
sigma_g,
initial_solutions=None,
x0=None,
line_sigma=0.0,
measure_gradients=False,
normalize_grad=False,
epsilon=1e-8,
operator_type='isotropic',
bounds=None,
batch_size=64,
seed=None):
EmitterBase.__init__(
self,
archive=archive,
solution_dim=archive.solution_dim,
bounds=bounds,
)
self._initial_solutions = None
self._x0 = None
if x0 is None and initial_solutions is None:
raise ValueError("Either x0 or initial_solutions must be provided.")
if x0 is not None and initial_solutions is not None:
raise ValueError(
"x0 and initial_solutions cannot both be provided.")
if x0 is not None:
self._x0 = np.array(x0, dtype=archive.dtypes["solution"])
check_shape(self._x0, "x0", archive.solution_dim,
"archive.solution_dim")
elif initial_solutions is not None:
self._initial_solutions = np.asarray(
initial_solutions, dtype=archive.dtypes["solution"])
check_batch_shape(self._initial_solutions, "initial_solutions",
archive.solution_dim, "archive.solution_dim")
self._rng = np.random.default_rng(seed)
self._sigma = np_scalar(sigma,
dtype=archive.dtypes["solution"]) if isinstance(
sigma, numbers.Real) else np.array(sigma)
self._sigma_g = np_scalar(sigma_g, dtype=archive.dtypes["solution"])
self._line_sigma = line_sigma
self._use_isolinedd = operator_type != 'isotropic'
self._measure_gradients = measure_gradients
self._normalize_grad = normalize_grad
self._epsilon = epsilon
self._batch_size = batch_size
self._jacobian_batch = None
self._parents = None
@property
def initial_solutions(self):
"""numpy.ndarray: The initial solutions which are returned when the
archive is empty (if x0 is not set)."""
return self._initial_solutions
@property
def x0(self):
"""numpy.ndarray: Center of the Gaussian distribution from which to
sample solutions when the archive is empty (if initial_solutions is not
set)."""
return self._x0
@property
def sigma(self):
"""float or numpy.ndarray: Standard deviation of the (diagonal) Gaussian
distribution."""
return self._sigma
@property
def batch_size(self):
"""int: Number of solutions to return in :meth:`ask`."""
return self._batch_size
@property
def batch_size_dqd(self):
"""int: Number of solutions to return in :meth:`ask_dqd`."""
return self._batch_size
@property
def epsilon(self):
"""int: The epsilon added for numerical stability when normalizing
gradients in :meth:`tell_dqd`."""
return self._epsilon
[docs] def ask_dqd(self):
"""Create new solutions by sampling elites from the archive with
(optional) Gaussian perturbation.
If the archive is empty and initial_solutions is given, this method
returns no solutions. Otherwise, this method will sample elites
from the archive.
**Call :meth:`ask_dqd` and :meth:`tell_dqd` (in this order) before
calling :meth:`ask` and :meth:`tell`.**
Returns:
``(batch_size, solution_dim)`` array -- contains ``batch_size`` new
solutions to evaluate.
"""
if self.archive.empty and (self._initial_solutions is not None):
return np.empty((0, self.archive.solution_dim))
if self.archive.empty:
parents = np.expand_dims(self.x0, axis=0)
else:
parents = self.archive.sample_elites(self.batch_size)["solution"]
if self._use_isolinedd:
noise = self._rng.normal(
loc=0.0,
scale=self.sigma,
size=(self.batch_size, self.solution_dim),
).astype(self.archive.dtypes["solution"])
directions = self.archive.sample_elites(
self._batch_size)["solution"] - parents
line_gaussian = self._rng.normal(
loc=0.0,
scale=self._line_sigma,
size=(self._batch_size, 1),
).astype(self.archive.dtypes["solution"])
sol = parents + line_gaussian * directions + noise
sol = np.clip(sol, self.lower_bounds, self.upper_bounds)
else:
noise = self._rng.normal(
loc=0.0,
scale=self.sigma,
size=(self.batch_size, self.solution_dim),
).astype(self.archive.dtypes["solution"])
sol = parents + noise
sol = np.clip(sol, self.lower_bounds, self.upper_bounds)
self._parents = sol
return self._parents
[docs] def ask(self):
"""Samples new solutions from a gradient arborescence parameterized by a
multivariate Gaussian distribution.
If measure_gradients is used, the multivariate Gaussian is parameterized
by sigma_g, and the arboresecence coefficient is sampled from the
multivariate Gaussian, with the objective coefficient being always
non-negative. If measure_gradients is not used, the arboresecence
coefficient is just sigma_g itself.
This method returns ``batch_size`` solutions by branching
with gradient arborescence based on the solutions returned by
ask_dqd().
Returns:
(:attr:`batch_size`, :attr:`solution_dim`) array -- a batch of new
solutions to evaluate.
Raises:
RuntimeError: This method was called without first passing gradients
with calls to ask_dqd() and tell_dqd().
"""
if self.archive.empty and self._initial_solutions is not None:
return self._initial_solutions
if self._jacobian_batch is None:
raise RuntimeError("Please call ask_dqd() and tell_dqd() "
"before calling ask().")
if self._measure_gradients:
noise = self._rng.normal(
loc=0.0,
scale=self._sigma_g,
size=self._jacobian_batch.shape[:2],
)
noise[:, 0] = np.abs(
noise[:, 0]) # obj coefficient forced to be non-negative
noise = np.expand_dims(noise, axis=2)
offsets = np.sum(self._jacobian_batch * noise, axis=1)
sols = offsets + self._parents
else:
# Transform the Jacobian
self._jacobian_batch = self._jacobian_batch[:, :1, :].squeeze(1)
sols = self._parents + self._jacobian_batch * self._sigma_g
return sols
[docs] def tell_dqd(self, solution, objective, measures, jacobian, add_info,
**fields):
"""Gives the emitter results of evaluating solutions from ask_dqd().
Args:
solution (array-like): (batch_size, :attr:`solution_dim`) array of
solutions generated by this emitter's :meth:`ask()` method.
objective (array-like): 1D array containing the objective function
value of each solution.
measures (array-like): (batch_size, measure space dimension) array
with the measure space coordinates of each solution.
jacobian (array-like): (batch_size, 1 + measure_dim, solution_dim)
array consisting of Jacobian matrices of the solutions obtained
from :meth:`ask_dqd`. Each matrix should consist of the
objective gradient of the solution followed by the measure
gradients.
add_info (dict): Data returned from the archive
:meth:`~ribs.archives.ArchiveBase.add` method.
fields (keyword arguments): Additional data for each solution. Each
argument should be an array with batch_size as the first
dimension.
"""
data, add_info, jacobian = validate_batch( # pylint: disable = unused-variable
self.archive,
{
"solution": solution,
"objective": objective,
"measures": measures,
**fields,
},
add_info,
jacobian,
)
# normalize gradients + set jacobian
# jacobian is obtained from evaluating solutions of ask_dqd()
if self._normalize_grad:
norms = np.linalg.norm(jacobian, axis=2,
keepdims=True) + self._epsilon
jacobian /= norms
self._jacobian_batch = jacobian