BOP-Elites

Introduced in Kent 2024, Bayesian Optimization of Elites (BOP-Elites) adapts methods from Bayesian optimization to model the objective and measure functions and select solutions to evaluate. The script below shows how to implement BOP-Elites on the sphere linear projection benchmark.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Example script of running BOP-Elites on the sphere domain.

BOP-Elites was introduced in Kent 2024:
https://ieeexplore.ieee.org/abstract/document/10472301

Install the following dependencies before running this example:
    pip install ribs[visualize] pymoo tqdm fire
"""

from __future__ import annotations

import json
import time
from pathlib import Path

import fire
import matplotlib.pyplot as plt
import numpy as np
import tqdm

from ribs.archives import GridArchive
from ribs.emitters import BayesianOptimizationEmitter
from ribs.schedulers import BayesianOptimizationScheduler
from ribs.visualize import grid_archive_heatmap


def sphere(
    solutions: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Sphere function evaluation and measures for a batch of solutions.

    Args:
        solutions: (batch_size, dim) batch of solutions.

    Returns:
        objectives: (batch_size,) batch of objectives.
        objective_grads: (batch_size, solution_dim) batch of objective gradients.
        measures: (batch_size, 2) batch of measures.
        measure_grads: (batch_size, 2, solution_dim) batch of measure gradients.
    """
    dim = solutions.shape[1]

    # Shift the Sphere function so that the optimal value is at x_i = 2.048.
    sphere_shift = 5.12 * 0.4

    # Normalize the objective to the range [0, 100] where 100 is optimal.
    best_obj = 0.0
    worst_obj = (-5.12 - sphere_shift) ** 2 * dim
    raw_obj = np.sum(np.square(solutions - sphere_shift), axis=1)
    objectives = (raw_obj - worst_obj) / (best_obj - worst_obj) * 100

    # Compute gradient of the objective.
    objective_grads = -2 * (solutions - sphere_shift)

    # Calculate measures.
    clipped = solutions.copy()
    clip_mask = (clipped < -5.12) | (clipped > 5.12)
    clipped[clip_mask] = 5.12 / clipped[clip_mask]
    measures = np.concatenate(
        (
            np.sum(clipped[:, : dim // 2], axis=1, keepdims=True),
            np.sum(clipped[:, dim // 2 :], axis=1, keepdims=True),
        ),
        axis=1,
    )

    # Compute gradient of the measures.
    derivatives = np.ones(solutions.shape)
    derivatives[clip_mask] = -5.12 / np.square(solutions[clip_mask])

    mask_0 = np.concatenate((np.ones(dim // 2), np.zeros(dim - dim // 2)))
    mask_1 = np.concatenate((np.zeros(dim // 2), np.ones(dim - dim // 2)))

    d_measure0 = derivatives * mask_0
    d_measure1 = derivatives * mask_1

    measure_grads = np.stack((d_measure0, d_measure1), axis=1)

    return (
        objectives,
        objective_grads,
        measures,
        measure_grads,
    )


def save_heatmap(archive: GridArchive, heatmap_path: str | Path) -> None:
    """Saves a heatmap of the archive to the given path.

    Args:
        archive: The archive to save.
        heatmap_path: Image path for the heatmap.
    """
    plt.figure(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=0, vmax=100, cmap="viridis")
    plt.tight_layout()
    plt.savefig(heatmap_path)
    plt.close(plt.gcf())


def main(  # pylint: disable = too-many-positional-arguments
    iterations: int = 1000,
    solution_dim: int = 4,
    search_nrestarts: int = 5,
    entropy_ejie: bool = False,
    # See here for how to annotate tuples:
    # https://docs.python.org/3/library/typing.html#annotating-tuples
    upscale_schedule: tuple[tuple[int, ...], ...] = ((5, 5), (10, 10), (25, 25)),
    num_initial_samples: int = 20,
    initial_solutions: np.ndarray | None = None,
    batch_size: int = 8,
    num_emitters: int = 1,
    seed: int = 42,
    outdir: str = "test_logs",
    log_every: int = 20,
) -> None:
    """Main function for running BOP-Elites on the sphere domain."""
    # logdir for saving experiment data
    logdir = Path(outdir)
    logdir.mkdir(exist_ok=True)

    # The main grid archive that interacts with BOP-Elites. We start at the lowest
    # resolution from ``upscale_schedule``.
    max_bound = solution_dim / 2 * 5.12
    bounds = [(-max_bound, max_bound), (-max_bound, max_bound)]
    main_archive = GridArchive(
        solution_dim=solution_dim,
        dims=upscale_schedule[0],
        ranges=bounds,
        seed=seed,
    )
    # The passive archive that does NOT interact with BOP-Elites other than storing all
    # evaluated data seen so far. We do not scale the passive archive, and it always
    # stays at the final resolution.
    passive_archive = GridArchive(
        solution_dim=solution_dim,
        dims=upscale_schedule[-1],
        ranges=bounds,
        seed=seed,
    )

    # The main component of BOP-Elites
    emitters = [
        BayesianOptimizationEmitter(
            archive=main_archive,
            # BayesianOptimizationEmitter requires solution space bounds due to Sobol
            # sampling. i.e., bounds cannot be None.
            lower_bounds=np.full(solution_dim, -10.24),
            upper_bounds=np.full(solution_dim, 10.24),
            search_nrestarts=search_nrestarts,
            entropy_ejie=entropy_ejie,
            upscale_schedule=upscale_schedule,
            num_initial_samples=num_initial_samples,
            initial_solutions=initial_solutions,
            batch_size=batch_size,
            seed=seed + i,
        )
        for i in range(num_emitters)
    ]

    # Scheduler for managing multiple emitters (in what order we ask them for solutions
    # etc.).
    scheduler = BayesianOptimizationScheduler(
        main_archive, emitters, result_archive=passive_archive
    )

    metrics = {
        "QD Score": {
            "x": [0],
            "y": [scheduler.result_archive.stats.qd_score],
        },
        "Archive Coverage": {
            "x": [0],
            "y": [scheduler.result_archive.stats.coverage],
        },
        "Itr. Time": {
            "x": [],
            "y": [],  # Does not start at 0.
        },
    }

    for i in tqdm.trange(1, iterations + 1):
        itr_start_time = time.time()

        solutions = scheduler.ask()
        objectives, _, measures, _ = sphere(solutions)
        scheduler.tell(objectives, measures)

        final_itr = i == iterations
        if i % log_every == 0 or final_itr:
            if final_itr:
                scheduler.result_archive.data(return_type="pandas").to_csv(
                    logdir / "final_archive.csv"
                )

            metrics["QD Score"]["x"].append(i)
            metrics["QD Score"]["y"].append(scheduler.result_archive.stats.qd_score)
            metrics["Archive Coverage"]["x"].append(i)
            metrics["Archive Coverage"]["y"].append(
                scheduler.result_archive.stats.coverage
            )
            metrics["Itr. Time"]["x"].append(i)
            metrics["Itr. Time"]["y"].append(time.time() - itr_start_time)

            tqdm.tqdm.write(
                f"Iteration {i} | Archive Coverage: "
                f"{metrics['Archive Coverage']['y'][-1] * 100:.3f}% "
                f"QD Score: {metrics['QD Score']['y'][-1]:.3f}"
            )

            save_heatmap(
                scheduler.result_archive,
                logdir / f"heatmap_{i:08d}.png",
            )

    # Plot metrics.
    for metric, values in metrics.items():
        plt.plot(values["x"], values["y"])
        plt.title(metric)
        plt.xlabel("Iteration")
        plt.savefig(str(logdir / f"{metric.lower().replace(' ', '_')}.png"))
        plt.clf()

    # Convert metrics to Python scalars by calling .item(), since each stats value is a
    # 0-D array by default, and JSON cannot serialize 0-D arrays.
    for metric in metrics:  # pylint: disable = consider-using-dict-items
        metrics[metric]["y"] = [
            m if isinstance(m, (int, float)) else m.item() for m in metrics[metric]["y"]
        ]

    # Save metrics to JSON.
    with (logdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


if __name__ == "__main__":
    fire.Fire(main)