Source code for ribs.emitters.opt._pycma_es

"""ES that wraps pycma."""

from __future__ import annotations

import numpy as np
from numpy.typing import DTypeLike
from threadpoolctl import threadpool_limits

from ribs._utils import arr_readonly
from ribs.emitters.opt._evolution_strategy_base import EvolutionStrategyBase
from ribs.typing import Float, Int


[docs] class PyCMAEvolutionStrategy(EvolutionStrategyBase): """Wrapper around the pycma :class:`~cma.evolution_strategy.CMAEvolutionStrategy`. .. note:: This optimizer requires the `pycma <https://github.com/CMA-ES/pycma>`_ package, which can be installed with ``pip install cma`` or ``conda install cma``. Args: sigma0: Initial step size. batch_size: Number of solutions to evaluate at a time. This is passed directly as ``popsize`` in ``opts``. solution_dim: Size of the solution space. seed: Seed for the random number generator. dtype: Data type of solutions. lower_bounds: scalar or (solution_dim,) array indicating lower bounds of the solution space. Scalars specify the same bound for the entire space, while arrays specify a bound for each dimension. Pass -np.inf in the array or scalar to indicated unbounded space. upper_bounds: Same as above, but for upper bounds (and pass np.inf instead of -np.inf). opts: Additional options for pycma. Note that ``popsize``, ``bounds``, ``randn``, and ``seed`` are overwritten by us and thus should not be provided in this dict. We also make ``verbose`` default to -9, but you can also pass in a custom value here. """ def __init__( self, sigma0: Float, solution_dim: Int, batch_size: int | None = None, seed: Int | None = None, dtype: DTypeLike = np.float64, lower_bounds: Float | np.ndarray | None = None, upper_bounds: Float | np.ndarray | None = None, opts: dict | None = None, ) -> None: self.sigma0 = sigma0 self.solution_dim = solution_dim self.dtype = dtype self._solutions = None self._es = None self._opts = opts or {} self._opts["popsize"] = batch_size self._opts["bounds"] = [lower_bounds, upper_bounds] # Default CMAEvolutionStrategy uses global seed. To avoid this, we must # provide a randn function tied to our rng -- see: # https://github.com/CMA-ES/pycma/issues/221 self._rng = np.random.default_rng(seed) self._opts["randn"] = lambda batch_size, n: self._rng.standard_normal( (batch_size, n) ) self._opts["seed"] = np.nan self._opts.setdefault("verbose", -9) @property def batch_size(self) -> Int: """Number of solutions per iteration. Only valid after a call to :meth:`reset`. """ return self._es.popsize
[docs] def reset(self, x0: np.ndarray) -> None: """Resets the optimizer to start at x0. Args: x0: Initial mean. """ try: # We do not want to import at the top because that would require cma to # always be installed, as cma would be imported whenever this class is # imported. import cma # pylint: disable = import-outside-toplevel except ImportError as e: raise ImportError( "pycma must be installed -- please run `pip install cma` or " "`conda install cma`" ) from e else: self._es = cma.CMAEvolutionStrategy(x0, self.sigma0, self._opts)
[docs] def check_stop(self, ranking_values: np.ndarray) -> bool: """Checks if the optimization should stop and be reset. Args: ranking_values: Not used. Returns: Output of cma.CMAEvolutionStrategy.stop """ # Convert (batch_size, 1) array into (batch_size,). if ranking_values.ndim == 2 and ranking_values.shape[1] == 1: ranking_values = ranking_values[:, 0] # If ranking_values are scalar, we use pycma's termination criterion # directly. if ranking_values.ndim == 1: return self._es.stop() # Otherwise, we check our own criteria like in CMAEvolutionStrategy, # since the objectives passed into tell() were fictitious and thus we # need to check ranking_values on our own. # Tolerances from pycma CMA-ES. if self._es.condition_number > 1e14: return True # Area of distribution too small. area = self._es.sigma * np.sqrt(np.max(self._es.sm.eigenspectrum)) if area < 1e-11: return True # Fitness is too flat (only applies if there are at least 2 parents). # NOTE: We use norm here because we may have multiple ranking values. if ( # noqa: SIM103 len(ranking_values) >= 2 and np.linalg.norm(ranking_values[0] - ranking_values[-1]) < 1e-12 ): return True return False
# Limit OpenBLAS to single thread. This is typically faster than # multithreading because our data is too small.
[docs] @threadpool_limits.wrap(limits=1, user_api="blas") def ask(self, batch_size: Int | None = None) -> np.ndarray: """Samples new solutions from the Gaussian distribution. Args: batch_size: batch size of the sample. Defaults to ``self.batch_size``. """ # batch_size defaults to popsize in CMA-ES. self._solutions = np.asarray(self._es.ask(batch_size)) return arr_readonly(self._solutions.astype(self.dtype))
# Limit OpenBLAS to single thread. This is typically faster than # multithreading because our data is too small.
[docs] @threadpool_limits.wrap(limits=1, user_api="blas") def tell( self, ranking_indices: np.ndarray, ranking_values: np.ndarray, num_parents: Int ) -> None: # Convert (batch_size, 1) array into (batch_size,). if ranking_values.ndim == 2 and ranking_values.shape[1] == 1: ranking_values = ranking_values[:, 0] if ranking_values.ndim == 1: # Directly tell values to ES since these are just 1D. The # ranking_values are presented with higher being better, but CMA-ES # minimizes, so we need to invert all values. self._es.tell(self._solutions, -ranking_values) else: # This case is when ranking_values is 2D. In this case, we cannot # tell the values to the ES because the ES expects scalars. Thus, we # make up our own scalars, which is fine because CMA-ES only looks # at the ranks of the solutions during search. However, it does look # at values in the termination criterion, hence we use custom # conditions in check_stop. self._es.tell( self._solutions[ranking_indices], np.arange(len(ranking_indices)) )