Source code for ribs.emitters.opt._openai_es

"""Implementation of OpenAI ES that can be used across various emitters.

See here for more info: https://arxiv.org/abs/1703.03864
"""

from __future__ import annotations

import warnings

import numpy as np
from numpy.typing import DTypeLike
from typing_extensions import ParamSpec

from ribs._utils import arr_readonly
from ribs.emitters.opt._adam_opt import AdamOpt
from ribs.emitters.opt._evolution_strategy_base import (
    BOUNDS_SAMPLING_THRESHOLD,
    BOUNDS_WARNING,
    EvolutionStrategyBase,
)
from ribs.typing import Float, Int

P = ParamSpec("P")


[docs] class OpenAIEvolutionStrategy(EvolutionStrategyBase): """OpenAI-ES optimizer for use with emitters. Refer to :class:`EvolutionStrategyBase` for usage instruction. Args: sigma0: Initial step size. batch_size: Number of solutions to evaluate at a time. If None, we calculate a default batch size based on solution_dim. solution_dim: Size of the solution space. seed: Seed for the random number generator. dtype: Data type of solutions. lower_bounds: scalar or (solution_dim,) array indicating lower bounds of the solution space. Scalars specify the same bound for the entire space, while arrays specify a bound for each dimension. Pass -np.inf in the array or scalar to indicated unbounded space. upper_bounds: Same as above, but for upper bounds (and pass np.inf instead of -np.inf). mirror_sampling: Whether to use mirror sampling when gathering solutions. Defaults to True. adam_kwargs: Keyword arguments passed to :class:`AdamOpt`. """ def __init__( self, sigma0: Float, solution_dim: Int, batch_size: Int | None = None, seed: Int | None = None, dtype: DTypeLike = np.float64, lower_bounds: Float | np.ndarray = -np.inf, upper_bounds: Float | np.ndarray = np.inf, mirror_sampling: bool = True, **adam_kwargs: P.kwargs, ) -> None: self.batch_size = ( 4 + int(3 * np.log(solution_dim)) if batch_size is None else batch_size ) self.sigma0 = sigma0 self.solution_dim = solution_dim self.dtype = dtype # Even scalars must be converted into 0-dim arrays so that they work # with the bound check in numba. self.lower_bounds = np.asarray(lower_bounds, dtype=self.dtype) self.upper_bounds = np.asarray(upper_bounds, dtype=self.dtype) self._rng = np.random.default_rng(seed) self._solutions = None if mirror_sampling and not ( np.all(lower_bounds == -np.inf) and np.all(upper_bounds == np.inf) ): raise ValueError( "Bounds are currently not supported when using " "mirror_sampling in OpenAI-ES; see " "OpenAIEvolutionStrategy.ask() for more info." ) self.mirror_sampling = mirror_sampling # Default batch size should be an even number for mirror sampling. if batch_size is None and self.batch_size % 2 != 0: self.batch_size += 1 if self.batch_size <= 1: raise ValueError( "Batch size of 1 is not supported because rank" " normalization does not work with batch size of" " 1." ) if self.mirror_sampling and self.batch_size % 2 != 0: raise ValueError( "If using mirror sampling, batch_size must be an even number." ) # Strategy-specific params -> initialized in reset(). self.adam_opt = AdamOpt(self.solution_dim, **adam_kwargs) self.last_update_ratio = None self.noise = None
[docs] def reset(self, x0: np.ndarray) -> None: self.adam_opt.reset(x0) self.last_update_ratio = np.inf # Updated at end of tell(). self.noise = None # Becomes (batch_size, solution_dim) array in ask().
[docs] def check_stop(self, ranking_values: np.ndarray) -> bool: if self.last_update_ratio < 1e-9: return True # Fitness is too flat (only applies if there are at least 2 parents). # NOTE: We use norm here because we may have multiple ranking values. if ( # noqa: SIM103 len(ranking_values) >= 2 and np.linalg.norm(ranking_values[0] - ranking_values[-1]) < 1e-12 ): return True return False
[docs] def ask(self, batch_size: Int | None = None) -> np.ndarray: if batch_size is None: batch_size = self.batch_size self._solutions = np.empty((batch_size, self.solution_dim), dtype=self.dtype) # Resampling method for bound constraints -> sample new solutions until # all solutions are within bounds. remaining_indices = np.arange(batch_size) sampling_itrs = 0 while len(remaining_indices) > 0: if self.mirror_sampling: # Note that we sample batch_size // 2 here rather than # accounting for len(remaining_indices). This is because we # assume we only run this loop once when mirror_sampling is # True. It is unclear how to do bounds handling when mirror # sampling is involved since the two entries need to be # mirrored. For instance, should we throw out both solutions if # one is out of bounds? noise_half = self._rng.standard_normal( (batch_size // 2, self.solution_dim), dtype=self.dtype ) self.noise = np.concatenate((noise_half, -noise_half)) else: self.noise = self._rng.standard_normal( (len(remaining_indices), self.solution_dim), dtype=self.dtype ) new_solutions = self.adam_opt.theta[None] + self.sigma0 * self.noise out_of_bounds = np.logical_or( new_solutions < np.expand_dims(self.lower_bounds, axis=0), new_solutions > np.expand_dims(self.upper_bounds, axis=0), ) self._solutions[remaining_indices] = new_solutions # Find indices in remaining_indices that are still out of bounds # (out_of_bounds indicates whether each value in each solution is # out of bounds). remaining_indices = remaining_indices[np.any(out_of_bounds, axis=1)] # Warn if we have resampled too many times. sampling_itrs += 1 if sampling_itrs > BOUNDS_SAMPLING_THRESHOLD: warnings.warn(BOUNDS_WARNING, stacklevel=2) return arr_readonly(self._solutions)
[docs] def tell( self, ranking_indices: np.ndarray, ranking_values: np.ndarray, num_parents: Int ) -> None: # Indices come in decreasing order, so we reverse to get them to # increasing order. ranks = np.empty(self.batch_size, dtype=np.int32) # Assign ranks -- ranks[i] tells the rank of noise[i]. ranks[ranking_indices[::-1]] = np.arange(self.batch_size) # Normalize ranks to [-0.5, 0.5]. ranks = (ranks / (self.batch_size - 1)) - 0.5 # Compute the gradient. if self.mirror_sampling: half_batch = self.batch_size // 2 gradient = np.sum( self.noise[:half_batch] * (ranks[:half_batch] - ranks[half_batch:])[:, None], axis=0, ) gradient /= half_batch * self.sigma0 else: gradient = np.sum(self.noise * ranks[:, None], axis=0) gradient /= self.batch_size * self.sigma0 # Used to compute last update ratio. theta_prev = self.adam_opt.theta.copy() self.adam_opt.step(gradient) self.last_update_ratio = np.linalg.norm( self.adam_opt.theta - theta_prev ) / np.linalg.norm(self.adam_opt.theta)