1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441 | """Uses CMA-ME to train agents with linear policies in Lunar Lander.
To run this example, first install SWIG (https://swig.org), such as with the system
package manager (sudo apt-get install swig) or via the Python Package Index (pip install
swig). SWIG is necessary for box2d, which is used in the Lunar Lander environment.
After SWIG is installed, install the following Python dependencies:
pip install ribs[visualize] tqdm "gymnasium[box2d]>=1.0.0" "moviepy>=1.0.0" dask distributed bokeh fire
This script uses the same setup as the tutorial, but it also uses Dask instead of
Python's multiprocessing to parallelize evaluations on a single machine and adds in a
CLI. Refer to the tutorial here:
https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for more info.
You should not need much familiarity with Dask to read this example. However, if you
would like to know more about Dask, we recommend referring to the quickstart for Dask
distributed: https://distributed.dask.org/en/latest/quickstart.html.
This script creates an output directory (defaults to `lunar_lander_output/`, see the
--outdir flag) with the following files:
- archive.csv: The CSV representation of the final archive, obtained with data().
- archive_ccdf.png: A plot showing the (unnormalized) complementary cumulative
distribution function of objectives in the archive. For each objective p on the
x-axis, this plot shows the number of solutions that had an objective of at least
p.
- heatmap.png: A heatmap showing the performance of solutions in the archive.
- metrics.json: Metrics about the run, saved as a mapping from the metric name to a
list of x values (iteration number) and a list of y values (metric value) for that
metric.
- {metric_name}.png: Plots of the metrics, currently `max_score`, `archive_size`,
and `qd_score`.
In evaluation mode (--run-eval flag), the script will read in the archive from the
output directory and simulate 10 random solutions from the archive. It will write videos
of these simulations to a `videos/` subdirectory in the output directory.
Usage:
# Basic usage - should take ~1 hour with 4 cores.
python lunar_lander.py NUM_WORKERS
# Now open the Dask dashboard at http://localhost:8787 to view worker
# status.
# Evaluation mode. If you passed a different outdir and/or env_seed when
# running the algorithm with the command above, you must pass the same
# outdir and/or env_seed here.
python lunar_lander.py --run-eval
Help:
python lunar_lander.py --help
"""
from __future__ import annotations
import json
import time
from pathlib import Path
import fire
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from dask.distributed import Client, LocalCluster
from ribs.archives import ArchiveBase, ArchiveDataFrame, GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap
def simulate(
    model: np.ndarray, seed: int | None = None, video_env: gym.Env | None = None
) -> tuple[float, float, float]:
    """Simulates the lunar lander model.

    The policy is linear: at every step the observation is multiplied by the weight
    matrix and the action with the largest resulting value is taken.

    Args:
        model: The array of weights for the linear policy. It is reshaped to
            (number of actions, observation size), so it must contain exactly that
            many entries.
        seed: The seed for the environment.
        video_env: If passed in, this will be used instead of creating a new env. This
            is used primarily for recording video during evaluation. The caller keeps
            ownership of it, i.e., it is never closed here.

    Returns:
        total_reward: The reward accrued by the lander throughout its trajectory.
        impact_x_pos: The x position of the lander when it touches the ground for the
            first time.
        impact_y_vel: The y velocity of the lander when it touches the ground for the
            first time.
    """
    # Since we are using multiple processes, it is simpler if each worker just creates
    # their own copy of the environment instead of trying to share the environment.
    # This also makes the function "pure." However, we should use the video_env if it
    # is passed in.
    env = gym.make("LunarLander-v3") if video_env is None else video_env

    try:
        action_dim = env.action_space.n
        obs_dim = env.observation_space.shape[0]
        model = model.reshape((action_dim, obs_dim))

        total_reward = 0.0
        impact_x_pos = None
        impact_y_vel = None
        all_y_vels = []
        obs, _ = env.reset(seed=seed)
        done = False

        while not done:
            action = np.argmax(model @ obs)  # Linear policy.
            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

            # Refer to the definition of state here:
            # https://gymnasium.farama.org/environments/box2d/lunar_lander/
            x_pos = obs[0]
            y_vel = obs[3]
            leg0_touch = bool(obs[6])
            leg1_touch = bool(obs[7])
            all_y_vels.append(y_vel)

            # Check if the lunar lander is impacting for the first time.
            if impact_x_pos is None and (leg0_touch or leg1_touch):
                impact_x_pos = x_pos
                impact_y_vel = y_vel

        # If the lunar lander did not land, set the x-pos to the one from the final
        # timestep, and set the y-vel to the max y-vel (we use min since the lander
        # goes down).
        if impact_x_pos is None:
            impact_x_pos = x_pos
            impact_y_vel = min(all_y_vels)
    finally:
        # Only close the env if it was created here (i.e., it is not a video env).
        # Doing this in `finally` ensures the env is released even when the rollout
        # raises, e.g., because the model has the wrong number of weights.
        if video_env is None:
            env.close()

    return total_reward, impact_x_pos, impact_y_vel
def create_scheduler(
    seed: int | None, n_emitters: int, sigma0: float, batch_size: int
) -> Scheduler:
    """Creates the Scheduler based on given configurations.

    Args:
        seed: Random seed for the archive and the emitters. When it is not None,
            emitter i is seeded with `seed + i`; otherwise every component is unseeded.
        n_emitters: Number of emitters to create.
        sigma0: Initial step size of each emitter.
        batch_size: Batch size of each emitter.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e. it has EvolutionStrategyEmitter's and
        a GridArchive).
    """
    # The environment is only needed to look up the policy dimensions, so make sure it
    # is closed again instead of leaving it open for the rest of the run.
    env = gym.make("LunarLander-v3")
    try:
        action_dim = env.action_space.n
        obs_dim = env.observation_space.shape[0]
    finally:
        env.close()

    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed,
        qd_score_offset=-600,
    )

    # If we create the emitters with identical seeds, they will all output the same
    # initial solutions. The algorithm should still work -- eventually, the emitters
    # will produce different solutions because they get different responses when
    # inserting into the archive. However, using different seeds avoids this problem
    # altogether.
    seeds = (
        [None] * n_emitters if seed is None else [seed + i for i in range(n_emitters)]
    )

    # We use the EvolutionStrategyEmitter to create an ImprovementEmitter.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        )
        for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler
def run_search(
    client: Client, scheduler: Scheduler, env_seed: int, iterations: int, log_freq: int
) -> dict[str, dict[str, list[int | float]]]:
    """Drives the ask/evaluate/tell loop of the QD algorithm.

    Args:
        client: A Dask client providing access to workers.
        scheduler: pyribs scheduler.
        env_seed: Seed for the environment, shared by every simulation.
        iterations: Iterations to run.
        log_freq: Metrics are recorded every `log_freq` iterations, and also on the
            final iteration.

    Returns:
        A mapping from metric name ("Max Score", "Archive Size", "QD Score") to a dict
        with an "x" list (iteration numbers) and a "y" list (metric values), i.e. the
        x's and y's of a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        " - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    archive = scheduler.archive

    # Archive Size and QD Score get a data point for iteration 0; Max Score only gets
    # points once logging starts.
    metrics = {
        "Max Score": {"x": [], "y": []},
        "Archive Size": {"x": [0], "y": [len(archive)]},
        "QD Score": {"x": [0], "y": [archive.stats.qd_score]},
    }

    def evaluate(model):
        """Runs one simulation with the shared environment seed."""
        return simulate(model, env_seed)

    def record(iteration):
        """Appends the current archive statistics to every metric series."""
        stats = archive.stats
        current = (
            ("Max Score", stats.obj_max),
            ("Archive Size", len(archive)),
            ("QD Score", stats.qd_score),
        )
        for name, value in current:
            metrics[name]["x"].append(iteration)
            metrics[name]["y"].append(value)

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Distribute the simulations among the Dask workers and wait for all of the
        # (total_reward, impact_x_pos, impact_y_vel) results.
        results = client.gather(client.map(evaluate, sols))

        # Split the results into objectives and measures for the scheduler.
        objs = [obj for obj, _, _ in results]
        meas = [[x_pos, y_vel] for _, x_pos, y_vel in results]
        scheduler.tell(objs, meas)

        # Logging is skipped unless this is a logging iteration or the last one.
        if itr % log_freq != 0 and itr != iterations:
            continue

        elapsed_time = time.time() - start_time
        record(itr)
        tqdm.tqdm.write(
            f"> {itr} itrs completed after {elapsed_time:.2f} s\n"
            f" - Max Score: {metrics['Max Score']['y'][-1]}\n"
            f" - Archive Size: {metrics['Archive Size']['y'][-1]}\n"
            f" - QD Score: {metrics['QD Score']['y'][-1]}"
        )

    return metrics
def save_heatmap(archive: GridArchive, filename: str | Path) -> None:
    """Saves a heatmap of the scheduler's archive to the filename.

    Args:
        archive: Archive with results from an experiment.
        filename: Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    try:
        grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
        ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
        ax.set_ylabel("Impact y-velocity")
        ax.set_xlabel("Impact x-position")
        fig.savefig(filename)
    finally:
        # pyplot keeps a reference to every figure it creates until it is closed, so
        # release the figure once it has been written (or plotting failed).
        plt.close(fig)
def save_metrics(
    outdir: Path, metrics: dict[str, dict[str, list[int | float]]]
) -> None:
    """Saves metrics to png plots and a JSON file.

    One plot is written per metric, named after the lowercased metric name with spaces
    replaced by underscores (e.g. "Max Score" -> max_score.png). All metrics are
    additionally written to metrics.json. The `metrics` argument itself is left
    unmodified.

    Args:
        outdir: output directory for saving files.
        metrics: Metrics as output by run_search.
    """
    # Plot metrics.
    for name, series in metrics.items():
        fig, ax = plt.subplots()
        try:
            ax.plot(series["x"], series["y"])
            ax.set_title(name)
            ax.set_xlabel("Iteration")
            fig.savefig(str(outdir / f"{name.lower().replace(' ', '_')}.png"))
        finally:
            # pyplot keeps every figure alive until it is closed explicitly.
            plt.close(fig)

    # Convert metrics to Python scalars by calling .item(), since each stats value is a
    # 0-D array by default, and JSON cannot serialize 0-D arrays. Values without an
    # .item() method (plain Python numbers, or None) are passed through unchanged
    # rather than raising AttributeError. The conversion is done on a copy so that the
    # caller's dict is not altered.
    serializable = {
        name: {
            **series,
            "y": [v.item() if hasattr(v, "item") else v for v in series["y"]],
        }
        for name, series in metrics.items()
    }

    # Save metrics to JSON.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(serializable, file, indent=2)
def save_ccdf(archive: ArchiveBase, filename: str | Path) -> None:
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help when
    comparing CCDF's among archives with different amounts of coverage (i.e. when one
    archive has more cells filled).

    Args:
        archive: Archive with results from an experiment.
        filename: Path to an image file.
    """
    fig, ax = plt.subplots()
    try:
        ax.hist(
            archive.data("objective"),
            50,  # Number of cells.
            histtype="step",
            density=False,
            cumulative=-1,  # CCDF rather than CDF.
        )
        ax.set_xlabel("Objectives")
        ax.set_ylabel("Num. Entries")
        ax.set_title("Distribution of Archive Objectives")
        fig.savefig(filename)
    finally:
        # pyplot keeps a reference to every figure it creates until it is closed, so
        # release the figure once it has been written (or plotting failed).
        plt.close(fig)
def run_evaluation(outdir: Path, env_seed: int, seed: int | None) -> None:
    """Simulates up to 10 random archive solutions and saves videos of them.

    The archive is read from outdir / archive.csv. Videos are saved to
    outdir / videos. If the archive holds fewer than 10 solutions, all of them are
    simulated.

    Args:
        outdir: Path object for the output directory from which to retrieve the archive
            and save videos.
        env_seed: Seed for the environment.
        seed: Seed for the RNG that picks which solutions to simulate.
    """
    df = ArchiveDataFrame(pd.read_csv(outdir / "archive.csv"))
    solutions = df.get_field("solution")
    rng = np.random.default_rng(seed)
    indices = rng.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v3", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,  # noqa: ARG005
    )

    try:
        for idx in indices:
            model = solutions[idx]
            reward, impact_x_pos, impact_y_vel = simulate(model, env_seed, video_env)
            print(
                f"=== Index {idx} ===\n"
                "Model:\n"
                f"{model}\n"
                f"Reward: {reward}\n"
                f"Impact x-pos: {impact_x_pos}\n"
                f"Impact y-vel: {impact_y_vel}\n"
            )
    finally:
        # Always close the recording env, even if one of the simulations fails.
        video_env.close()
def lunar_lander_main(
    workers: int = 4,
    env_seed: int = 52,
    iterations: int = 500,
    log_freq: int = 25,
    n_emitters: int = 5,
    batch_size: int = 30,
    sigma0: float = 1.0,
    seed: int | None = None,
    outdir: str = "lunar_lander_output",
    run_eval: bool = False,
) -> None:
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers: Number of workers to use for simulations.
        env_seed: Environment seed. The default gives the flat terrain from the
            tutorial.
        iterations: Number of iterations to run the algorithm.
        log_freq: Number of iterations to wait before recording metrics.
        n_emitters: Number of emitters.
        batch_size: Batch size of each emitter.
        sigma0: Initial step size of each emitter.
        seed: Random seed for the pyribs components.
        outdir: Directory for Lunar Lander output.
        run_eval: Pass this flag to run an evaluation of 10 random solutions selected
            from the archive in the `outdir`.
    """
    out_path = Path(outdir)

    if run_eval:
        run_evaluation(out_path, env_seed, seed)
        return

    # Make the directory here so that it is not made when running eval. Missing parent
    # directories are created too, so a nested outdir also works.
    out_path.mkdir(parents=True, exist_ok=True)

    # Setup Dask. The client connects to a "cluster" running on this machine. The
    # cluster manages concurrent worker processes. If using Dask across many workers, we
    # would set up a more complicated cluster and connect the client to it.
    #
    # Both objects are used as context managers so that the client is disconnected and
    # the worker processes are shut down when the search finishes or fails.
    with (
        LocalCluster(
            processes=True,  # Each worker is a process.
            n_workers=workers,  # Create this many worker processes.
            threads_per_worker=1,  # Each worker process is single-threaded.
        ) as cluster,
        Client(cluster) as client,
    ):
        # CMA-ME.
        scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
        metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.data(return_type="pandas").to_csv(out_path / "archive.csv")
    save_ccdf(scheduler.archive, str(out_path / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(out_path / "heatmap.png"))
    save_metrics(out_path, metrics)
# Script entry point: hand lunar_lander_main to python-fire, which builds the
# command line interface from it (see "Usage" in the module docstring).
if __name__ == "__main__":
    fire.Fire(lunar_lander_main)
|