"""Implementation of sep-CMA-ES that can be used across various emitters.

Adapted from Nikolaus Hansen's pycma:
https://github.com/CMA-ES/pycma/blob/master/cma/purecma.py
"""
import warnings

import numba as nb
import numpy as np

from ribs._utils import readonly
from ribs.emitters.opt._evolution_strategy_base import (
    BOUNDS_SAMPLING_THRESHOLD, BOUNDS_WARNING, EvolutionStrategyBase)


class DiagonalMatrix:
    """Maintains a diagonal covariance matrix.

    Args:
        dimension (int): Size of the (square) covariance matrix.
        dtype (str or data-type): Data type of the matrix, typically np.float32
            or np.float64.
    """

    def __init__(self, dimension, dtype):
        self.cov = np.ones((dimension,), dtype=dtype)
        self.dtype = dtype

        # The last evaluation on which the eigensystem was updated.
        self.updated_eval = 0

    @property
    def condition_number(self):
        """Condition number of the covariance matrix."""
        return np.max(self.cov) / np.min(self.cov)

    @property
    def eigenvalues(self):
        """Eigenvalues (equal to covariance matrix since it is diagonal)."""
        return self.cov

    @property
    def invsqrt(self):
        """C^-1/2."""
        return 1 / np.sqrt(self.cov)


class SeparableCMAEvolutionStrategy(EvolutionStrategyBase):
    """sep-CMA-ES optimizer for use with emitters.

    Refer to :class:`EvolutionStrategyBase` for usage instructions.

    Args:
        sigma0 (float): Initial step size.
        batch_size (int): Number of solutions to evaluate at a time. If None,
            we calculate a default batch size based on solution_dim.
        solution_dim (int): Size of the solution space.
        seed (int): Seed for the random number generator.
        dtype (str or data-type): Data type of solutions.
        lower_bounds (float or np.ndarray): scalar or (solution_dim,) array
            indicating lower bounds of the solution space. Scalars specify
            the same bound for the entire space, while arrays specify a bound
            for each dimension. Pass -np.inf in the array or scalar to
            indicate unbounded space.
        upper_bounds (float or np.ndarray): Same as above, but for upper
            bounds (and pass np.inf instead of -np.inf).
    """

    def __init__(  # pylint: disable = super-init-not-called
            self,
            sigma0,
            solution_dim,
            batch_size=None,
            seed=None,
            dtype=np.float64,
            lower_bounds=-np.inf,
            upper_bounds=np.inf):
        self.batch_size = (4 + int(3 * np.log(solution_dim))
                           if batch_size is None else batch_size)
        self.sigma0 = sigma0
        self.solution_dim = solution_dim
        self.dtype = dtype

        # Even scalars must be converted into 0-dim arrays so that they work
        # with the bound check in numba.
        self.lower_bounds = np.asarray(lower_bounds, dtype=self.dtype)
        self.upper_bounds = np.asarray(upper_bounds, dtype=self.dtype)

        self._rng = np.random.default_rng(seed)
        self._solutions = None

        # Strategy-specific params -> initialized in reset().
        self.current_eval = None
        self.mean = None
        self.sigma = None
        self.pc = None
        self.ps = None
        self.cov = None
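
    # Bounds note (illustrative): passing lower_bounds=-1.0 bounds every
    # dimension below by -1, while lower_bounds=np.full(solution_dim, -1.0)
    # expresses the same constraint with a per-dimension array.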

    def reset(self, x0):
        self.current_eval = 0
        self.sigma = self.sigma0
        self.mean = np.array(x0, self.dtype)

        # Setup evolution path variables.
        self.pc = np.zeros(self.solution_dim, dtype=self.dtype)
        self.ps = np.zeros(self.solution_dim, dtype=self.dtype)

        # Setup the covariance matrix.
        self.cov = DiagonalMatrix(self.solution_dim, self.dtype)

    def check_stop(self, ranking_values):
        # Tolerances from pycma CMA-ES.
        if self.cov.condition_number > 1e14:
            return True

        # Area of distribution too small.
        area = self.sigma * np.sqrt(max(self.cov.eigenvalues))
        if area < 1e-11:
            return True

        # Fitness is too flat (only applies if there are at least 2 parents).
        # NOTE: We use norm here because we may have multiple ranking values.
        if (len(ranking_values) >= 2 and
                np.linalg.norm(ranking_values[0] - ranking_values[-1]) <
                1e-12):
            return True

        return False
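
    # For intuition on the tolerances above (illustrative values only): a
    # covariance diagonal of [1e15, 1.0] trips the condition-number check,
    # and sigma = 1e-12 with unit covariance trips the area check, since
    # 1e-12 * sqrt(1.0) < 1e-11.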

    @staticmethod
    @nb.jit(nopython=True)
    def _transform_and_check_sol(unscaled_params, transform_vec, mean,
                                 lower_bounds, upper_bounds):
        """Numba helper for transforming parameters to the solution space."""
        solutions = (np.expand_dims(transform_vec, axis=0) * unscaled_params +
                     np.expand_dims(mean, axis=0))
        out_of_bounds = np.logical_or(
            solutions < np.expand_dims(lower_bounds, axis=0),
            solutions > np.expand_dims(upper_bounds, axis=0),
        )
        return solutions, out_of_bounds
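
    # Sampling note: ask() below draws unscaled_params ~ N(0, sigma^2 I) and
    # maps them through the helper above, so each solution is distributed as
    # N(mean, sigma^2 * diag(cov)) -- the sep-CMA-ES sampling distribution.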

    def ask(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size

        self._solutions = np.empty((batch_size, self.solution_dim),
                                   dtype=self.dtype)
        transform_vec = np.sqrt(self.cov.eigenvalues)

        # Resampling method for bound constraints -> sample new solutions
        # until all solutions are within bounds.
        remaining_indices = np.arange(batch_size)
        sampling_itrs = 0
        while len(remaining_indices) > 0:
            unscaled_params = self._rng.normal(
                0.0,
                self.sigma,
                (len(remaining_indices), self.solution_dim),
            ).astype(self.dtype)
            new_solutions, out_of_bounds = self._transform_and_check_sol(
                unscaled_params, transform_vec, self.mean, self.lower_bounds,
                self.upper_bounds)
            self._solutions[remaining_indices] = new_solutions

            # Find indices in remaining_indices that are still out of bounds
            # (out_of_bounds indicates whether each value in each solution is
            # out of bounds).
            remaining_indices = remaining_indices[np.any(out_of_bounds,
                                                         axis=1)]

            # Warn if we have resampled too many times.
            sampling_itrs += 1
            if sampling_itrs > BOUNDS_SAMPLING_THRESHOLD:
                warnings.warn(BOUNDS_WARNING)

        return readonly(self._solutions)
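
    # Resampling caveat: when little of the sampling distribution's mass lies
    # inside the feasible box, the loop above may iterate many times; the
    # BOUNDS_SAMPLING_THRESHOLD check surfaces that case as a warning rather
    # than failing silently.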

    @staticmethod
    def _conedf(df, mu, solution_dim):
        """Used for computing separable learning rate."""
        return 1. / (df + 2. * np.sqrt(df) + float(mu) / solution_dim)

    @staticmethod
    def _cmudf(df, mu, alphamu):
        """Used for computing separable learning rate."""
        return (alphamu + mu + 1. / mu - 2) / (df + 4 * np.sqrt(df) + mu / 2.)

    def _calc_strat_params(self, solution_dim, num_parents):
        """Calculates weights, mueff, and learning rates for sep-CMA-ES.

        Refer to
        https://github.com/CMA-ES/pycma/blob/master/cma/evolution_strategy.py#L3767
        for the learning rates.
        """
        # Create fresh weights for the number of parents found.
        weights = (np.log(num_parents + 0.5) -
                   np.log(np.arange(1, num_parents + 1)))
        total_weights = np.sum(weights)
        weights = weights / total_weights
        mueff = np.sum(weights)**2 / np.sum(weights**2)

        # Dynamically update these strategy-specific parameters.
        cc = ((4 + mueff / solution_dim) /  # pylint: disable = unused-variable
              (solution_dim + 4 + 2 * mueff / solution_dim))
        cc_sep = (
            (1 + 1 / solution_dim + mueff / solution_dim) /
            (solution_dim**0.5 + 1 / solution_dim + 2 * mueff / solution_dim))
        cs = (mueff + 2) / (solution_dim + mueff + 5)
        c1 = 2 / ((solution_dim + 1.3)**2 + mueff)
        c1_sep = c1 * self._conedf(solution_dim, mueff, solution_dim)
        cmu = min(  # pylint: disable = unused-variable
            1 - c1,
            2 * (mueff - 2 + 1 / mueff) / ((solution_dim + 2)**2 + mueff),
        )
        # Instead of 0, pycma uses "rankmu_offset" here but also mentions it
        # barely affects performance. The rest of this code also doesn't use
        # rankmu_offset.
        cmu_sep = min(1 - c1_sep, self._cmudf(solution_dim, mueff, 0))

        return weights, mueff, cc_sep, cs, c1_sep, cmu_sep

    @staticmethod
    @nb.jit(nopython=True)
    def _calc_mean(parents, weights):
        """Numba helper for calculating the new mean."""
        return np.sum(parents * np.expand_dims(weights, axis=1), axis=0)

    @staticmethod
    @nb.jit(nopython=True)
    def _calc_weighted_ys(parents, old_mean, weights):
        """Calculates y's for use in rank-mu update."""
        ys = parents - np.expand_dims(old_mean, axis=0)
        return ys * np.expand_dims(weights, axis=1), ys

    @staticmethod
    @nb.jit(nopython=True)
    def _calc_cov_update(cov, c1a, cmu, c1, pc, sigma, rank_mu_update,
                         weights):
        """Calculates covariance matrix update."""
        # Note: c1 is applied once here; the original line multiplied the
        # rank-one term by c1 a second time, which was a bug.
        rank_one_update = c1 * pc**2
        return (cov * (1 - c1a - cmu * np.sum(weights)) + rank_one_update +
                rank_mu_update * cmu / (sigma**2))
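
    # Because the covariance is diagonal, the rank-one term c1 * pc pc^T and
    # the rank-mu term reduce to elementwise squares, so the update in
    # _calc_cov_update is:
    #   C <- (1 - c1a - cmu * sum(w)) * C + c1 * pc**2
    #        + (cmu / sigma**2) * sum_i w_i * ys_i**2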

    def tell(self, ranking_indices, ranking_values, num_parents):
        self.current_eval += len(self._solutions[ranking_indices])

        if num_parents == 0:
            return

        parents = self._solutions[ranking_indices][:num_parents]

        weights, mueff, cc, cs, c1, cmu = self._calc_strat_params(
            self.solution_dim, num_parents)
        damps = (1 + 2 * max(
            0,
            np.sqrt((mueff - 1) / (self.solution_dim + 1)) - 1,
        ) + cs)

        # Recombination of the new mean.
        old_mean = self.mean
        self.mean = self._calc_mean(parents, weights)

        # Update the evolution path.
        y = self.mean - old_mean
        z = self.cov.invsqrt * y
        self.ps = ((1 - cs) * self.ps +
                   (np.sqrt(cs * (2 - cs) * mueff) / self.sigma) * z)
        left = (np.sum(np.square(self.ps)) / self.solution_dim /
                (1 - (1 - cs)**(2 * self.current_eval / self.batch_size)))
        right = 2 + 4. / (self.solution_dim + 1)
        hsig = 1 if left < right else 0

        self.pc = ((1 - cc) * self.pc +
                   hsig * np.sqrt(cc * (2 - cc) * mueff) * y)

        # Adapt the covariance matrix.
        weighted_ys, ys = self._calc_weighted_ys(parents, old_mean, weights)
        # Equivalent to calculating the outer product of each ys[i] with
        # itself and taking a weighted sum of the outer products.
        rank_mu_update = np.sum(weighted_ys * ys, axis=0)
        c1a = c1 * (1 - (1 - hsig**2) * cc * (2 - cc))
        self.cov.cov = self._calc_cov_update(self.cov.cov, c1a, cmu, c1,
                                             self.pc, self.sigma,
                                             rank_mu_update, weights)

        # Update sigma.
        cn, sum_square_ps = cs / damps, np.sum(np.square(self.ps))
        self.sigma *= np.exp(
            min(1, cn * (sum_square_ps / self.solution_dim - 1) / 2))
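

# A minimal usage sketch of the ask/tell protocol, assuming an external
# ranker as in pyribs emitters: sort solutions by objective (best first) and
# pass the sorted indices and values back to tell(). Illustrative only; the
# sphere function and parameter choices below are assumptions.
if __name__ == "__main__":
    opt = SeparableCMAEvolutionStrategy(sigma0=0.5, solution_dim=10)
    opt.reset(np.zeros(10))
    for _ in range(200):
        solutions = opt.ask()
        objectives = np.sum(np.square(solutions), axis=1)  # Sphere function.
        ranking_indices = np.argsort(objectives)  # Best (lowest) first.
        ranking_values = objectives[ranking_indices]
        opt.tell(ranking_indices, ranking_values,
                 num_parents=opt.batch_size // 2)
        if opt.check_stop(ranking_values):
            break
    print("Best point found:", opt.mean)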