Source code for ribs.emitters._bayesian_opt_emitter

"""Provides the BayesianOptimizationEmitter."""

from __future__ import annotations

import warnings
from collections.abc import Collection

import numpy as np
from numpy.typing import ArrayLike
from scipy.stats import entropy, norm
from scipy.stats.qmc import Sobol
from sklearn.exceptions import ConvergenceWarning
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

from ribs._utils import check_batch_shape, check_finite, validate_batch
from ribs.archives import GridArchive
from ribs.emitters._emitter_base import EmitterBase
from ribs.typing import BatchData, Float, Int


[docs] class BayesianOptimizationEmitter(EmitterBase): """A sample-efficient emitter that models objective and measure functions with Gaussian process surrogate models. Bayesian Optimisation is used to emit solutions that are predicted to have high *Expected Joint Improvement of Elites* (EJIE) acquisition values. Refer to `Kent 2024 <https://ieeexplore.ieee.org/abstract/document/10472301>`_ for more information. .. note:: This emitter requires the `pymoo <https://pymoo.org>`_ package, which can be installed with ``pip install pymoo`` or ``conda install pymoo``. Args: archive: An archive to use when creating and inserting solutions. Currently, the only supported archive type is :class:`ribs.archives.GridArchive`. bounds: Bounds of the solution space. This is a sequence of tuples, each of the form ``(lower_bound, upper_bound)``. Unlike other emitters, either these bounds or the ``lower_bounds``/``upper_bounds`` below must be provided since SOBOL sampling is used. lower_bounds: Instead of specifying ``bounds``, ``lower_bounds`` and ``upper_bounds`` may be specified. This is useful if, for instance, solutions are multi-dimensional. Here, pass an array specifying the lower bounds of the solution space. upper_bounds: Upper bounds of the solution space; see ``lower_bounds`` above. search_nrestarts: Number of starting points for EJIE pattern search. entropy_ejie: If ``True``, augments EJIE acquisition function with entropy to encourage measure space exploration. Refer to Sec. 4.1 of `Kent 2023 <https://dl.acm.org/doi/10.1145/3583131.3590486>`_ for more details. upscale_schedule: An array of increasing archive resolutions starting with :attr:`archive.resolution` and ending with the user's intended final archive resolution. This will upscale the archive to the next scheduled resolution if every cell within the current archive has been filled, or the number of evaluated solutions is more than twice :attr:`archive.cells`. If ``None``, the archive will not be upscaled. min_obj: The lowest possible objective value. Serves as the default objective value within archive cells that have not been filled. Mainly used when computing expected improvement. num_initial_samples: The number of solutions that will be sampled from a Sobol sequence as the first batch of training data for gaussian processes. Either ``num_initial_samples`` or ``initial_solutions`` must be set. initial_solutions: An (n, solution_dim) array of solutions to be used as the first batch of training data for gaussian processes. Either ``num_initial_samples`` or ``initial_solutions`` must be set. batch_size: Number of solutions to return in :meth:`ask`. Must not exceed ``search_nrestarts``. It is recommended to set this to 1 for sample efficiency. seed: Seed for the random number generator. """ def __init__( self, archive: GridArchive, *, bounds: Collection[tuple[None | Float, None | Float]] | None = None, lower_bounds: ArrayLike | None = None, upper_bounds: ArrayLike | None = None, search_nrestarts: Int = 5, entropy_ejie: bool = False, upscale_schedule: ArrayLike | None = None, min_obj: Float = 0, num_initial_samples: Int | None = None, initial_solutions: ArrayLike | None = None, batch_size: Int = 1, seed: Int | None = None, ) -> None: try: # pylint: disable = import-outside-toplevel from pymoo.algorithms.soo.nonconvex.pattern import PatternSearch from pymoo.optimize import minimize from pymoo.problems.functional import FunctionalProblem from pymoo.termination.default import DefaultSingleObjectiveTermination except ImportError as e: raise ImportError( "pymoo must be installed -- please run `pip install pymoo` " "or `conda install pymoo`" ) from e self._pymoo_mods = { "PatternSearch": PatternSearch, "minimize": minimize, "FunctionalProblem": FunctionalProblem, "DefaultSingleObjectiveTermination": DefaultSingleObjectiveTermination, } if bounds is None and lower_bounds is None and upper_bounds is None: raise ValueError( "Bounds must be specified for BayesianOptimizationEmitter, either " "with the bounds parameter or with lower_bounds and upper_bounds." ) EmitterBase.__init__( self, archive, solution_dim=archive.solution_dim, bounds=bounds, lower_bounds=lower_bounds, upper_bounds=upper_bounds, ) check_finite(self.lower_bounds, "lower_bounds") check_finite(self.upper_bounds, "upper_bounds") if not isinstance(archive, GridArchive): raise NotImplementedError( f"archive type {archive.__class__.__name__} not implemented for" " BayesianOptimizationEmitter. Expected GridArchive." ) if (upscale_schedule is not None) and ( not np.isclose(archive.learning_rate, 1) ): raise NotImplementedError( "Archive upscaling is currently incompatible with archive " "learning rate. Since you have specified an upscale schedule " f"{upscale_schedule}, the learning rate of the input archive " f"must be 1 (currently {archive.learning_rate})." ) self._seed = seed self._sobol = Sobol(d=self.solution_dim, scramble=True, seed=self._seed) # Initializes a multi-output GP. 1 output for objective function, plus 1 # output for each measure function # NOTE: Using Matern kernal with default parameters self._gp = GaussianProcessRegressor( kernel=Matern(), normalize_y=True, n_targets=1 + self.measure_dim ) if num_initial_samples is None and initial_solutions is None: raise ValueError( "Either num_initial_samples or initial_solutions must be provided." ) if num_initial_samples is not None and initial_solutions is not None: raise ValueError( "num_initial_samples and initial_solutions cannot both be provided." ) if initial_solutions is not None: self._initial_solutions = np.asarray( initial_solutions, dtype=archive.dtypes["solution"] ) else: self._initial_solutions = self._sample_n_rescale(num_initial_samples) check_batch_shape( self._initial_solutions, "initial_solutions", archive.solution_dim, "archive.solution_dim", ) self._dataset = { "solution": np.empty((0, self.solution_dim), dtype=self.dtype), "objective": np.empty((0, 1)), "measures": np.empty((0, self.measure_dim)), } self._search_nrestarts = search_nrestarts if upscale_schedule is None: self._upscale_schedule = None else: self._upscale_schedule = np.asarray(upscale_schedule) self._check_upscale_schedule(self._upscale_schedule) self._batch_size = batch_size self._misspec = 0 self._overspec = 0 self._prev_numcells = len(self.archive) self._numitrs_noprogress = 0 self._entropy_norm = ( entropy(np.ones(self.archive.cells) / self.archive.cells) if entropy_ejie else None ) self._min_obj = min_obj @property def batch_size(self) -> Int: """Number of solutions to return in :meth:`ask`.""" return self._batch_size @property def cell_prob_cutoff(self) -> Float: """Cutoff value (ohm) for :meth:`_get_cell_probs`. Described in `Kent 2024 <https://ieeexplore.ieee.org/abstract/document/10472301>`_ Sec.IV-D. There are some numerical errors involved with cell_probs, so even passing the same sample in different shapes/contexts can sometimes return slightly different cell_probs, so we return cell_prob_cutoff at a lower precision than cell_probs to ensure the same sample consistently passes/fails the threshold check. """ return round( 0.5 * (2 / self.archive.cells) ** ( (10 * self.solution_dim) / (self._misspec - 2 * self._overspec + self.num_evals + 1e-6) ) ** 0.5, 4, ) @property def num_evals(self) -> int: """Number of solutions stored in :attr:`_dataset`. This is the number of solutions that have been evaluated since the initialization of this emitter. """ return self._dataset["solution"].shape[0] @property def measure_dim(self) -> int: """Number of measure functions.""" return self.archive.measure_dim @property def num_sobol_samples(self) -> Int: """Number of SOBOL samples when choosing pattern search starting points in :meth:`ask`. .. note:: If measure function gradients are available, a potentially better way to do this might be to do Latin Hypercube sampling within measure space, and then use measure gradients to find solutions achieving those measure space samples. See `Kent 2024b <https://wrap.warwick.ac.uk/id/eprint/189556/1/WRAP_Theses_Kent_2024.pdf>`_ Sec. 6.3 for more details. """ m = 10 if self.solution_dim < 2 else 1 return np.clip( m * (self.solution_dim**2) * np.prod(self.measure_dim), 10000, 100000, ) @property def dtype(self) -> np.dtype: """Data type of solutions.""" return self.archive.dtypes["solution"] @property def upscale_schedule(self) -> np.ndarray | None: """Archive upscale schedule. Defined when initializing this emitter. """ return self._upscale_schedule @property def upscale_trigger_threshold(self) -> Int: """Maximum number of iterations the emitter is allowed to not find new cells before archive upscale is triggered. See `here <https://github.com/kentwar/BOPElites/blob/main/algorithm/BOP_Elites_UKD_beta.py#L187>`_ for more details. """ return np.floor(np.sqrt(self.archive.cells)) @property def min_obj(self) -> Float: """The lowest possible objective value. Refer to the documentation for this class. """ return self._min_obj @property def initial_solutions(self) -> np.ndarray | None: """Returned when the archive is empty (if :attr:`x0` is not set).""" return self._initial_solutions @EmitterBase.archive.setter def archive(self, new_archive: GridArchive) -> None: """Allows resetting the archive associated with this emitter (for archive upscaling).""" self._archive = new_archive
[docs] def post_upscale_updates(self) -> None: """Runs after the scheduler upscales the archive. This method updates :attr:`_entropy_norm` according to new number of archive cells and resets :attr:`_numitrs_noprogress` to 0. """ if self._entropy_norm is not None: self._entropy_norm = entropy( np.ones(self.archive.cells) / self.archive.cells ) self._numitrs_noprogress = 0
def _update_no_coverage_progress(self) -> None: """Potentially increments :attr:`_numitrs_noprogress`. Increments if number of discovered archive cells remains the same for two successive calls to this function. Otherwise resets :attr:`_numitrs_noprogress` to 0. """ if len(self.archive) == self._prev_numcells: self._numitrs_noprogress += self.batch_size else: self._numitrs_noprogress = 0 self._prev_numcells = len(self.archive) def _check_upscale_schedule(self, upscale_schedule: np.ndarray) -> None: """Checks that ``upscale_schedule`` is a valid upscale schedule. Specifically: 1. Must be a 2D array where the second dim equals :attr:`measure_dim`. 2. The resolutions corresponding to each measure must be non-decreasing along axis 0. 3. The first resolution within the schedule must equal :attr:`archive.dims`. Example of valid upscale_schedule: [ [5, 5], [5, 10], [10, 10] ] Example of invalid upscale_schedule: [ [5, 5], [5, 10], [10, 5] <- resolution for measure 2 decreases ] Args: upscale_schedule: See ``upscale_schedule`` from :meth:`__init__`. """ if upscale_schedule.ndim != 2: raise ValueError("upscale_schedule must have 2 dimensions.") if upscale_schedule.shape[1] != self.measure_dim: raise ValueError( f"Expected upscale_schedule of shape (any,{self.measure_dim}), " f"actually got {upscale_schedule.shape}." ) if not np.all(np.diff(upscale_schedule, axis=0) >= 0): raise ValueError( "The resolutions corresponding to each measure must be " "non-decreasing along axis 0." ) if not np.all(self.archive.dims == upscale_schedule[0]): raise ValueError( "Expected the first resolution within upscale_schedule to be " f"{self.archive.dims} (the resolution of this emitter's " f"archive), actually got {upscale_schedule[0]}." ) def _sample_n_rescale(self, num_samples: int) -> np.ndarray: """Samples `num_samples` solutions from the SOBOL sequence. The solutions are also rescaled to the bounds of the search space. Args: num_samples: Number of solutions to sample. Returns: Array of shape (num_samples, :attr:`solution_dim`) containing the sampled solutions. """ # SOBOL samples are in range [0, 1]. Need to rescale to bounds sobol_samples = self._sobol.random(n=num_samples) rescaled_samples = self.lower_bounds + sobol_samples * ( self.upper_bounds - self.lower_bounds ) return rescaled_samples def _get_expected_improvements( self, obj_mus: np.ndarray, obj_stds: np.ndarray ) -> np.ndarray: """Computes expected improvements predicted by :attr:`_gp`. The improvements are calculated for a batch of solutions over all cells in the current archive. This function takes in the posterior means and standard deviations predicted by the objective gaussian process instead of the solutions themselves to avoid redundant computation. Args: obj_mus: Array of shape (num_solutions,) containing the posterior objective means predicted by the gaussian process. obj_stds: Array of shape (num_solutions,) containing the posterior objective standard deviations predicted by the gaussian process. Returns: Array of shape (num_solutions, :attr:`archive.cells`) containing the expected improvements for each solution over each cell. """ num_samples = obj_mus.shape[0] all_obj = np.full((self.archive.cells,), self.min_obj) elite_idx, elite_obj = self.archive.data( ["index", "objective"], return_type="tuple" ) all_obj[elite_idx] = elite_obj distribution = norm( loc=np.repeat(all_obj[None, :], num_samples, axis=0), scale=np.repeat(obj_stds[:, None], self.archive.cells, axis=1), ) return (obj_mus[:, None] - all_obj) * distribution.cdf( obj_mus[:, None] ) + obj_stds[:, None] * distribution.pdf(obj_mus[:, None]) def _get_cell_probs( self, meas_mus: np.ndarray, meas_stds: np.ndarray, normalize: bool = True, cutoff: bool = True, ) -> np.ndarray: """Computes archive cell membership probabilities predicted by :attr:`_gp`. Probabilities are computed for a batch of solutions. This function takes in the posterior means and standard deviations predicted by the measure gaussian processes instead of the solutions themselves to avoid redundant computation. Args: meas_mus: Array of shape (num_solutions, :attr:`measure_dim`) containing the posterior measure means predicted by the gaussian process. meas_stds: Array of shape (num_solutions, :attr:`measure_dim`) containing the posterior measure standard deviations predicted by the gaussian process. normalize: If ``True``, normalizes the cell probabilities such that they sum to 1 for each solution. cutoff: If ``True``, sets cell probabilities below :attr:`cell_prob_cutoff` to 0. Returns: Array of shape (num_solutions, :attr:`archive.cells`) containing the predicted cell probabilities for each solution. """ num_solutions = meas_mus.shape[0] cell_probs = np.ones((num_solutions, *self.archive.dims)) for measure_idx, (mus, stds) in enumerate( zip(meas_mus.T, meas_stds.T, strict=True) ): distribution = norm(loc=mus, scale=stds) # computes the cdf values at each cell boundary, this has shape # (num_solutions, num_boundaries). cdf_vals = distribution.cdf(self.archive.boundaries[measure_idx][:, None]).T # takes the difference between each pair of adjacent boundaries, # this has shape (num_solutions, num_boundaries-1) = (num_solutions, # measure_resolution) cdf_diffs = np.diff(cdf_vals, axis=1) # reshapes diffs to be compatible with element-wise multiplication for i in range(self.measure_dim): if i != measure_idx: # axis i+1 because first axis is num_solutions cdf_diffs = np.expand_dims(cdf_diffs, axis=i + 1) cell_probs *= cdf_diffs cell_probs = cell_probs.reshape((num_solutions, self.archive.cells)) if cutoff: cell_probs[cell_probs < self.cell_prob_cutoff] = 0 if normalize: # with ``cutoff``, it is possible a solution has 0 prob on all # cells, we don't normalize on those to prevent numerical error cell_probs_sum = np.sum(cell_probs, axis=1)[:, None] cell_probs_sum[cell_probs_sum == 0] = 1 cell_probs /= cell_probs_sum return cell_probs def _get_ejie_values(self, samples: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Computes *Expected Joint Improvement of Elites* (EJIE) acquisition values. Value are computed by multiplying the predicted expected improvements and cell membership probabilities. Returns individual EJIE values for each cell in an array of shape (num_solutions, :attr:`archive.cells`). You can use `np.sum(result, axis=1)` to get the total EJIE on the entire archive. Also returns the predicted cell membership probabilities for each sample in an array of shape (num_solutions, :attr:`archive.cells`). Args: samples: Array of shape (num_samples, :attr:`solution_dim`) containing samples whose EJIE values need to be computed. Returns: Returns an array of shape (num_solutions, :attr:`archive.cells`) containing each solution's EJIE values for each cell. Also returns an array of shape (num_solutions, :attr:`archive.cells`) containing the predicted cell membership probabilities for each solution. """ mus, stds = self._gp.predict( samples.reshape(-1, self.solution_dim), return_std=True ) expected_improvements = self._get_expected_improvements(mus[:, 0], stds[:, 0]) cell_probs = self._get_cell_probs( mus[:, 1:], stds[:, 1:], normalize=True, cutoff=True ) if self._entropy_norm is not None: all_zero_filter = np.all(np.isclose(cell_probs, 0), axis=1) entropies = np.zeros((mus.shape[0], 1)) entropies[~all_zero_filter] = entropy(cell_probs[~all_zero_filter], axis=1)[ :, None ] ejie_by_cell = ( expected_improvements * cell_probs * (1 + entropies / self._entropy_norm) ) else: ejie_by_cell = expected_improvements * cell_probs return ejie_by_cell, cell_probs
[docs] def ask(self) -> np.ndarray: """Returns solutions that are predicted to have high EJIE values. If ``self._gp`` has not been trained on any data and ``self._initial_solutions`` is set, we return ``self._initial_solutions``, which was either provided by user at emitter initialization or sampled from a Sobol sequence. If ``self._gp`` has been trained on some data: 1. Samples :attr:`num_sobol_samples` SOBOL samples. 2. Computes the EJIE values for each sample, and keeps the top :attr:`_search_nrestarts` samples with the largest EJIE values and as starting points for pattern search. 3. Starts a pattern search instance for each starting point to maximize their EJIE values. 4. After all pattern search instances have converged, checks if at least :attr:`batch_size` samples with positive EJIE values have been found. If not, increments :attr:`_overspec` and repeats the process until at least :attr:`batch_size` solutions with positive EJIE values have been found. 5. Returns the top :attr:`batch_size` solutions with the largest EJIE values. NOTE: This process has been simplified from the original implementation. The following are the components that are in the BOP-Elites source codes but removed here for simplicity: 1. `load_previous_points <https://github.com/kentwar/BOPElites/blob/main/algorithm/BOP_Elites_UKD.py#L337>`_ 2. `gen_elite_children <https://github.com/kentwar/BOPElites/blob/ main/algorithm/BOP_Elites_UKD.py#L298>`_ 3. We no longer restrict all starting points to be from unique cells. We understand this might compromise performance a bit, but enforcing all starting points from unique cells becomes messy in extreme cases when, for example, our archive resolution is so low that the number of cells is smaller than the number of starting points. Additionally, to my current understanding, it is not guaranteed that starting points from unique cells will result in higher optimized EJIE, because some cells might be easier to improve than others. 4. We no longer explicitly add samples predicted to be in empty cells to the starting point pool, since samples predicted to be in empty cells should already have high EJIE. Returns: numpy.ndarray: Array of shape (:attr:`batch_size`, :attr:`solution_dim`) containing the solutions with the largest EJIE values in descending EJIE order. """ if self.num_evals == 0: return np.clip(self.initial_solutions, self.lower_bounds, self.upper_bounds) # pymoo minimizes so need to negate pymoo_problem = self._pymoo_mods["FunctionalProblem"]( n_var=self.solution_dim, objs=lambda x: -np.sum(self._get_ejie_values(x)[0], axis=1), xl=self.lower_bounds, xu=self.upper_bounds, ) termination = self._pymoo_mods["DefaultSingleObjectiveTermination"]() optimization_outcomes = { "optimized_samples": [], "optimized_ejie_by_cell": [], "optimized_cell_probs": [], } while len(optimization_outcomes["optimized_samples"]) < self.batch_size: samples = self._sample_n_rescale(self.num_sobol_samples) starting_ejie_by_cell, _ = self._get_ejie_values(samples) search_starting_points = samples[ np.argsort(np.sum(starting_ejie_by_cell, axis=1))[ -self._search_nrestarts : ] ] # optimizes ejie values of starting points found_positive_ejie = False for x0 in search_starting_points: optimizer = self._pymoo_mods["PatternSearch"](x0=x0) # Note: Using default pymoo minimize, PatternSearch, and # termination. result = self._pymoo_mods["minimize"]( problem=pymoo_problem, algorithm=optimizer, termination=termination, copy_algorithm=False, seed=self._seed, ) if -result.F > 0: optimization_outcomes["optimized_samples"].append(result.X) # retrieve the cell-wise EJIE and probs for optimized # solution opt_ejie_by_cell, opt_cell_probs = self._get_ejie_values(result.X) optimization_outcomes["optimized_ejie_by_cell"].append( opt_ejie_by_cell.squeeze() ) optimization_outcomes["optimized_cell_probs"].append( opt_cell_probs.squeeze() ) found_positive_ejie = True # if didn't find any positive ejie after optimization, increments # over-specification count # (we don't increment the over-specification count if we found # some positive EJIEs but not enough to fill the batch) if not found_positive_ejie: self._overspec += 1 optimized_samples = np.array(optimization_outcomes["optimized_samples"]) ejie_by_cell = np.array(optimization_outcomes["optimized_ejie_by_cell"]) cell_probs = np.array(optimization_outcomes["optimized_cell_probs"]) total_ejies = np.sum(ejie_by_cell, axis=1) # Most likely cell for each optimized solution best_cell_idx = np.argmax(cell_probs, axis=1) best_cell_probs = cell_probs[range(cell_probs.shape[0]), best_cell_idx] # Computes EJIE attributions of the most likely cell for each solution ejie_attributions = ( ejie_by_cell[range(ejie_by_cell.shape[0]), best_cell_idx] / total_ejies ) # Sort by EJIE, take the top :attr:`batch_size` samples sorted_idx = np.argsort(total_ejies)[::-1][: self.batch_size] # NOTE: BOP-Elites Algorithm 1 implements a different mis-specification # check, in which a mis-specification occurs if a sample is predicted # to be in a cell with high confidence, but the prediction turns out # to be wrong. # We implement a new mis-specification check as recommended by the # author. New mis-specification checks whether most of a sample's EJIE # is attributed to a single cell, which has low predicted cell # probability. This corresponds to the (undesirable) scenario in which # a cell that is likely unreachable dominates EJIE. for best_prob, attr_val in zip( best_cell_probs[sorted_idx], ejie_attributions[sorted_idx], strict=True ): if best_prob < 0.5 < attr_val: self._misspec += 1 return optimized_samples[sorted_idx]
[docs] def tell( self, solution: ArrayLike, objective: ArrayLike, measures: ArrayLike, add_info: BatchData, **fields: ArrayLike, ) -> np.ndarray | None: """Updates the gaussian process and potentially upscales the archive. The function does the following: 1. Adds ``solution``, ``objective``, and ``measures`` to :attr:`_dataset`. 2. Updates :attr:`_gp` with :attr:`_dataset`. 3. For each solution whose EJIE attribution exceeds 50%, checks whether its predicted cell is different from the cell it is actually assigned according to its evaluated measures. If so, increments :attr:`_misspec`. 4. If :attr:`upscale_schedule` is not ``None``, and if the archive upscale conditions have been met, sends an upscale signal upstream by returning the next resolution to upscale to. Args: solution: (batch_size, :attr:`solution_dim`) array of solutions generated by this emitter's :meth:`ask()` method. objective: 1D array containing the objective function value of each solution. measures: (batch_size, :attr:`measure_dim`) array with the measure values of each solution. add_info: Data returned from the archive :meth:`~ribs.archives.ArchiveBase.add` method. fields: Additional data for each solution. Each argument should be an array with batch_size as the first dimension. Returns: A 1D array of shape (:attr:`measure_dim`,) containing the next resolution to upscale to. The actual upscaling will be done in the scheduler, through :meth:`~ribs.schedulers.BayesianOptimizationScheduler.tell`. If no upscaling is needed in the current step, returns ``None``. """ data, add_info = validate_batch( self.archive, { "solution": solution, "objective": objective, "measures": measures, **fields, }, add_info, ) # Adds new data to dataset. self._dataset["solution"] = np.vstack( (self._dataset["solution"], data["solution"]) ) self._dataset["objective"] = np.vstack( (self._dataset["objective"], data["objective"].reshape(-1, 1)) ) self._dataset["measures"] = np.vstack( (self._dataset["measures"], data["measures"]) ) # Updates (actually re-trains) GP with updated dataset. # sklearn occasionally raises LBFGS ConvergenceWarning, but this does # not seem to impact BOP-Elites performance too much. with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=ConvergenceWarning) self._gp.fit( X=self._dataset["solution"], y=np.hstack((self._dataset["objective"], self._dataset["measures"])), ) # Checks upscale conditions and upscales if needed # NOTE: BOP-Elites Algorithm 1 implements a slightly different upscale # condition, in which the archive upscale is triggered if either all # its cells have been filled or if num_evals > 2*cells. However, the # old condition may struggle with applications where some cells are not # feasible. We implement an improved condition here as recommended by # the original author. The new condition triggers the upscale when no # new cell has been found for multiple iterations. self._update_no_coverage_progress() if ( (self.upscale_schedule is not None) and np.any(np.all(self.upscale_schedule > self.archive.dims, axis=1)) and self._numitrs_noprogress > self.upscale_trigger_threshold ): # The next resolution on the schedule that is higher than the # current resolution along all measure dims next_res = self.upscale_schedule[ np.all(self.upscale_schedule > self.archive.dims, axis=1) ][0] return next_res return None