"""Implementation of OpenAI ES that can be used across various emitters.
See here for more info: https://arxiv.org/abs/1703.03864
"""
import warnings

import numpy as np

from ribs._utils import readonly
from ribs.emitters.opt._adam_opt import AdamOpt
from ribs.emitters.opt._evolution_strategy_base import (
    BOUNDS_SAMPLING_THRESHOLD, BOUNDS_WARNING, EvolutionStrategyBase)


class OpenAIEvolutionStrategy(EvolutionStrategyBase):
"""OpenAI-ES optimizer for use with emitters.
Refer to :class:`EvolutionStrategyBase` for usage instruction.
Args:
sigma0 (float): Initial step size.
batch_size (int): Number of solutions to evaluate at a time. If None, we
calculate a default batch size based on solution_dim.
solution_dim (int): Size of the solution space.
seed (int): Seed for the random number generator.
dtype (str or data-type): Data type of solutions.
lower_bounds (float or np.ndarray): scalar or (solution_dim,) array
indicating lower bounds of the solution space. Scalars specify
the same bound for the entire space, while arrays specify a
bound for each dimension. Pass -np.inf in the array or scalar to
indicated unbounded space.
upper_bounds (float or np.ndarray): Same as above, but for upper
bounds (and pass np.inf instead of -np.inf).
mirror_sampling (bool): Whether to use mirror sampling when gathering
solutions. Defaults to True.
adam_kwargs (dict): Keyword arguments passed to :class:`AdamOpt`.
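
    Example:

        A minimal sketch of driving this class by hand, assuming a simple
        sphere objective. Normally an emitter calls these methods, and the
        ``num_parents`` value below is an illustrative choice (it is unused
        by this strategy's :meth:`tell`)::

            import numpy as np

            es = OpenAIEvolutionStrategy(sigma0=0.1, solution_dim=10, seed=42)
            es.reset(np.zeros(10, dtype=es.dtype))
            for _ in range(100):
                solutions = es.ask()
                # Maximize the negative sphere function (optimum at zero).
                objectives = -np.sum(np.square(solutions), axis=1)
                # tell() expects indices that sort solutions best-first.
                ranking_indices = np.argsort(-objectives)
                es.tell(ranking_indices, objectives[ranking_indices],
                        num_parents=len(solutions) // 2)
                if es.check_stop(objectives[ranking_indices]):
                    break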
"""
    def __init__(  # pylint: disable = super-init-not-called
            self,
            sigma0,
            solution_dim,
            batch_size=None,
            seed=None,
            dtype=np.float64,
            lower_bounds=-np.inf,
            upper_bounds=np.inf,
            mirror_sampling=True,
            **adam_kwargs):
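        # The default below is the population-size heuristic commonly used for
        # CMA-ES, 4 + floor(3 * ln(n)); it is rounded up to an even number
        # further down so that mirror sampling still works.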
        self.batch_size = (4 + int(3 * np.log(solution_dim))
                           if batch_size is None else batch_size)
        self.sigma0 = sigma0
        self.solution_dim = solution_dim
        self.dtype = dtype

        # Even scalars must be converted into 0-dim arrays so that they work
        # with the bound check in numba.
        self.lower_bounds = np.asarray(lower_bounds, dtype=self.dtype)
        self.upper_bounds = np.asarray(upper_bounds, dtype=self.dtype)

        self._rng = np.random.default_rng(seed)
        self._solutions = None
        if mirror_sampling and not (np.all(lower_bounds == -np.inf) and
                                    np.all(upper_bounds == np.inf)):
            raise ValueError("Bounds are currently not supported when using "
                             "mirror_sampling in OpenAI-ES; see "
                             "OpenAIEvolutionStrategy.ask() for more info.")
        self.mirror_sampling = mirror_sampling

        # The default batch size should be an even number for mirror sampling.
        if batch_size is None and self.batch_size % 2 != 0:
            self.batch_size += 1

        if self.batch_size <= 1:
            raise ValueError("Batch size of 1 is not supported because rank"
                             " normalization does not work with a batch size"
                             " of 1.")

        if self.mirror_sampling and self.batch_size % 2 != 0:
            raise ValueError("If using mirror sampling, batch_size must be an"
                             " even number.")

        # Strategy-specific params -> initialized in reset().
        self.adam_opt = AdamOpt(self.solution_dim, **adam_kwargs)
        self.last_update_ratio = None
        self.noise = None

    def reset(self, x0):
        self.adam_opt.reset(x0)
        self.last_update_ratio = np.inf  # Updated at end of tell().
        self.noise = None  # Becomes (batch_size, solution_dim) array in ask().

    def check_stop(self, ranking_values):
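        # Stop when Adam's most recent step was negligible relative to the
        # magnitude of theta (the ratio is computed at the end of tell()).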
        if self.last_update_ratio < 1e-9:
            return True

        # Fitness is too flat (only applies if there are at least 2 parents).
        # NOTE: We use norm here because we may have multiple ranking values.
        if (len(ranking_values) >= 2 and
                np.linalg.norm(ranking_values[0] - ranking_values[-1]) <
                1e-12):
            return True

        return False

    def ask(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size

        self._solutions = np.empty((batch_size, self.solution_dim),
                                   dtype=self.dtype)
        # Pre-allocate so that the noise stays aligned with self._solutions
        # even when some solutions are resampled below; tell() relies on
        # noise[i] being the perturbation that produced self._solutions[i].
        self.noise = np.empty((batch_size, self.solution_dim),
                              dtype=self.dtype)

        # Resampling method for bound constraints -> sample new solutions
        # until all solutions are within bounds.
        remaining_indices = np.arange(batch_size)
        sampling_itrs = 0
        while len(remaining_indices) > 0:
            if self.mirror_sampling:
                # Note that we sample batch_size // 2 here rather than
                # accounting for len(remaining_indices). This is because we
                # assume we only run this loop once when mirror_sampling is
                # True. It is unclear how to do bounds handling when mirror
                # sampling is involved since the two entries need to be
                # mirrored. For instance, should we throw out both solutions
                # if one is out of bounds?
                noise_half = self._rng.standard_normal(
                    (batch_size // 2, self.solution_dim), dtype=self.dtype)
                self.noise = np.concatenate((noise_half, -noise_half))
                new_noise = self.noise
            else:
                # Only sample noise for the solutions that are still out of
                # bounds, and record it in self.noise at their indices.
                new_noise = self._rng.standard_normal(
                    (len(remaining_indices), self.solution_dim),
                    dtype=self.dtype)
                self.noise[remaining_indices] = new_noise

            # Perturb the current Adam iterate by scaled Gaussian noise.
            new_solutions = (self.adam_opt.theta[None] +
                             self.sigma0 * new_noise)
            out_of_bounds = np.logical_or(
                new_solutions < np.expand_dims(self.lower_bounds, axis=0),
                new_solutions > np.expand_dims(self.upper_bounds, axis=0),
            )
            self._solutions[remaining_indices] = new_solutions

            # Find indices in remaining_indices that are still out of bounds
            # (out_of_bounds indicates whether each value in each solution is
            # out of bounds).
            remaining_indices = remaining_indices[np.any(out_of_bounds,
                                                         axis=1)]

            # Warn if we have resampled too many times.
            sampling_itrs += 1
            if sampling_itrs > BOUNDS_SAMPLING_THRESHOLD:
                warnings.warn(BOUNDS_WARNING)

        return readonly(self._solutions)

    def tell(self, ranking_indices, ranking_values, num_parents):
        # ranking_indices sorts the solutions from best to worst, so we
        # reverse it so that the worst solution comes first and receives the
        # lowest rank.
        ranks = np.empty(self.batch_size, dtype=np.int32)

        # Assign ranks -- ranks[i] is the rank of noise[i], with higher ranks
        # corresponding to better solutions.
        ranks[ranking_indices[::-1]] = np.arange(self.batch_size)

        # Normalize ranks to [-0.5, 0.5].
        ranks = (ranks / (self.batch_size - 1)) - 0.5
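        # For example, with batch_size = 5 the normalized ranks are
        # [-0.5, -0.25, 0.0, 0.25, 0.5], with the best solution at +0.5.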
        # Compute the rank-based estimate of the fitness gradient,
        # grad ~= (1 / (n * sigma)) * sum_i rank_i * noise_i, as in OpenAI-ES
        # (Salimans et al., 2017), with ranks in place of raw fitness.
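        # With mirror sampling, noise[i + n/2] == -noise[i], so the sum over
        # the full batch collapses to sum_i (rank_i - rank_{i + n/2}) *
        # noise_i over the first half; the normalization then uses half_batch
        # since only half_batch distinct noise vectors were drawn.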
        if self.mirror_sampling:
            half_batch = self.batch_size // 2
            gradient = np.sum(
                self.noise[:half_batch] *
                (ranks[:half_batch] - ranks[half_batch:])[:, None],
                axis=0)
            gradient /= half_batch * self.sigma0
        else:
            gradient = np.sum(self.noise * ranks[:, None], axis=0)
            gradient /= self.batch_size * self.sigma0
        # Used to compute the update ratio below.
        theta_prev = self.adam_opt.theta.copy()

        self.adam_opt.step(gradient)

        # Relative size of the Adam step; checked in check_stop().
        self.last_update_ratio = (
            np.linalg.norm(self.adam_opt.theta - theta_prev) /
            np.linalg.norm(self.adam_opt.theta))