Source code for ribs.schedulers._bayesian_opt_scheduler
"""Provides the BayesianOptimizationScheduler."""
from __future__ import annotations
from collections.abc import Sequence
from typing import Literal
import numpy as np
from numpy.typing import ArrayLike
from ribs.archives import ArchiveBase, GridArchive
from ribs.emitters import BayesianOptimizationEmitter
from ribs.schedulers._scheduler import Scheduler
[docs]
class BayesianOptimizationScheduler(Scheduler):
"""Similar to :class:`~Scheduler` but with support for upscaling archives.
This scheduler should only be used in conjunction with
:class:`~ribs.emitters.BayesianOptimizationEmitter` and
:class:`~ribs.archives.GridArchive`.
Args:
archive: An archive object.
emitters: A list of emitters.
result_archive: An additional archive where all solutions are added.
add_mode: Indicates how solutions should be added to the archive. The default is
"batch", which adds all solutions with one call to
:meth:`~ribs.archives.ArchiveBase.add`. Alternatively, use "single" to add
the solutions one at a time with
:meth:`~ribs.archives.ArchiveBase.add_single`. "single" mode is included
since implementing batch addition on an archive is sometimes non-trivial. We
highly recommend "batch" mode since it is significantly faster.
Raises:
TypeError: Some emitters are not BayesianOptimizationEmitter.
ValueError: Not all emitters have the same upscale schedule.
"""
def __init__(
self,
archive: GridArchive,
emitters: Sequence[BayesianOptimizationEmitter],
result_archive: ArchiveBase | None = None,
*,
add_mode: Literal["batch", "single"] = "batch",
) -> None:
super().__init__(archive, emitters, result_archive, add_mode=add_mode)
# Checks that all emitters are BayesianOptimizationEmitter and have the same
# upscale schedule.
this_upscale_schedule = None
for i, e in enumerate(emitters):
if not isinstance(e, BayesianOptimizationEmitter):
raise TypeError(
"All emitters must be of type BayesianOptimizationEmitter, "
f"but emitter{i} has type {e.__class__.__name__}"
)
if i == 0:
this_upscale_schedule = e.upscale_schedule
else:
other_upscale_schedule = e.upscale_schedule
if not (
# Either both schedules are None...
(this_upscale_schedule is None and other_upscale_schedule is None)
or
# ...or they are both numpy arrays with the same shape and values.
(
isinstance(this_upscale_schedule, np.ndarray)
and isinstance(other_upscale_schedule, np.ndarray)
and this_upscale_schedule.shape != other_upscale_schedule.shape
and np.all(this_upscale_schedule == other_upscale_schedule)
)
):
raise ValueError(
"All emitters must have the same upscale schedule. "
"emitter0 has upscale schedule "
f"{this_upscale_schedule}, but emitter{i} has upscale "
f"schedule {other_upscale_schedule}."
)
# Checks that ``archive`` is a GridArchive
if not isinstance(archive, GridArchive):
raise TypeError(
"Archive type must be GridArchive. Actually got "
f"{archive.__class__.__name__}"
)
if this_upscale_schedule is None:
self._upscale_schedule = None
else:
self._upscale_schedule = this_upscale_schedule.copy()
@property
def upscale_schedule(self) -> np.ndarray | None:
"""The upscale schedules for all the Bayesian optimization emitters.
None if emitters do not undergo archive upscaling.
"""
return self._upscale_schedule
@Scheduler.archive.setter
def archive(self, new_archive: GridArchive) -> None:
self._archive = new_archive
[docs]
def ask_dqd(self) -> None:
raise NotImplementedError(
"ask_dqd() is not supported by BayesianOptimizationScheduler."
)
[docs]
def tell_dqd(
self,
objective: ArrayLike | None,
measures: ArrayLike,
jacobian: ArrayLike,
**fields: ArrayLike | None,
) -> None:
raise NotImplementedError(
"tell_dqd() is not supported by BayesianOptimizationScheduler."
)
[docs]
def tell(
self,
objective: ArrayLike | None,
measures: ArrayLike,
**fields: ArrayLike | None,
) -> None:
"""Updates :attr:`emitters` and the :attr:`archive` with new data.
When **ALL** emitters are ready to upscale, calls
:meth:`~ribs.archives.GridArchive.retessellate` to upscale the archive.
Otherwise same as :meth:`~Scheduler.tell`.
"""
if self._last_called != "ask":
raise RuntimeError("tell() was called without calling ask().")
self._last_called = "tell"
data = self._validate_tell_data(
{
"objective": objective,
"measures": measures,
**fields,
}
)
add_info = self._add_to_archives(data)
pos = 0
this_upscale_res = None
for i, (emitter, n) in enumerate(
zip(self._emitters, self._num_emitted, strict=True)
):
end = pos + n
upscale_res = emitter.tell(
**{
name: None if arr is None else arr[pos:end]
for name, arr in data.items()
},
add_info={name: arr[pos:end] for name, arr in add_info.items()},
)
pos = end
# Check that all emitters have returned the same upscale resolution.
if self.upscale_schedule is not None:
if i == 0:
this_upscale_res = upscale_res
elif np.any(this_upscale_res != upscale_res):
raise ValueError(
"Emitters returned different upscale resolutions "
"when they should return the same. Emitter0 "
f"returned resolution {this_upscale_res}, but "
f"emitter{i} returned resolution {upscale_res}"
)
# If the upscale resolution is not None, upscales :attr:`archive` and all
# emitter archives.
if this_upscale_res is not None:
for e in self._emitters:
e.archive.retessellate(this_upscale_res)
e.post_upscale_updates()