Source code for ribs.emitters.opt._sep_cma_es

"""Implementation of sep-CMA-ES that can be used across various emitters.

Adapted from Nikolaus Hansen's pycma:
https://github.com/CMA-ES/pycma/blob/master/cma/purecma.py
"""

from __future__ import annotations

import warnings

import numba as nb
import numpy as np
from numpy.typing import DTypeLike

from ribs._utils import arr_readonly
from ribs.emitters.opt._evolution_strategy_base import (
    BOUNDS_SAMPLING_THRESHOLD,
    BOUNDS_WARNING,
    EvolutionStrategyBase,
)
from ribs.typing import Float, Int


class DiagonalMatrix:
    """Maintains a diagonal covariance matrix.

    Args:
        dimension: Size of the (square) covariance matrix.
        dtype: Data type of the matrix, typically np.float32 or np.float64.
    """

    def __init__(self, dimension: Int, dtype: DTypeLike) -> None:
        self.cov = np.ones((dimension,), dtype=dtype)
        self.dtype = dtype

        # The last evaluation on which the eigensystem was updated.
        self.updated_eval = 0

    @property
    def condition_number(self) -> np.floating:
        """Condition number of the covariance matrix."""
        return np.max(self.cov) / np.min(self.cov)

    @property
    def eigenvalues(self) -> np.ndarray:
        """Eigenvalues (equal to covariance matrix since it is diagonal)."""
        return self.cov

    @property
    def invsqrt(self) -> np.ndarray:
        """C^-1/2."""
        return 1 / np.sqrt(self.cov)


[docs] class SeparableCMAEvolutionStrategy(EvolutionStrategyBase): """sep-CMA-ES optimizer for use with emitters. Refer to :class:`EvolutionStrategyBase` for usage instruction. Args: sigma0: Initial step size. batch_size: Number of solutions to evaluate at a time. If None, we calculate a default batch size based on solution_dim. solution_dim: Size of the solution space. seed: Seed for the random number generator. dtype: Data type of solutions. lower_bounds: scalar or (solution_dim,) array indicating lower bounds of the solution space. Scalars specify the same bound for the entire space, while arrays specify a bound for each dimension. Pass -np.inf in the array or scalar to indicated unbounded space. upper_bounds: Same as above, but for upper bounds (and pass np.inf instead of -np.inf). """ def __init__( self, sigma0: Float, solution_dim: Int, batch_size: Int | None = None, seed: Int | None = None, dtype: DTypeLike = np.float64, lower_bounds: Float | np.ndarray = -np.inf, upper_bounds: Float | np.ndarray = np.inf, ) -> None: self.batch_size = ( 4 + int(3 * np.log(solution_dim)) if batch_size is None else batch_size ) self.sigma0 = sigma0 self.solution_dim = solution_dim self.dtype = dtype # Even scalars must be converted into 0-dim arrays so that they work # with the bound check in numba. self.lower_bounds = np.asarray(lower_bounds, dtype=self.dtype) self.upper_bounds = np.asarray(upper_bounds, dtype=self.dtype) self._rng = np.random.default_rng(seed) self._solutions = None # Strategy-specific params -> initialized in reset(). self.current_eval = None self.mean = None self.sigma = None self.pc = None self.ps = None self.cov = None
[docs] def reset(self, x0: np.ndarray) -> None: self.current_eval = 0 self.sigma = self.sigma0 self.mean = np.array(x0, self.dtype) # Setup evolution path variables. self.pc = np.zeros(self.solution_dim, dtype=self.dtype) self.ps = np.zeros(self.solution_dim, dtype=self.dtype) # Setup the covariance matrix. self.cov = DiagonalMatrix(self.solution_dim, self.dtype)
[docs] def check_stop(self, ranking_values: np.ndarray) -> bool: # Tolerances from pycma CMA-ES. if self.cov.condition_number > 1e14: return True # Area of distribution too small. area = self.sigma * np.sqrt(max(self.cov.eigenvalues)) if area < 1e-11: return True # Fitness is too flat (only applies if there are at least 2 parents). # NOTE: We use norm here because we may have multiple ranking values. if ( # noqa: SIM103 len(ranking_values) >= 2 and np.linalg.norm(ranking_values[0] - ranking_values[-1]) < 1e-12 ): return True return False
@staticmethod @nb.jit(nopython=True) def _transform_and_check_sol( unscaled_params, transform_vec, mean, lower_bounds, upper_bounds ): """Numba helper for transforming parameters to the solution space.""" solutions = np.expand_dims( transform_vec, axis=0 ) * unscaled_params + np.expand_dims(mean, axis=0) out_of_bounds = np.logical_or( solutions < np.expand_dims(lower_bounds, axis=0), solutions > np.expand_dims(upper_bounds, axis=0), ) return solutions, out_of_bounds
[docs] def ask(self, batch_size: Int | None = None) -> np.ndarray: if batch_size is None: batch_size = self.batch_size self._solutions = np.empty((batch_size, self.solution_dim), dtype=self.dtype) transform_vec = np.sqrt(self.cov.eigenvalues) # Resampling method for bound constraints -> sample new solutions until # all solutions are within bounds. remaining_indices = np.arange(batch_size) sampling_itrs = 0 while len(remaining_indices) > 0: unscaled_params = self._rng.normal( 0.0, self.sigma, (len(remaining_indices), self.solution_dim), ).astype(self.dtype) new_solutions, out_of_bounds = self._transform_and_check_sol( unscaled_params, transform_vec, self.mean, self.lower_bounds, self.upper_bounds, ) self._solutions[remaining_indices] = new_solutions # Find indices in remaining_indices that are still out of bounds # (out_of_bounds indicates whether each value in each solution is # out of bounds). remaining_indices = remaining_indices[np.any(out_of_bounds, axis=1)] # Warn if we have resampled too many times. sampling_itrs += 1 if sampling_itrs > BOUNDS_SAMPLING_THRESHOLD: warnings.warn(BOUNDS_WARNING, stacklevel=2) return arr_readonly(self._solutions)
@staticmethod def _conedf(df, mu, solution_dim): """Used for computing separable learning rate.""" return 1.0 / (df + 2.0 * np.sqrt(df) + float(mu) / solution_dim) @staticmethod def _cmudf(df, mu, alphamu): """Used for computing separable learning rate.""" return (alphamu + mu + 1.0 / mu - 2) / (df + 4 * np.sqrt(df) + mu / 2.0) def _calc_strat_params(self, solution_dim, num_parents): """Calculates weights, mueff, and learning rates for sep-CMA-ES. Refer here https://github.com/CMA-ES/pycma/blob/master/cma/evolution_strategy.py#L3767 for the learning rates. """ # Create fresh weights for the number of parents found. weights = np.log(num_parents + 0.5) - np.log(np.arange(1, num_parents + 1)) total_weights = np.sum(weights) weights = weights / total_weights mueff = np.sum(weights) ** 2 / np.sum(weights**2) # Dynamically update these strategy-specific parameters. cc_sep = (1 + 1 / solution_dim + mueff / solution_dim) / ( solution_dim**0.5 + 1 / solution_dim + 2 * mueff / solution_dim ) cs = (mueff + 2) / (solution_dim + mueff + 5) c1 = 2 / ((solution_dim + 1.3) ** 2 + mueff) c1_sep = c1 * self._conedf(solution_dim, mueff, solution_dim) # Instead of 0, pycma uses "rankmu_offset" here but also mentions it # barely affects performance. The rest of this code also doesn't use # rankmu_offset. cmu_sep = min(1 - c1_sep, self._cmudf(solution_dim, mueff, 0)) return weights, mueff, cc_sep, cs, c1_sep, cmu_sep @staticmethod @nb.jit(nopython=True) def _calc_mean(parents, weights): """Numba helper for calculating the new mean.""" return np.sum(parents * np.expand_dims(weights, axis=1), axis=0) @staticmethod @nb.jit(nopython=True) def _calc_weighted_ys(parents, old_mean, weights): """Calculates y's for use in rank-mu update.""" ys = parents - np.expand_dims(old_mean, axis=0) return ys * np.expand_dims(weights, axis=1), ys @staticmethod @nb.jit(nopython=True) def _calc_cov_update(cov, c1a, cmu, c1, pc, sigma, rank_mu_update, weights): """Calculates covariance matrix update.""" rank_one_update = c1 * pc**2 return ( cov * (1 - c1a - cmu * np.sum(weights)) + rank_one_update * c1 + rank_mu_update * cmu / (sigma**2) )
[docs] def tell( self, ranking_indices: np.ndarray, ranking_values: np.ndarray, num_parents: Int ) -> None: self.current_eval += len(self._solutions[ranking_indices]) if num_parents == 0: return parents = self._solutions[ranking_indices][:num_parents] weights, mueff, cc, cs, c1, cmu = self._calc_strat_params( self.solution_dim, num_parents ) damps = ( 1 + 2 * max( 0, np.sqrt((mueff - 1) / (self.solution_dim + 1)) - 1, ) + cs ) # Recombination of the new mean. old_mean = self.mean self.mean = self._calc_mean(parents, weights) # Update the evolution path. y = self.mean - old_mean z = self.cov.invsqrt * y self.ps = (1 - cs) * self.ps + (np.sqrt(cs * (2 - cs) * mueff) / self.sigma) * z left = ( np.sum(np.square(self.ps)) / self.solution_dim / (1 - (1 - cs) ** (2 * self.current_eval / self.batch_size)) ) right = 2 + 4.0 / (self.solution_dim + 1) hsig = 1 if left < right else 0 self.pc = (1 - cc) * self.pc + hsig * np.sqrt(cc * (2 - cc) * mueff) * y # Adapt the covariance matrix. weighted_ys, ys = self._calc_weighted_ys(parents, old_mean, weights) # Equivalent to calculating the outer product of each ys[i] with itself # and taking a weighted sum of the outer products. rank_mu_update = np.sum(weighted_ys * ys, axis=0) c1a = c1 * (1 - (1 - hsig**2) * cc * (2 - cc)) self.cov.cov = self._calc_cov_update( self.cov.cov, c1a, cmu, c1, self.pc, self.sigma, rank_mu_update, weights ) # Update sigma. cn, sum_square_ps = cs / damps, np.sum(np.square(self.ps)) self.sigma *= np.exp(min(1, cn * (sum_square_ps / self.solution_dim - 1) / 2))