Lunar Lander Relanded: Using Dask to Distribute Evaluations
This example extends the Lunar Lander tutorial by using Dask to distribute evaluations and thus speed things up. It also adds more logging capabilities and a CLI.
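At its core, the distribution boils down to Dask's `client.map` / `client.gather` pattern: each call to the scheduler's `ask()` produces a batch of solutions, the client ships one `simulate` call per solution to the worker processes, and `gather` blocks until all results return. Here is a minimal, self-contained sketch of that pattern (the `square` function is a stand-in for the script's `simulate`):

from dask.distributed import Client, LocalCluster

def square(x):
    # Stand-in for an expensive evaluation like simulate().
    return x * x

if __name__ == "__main__":
    # Local "cluster" of 4 single-threaded worker processes.
    cluster = LocalCluster(processes=True, n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    # map() submits one task per input; gather() waits for all the results.
    futures = client.map(square, [1, 2, 3, 4])
    print(client.gather(futures))  # [1, 4, 9, 16]

The full script below wraps this same pattern around pyribs' ask-tell loop.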
1"""Uses CMA-ME to train agents with linear policies in Lunar Lander.
2
3Install the following dependencies before running this example -- swig must be
4installed before box2d can be installed, hence it is a separate command:
5 pip install swig
6 pip install ribs[visualize] tqdm fire gymnasium[box2d]==0.29.1 moviepy>=1.0.0 dask>=2.0.0 distributed>=2.0.0 bokeh>=2.0.0
7
8This script uses the same setup as the tutorial, but it also uses Dask instead
9of Python's multiprocessing to parallelize evaluations on a single machine and
10adds in a CLI. Refer to the tutorial here:
11https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for more info.
12
13You should not need much familiarity with Dask to read this example. However, if
14you would like to know more about Dask, we recommend referring to the quickstart
15for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.
16
17This script creates an output directory (defaults to `lunar_lander_output/`, see
18the --outdir flag) with the following files:
19
20 - archive.csv: The CSV representation of the final archive, obtained with
21 data().
22 - archive_ccdf.png: A plot showing the (unnormalized) complementary
23 cumulative distribution function of objectives in the archive. For
24 each objective p on the x-axis, this plot shows the number of
25 solutions that had an objective of at least p.
26 - heatmap.png: A heatmap showing the performance of solutions in the
27 archive.
28 - metrics.json: Metrics about the run, saved as a mapping from the metric
29 name to a list of x values (iteration number) and a list of y values
30 (metric value) for that metric.
    - {metric_name}.png: Plots of the metrics, currently `archive_size`,
      `max_score`, and `qd_score`.

In evaluation mode (--run-eval flag), the script will read in the archive from
the output directory and simulate 10 random solutions from the archive. It will
write videos of these simulations to a `videos/` subdirectory in the output
directory.

Usage:
    # Basic usage - should take ~1 hour with 4 cores.
    python lunar_lander.py NUM_WORKERS
    # Now open the Dask dashboard at http://localhost:8787 to view worker
    # status.

    # Evaluation mode. If you passed a different outdir and/or env_seed when
    # running the algorithm with the command above, you must pass the same
    # outdir and/or env_seed here.
    python lunar_lander.py --run-eval
Help:
    python lunar_lander.py --help
"""
import json
import time
from pathlib import Path

import fire
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from dask.distributed import Client, LocalCluster

from ribs.archives import ArchiveDataFrame, GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of
            creating a new env. This is used primarily for recording video
            during evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates their own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure." However,
        # we should use the video_env if it is passed in.
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs, _ = env.reset(seed=seed)
    done = False

    while not done:
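        # The policy is a linear map: `model` has shape (action_dim, obs_dim),
        # so `model @ obs` gives one score per action and argmax selects the
        # highest-scoring action.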
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # Refer to the definition of state here:
        # https://gymnasium.farama.org/environments/box2d/lunar_lander/
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the max y-vel (we use min since the lander
    # goes down).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on the given configuration.

    See lunar_lander_main() for a description of the args.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e., it has
        EvolutionStrategyEmitters and a GridArchive).
    """
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed,
        qd_score_offset=-600,
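        # The QD score sums (objective - qd_score_offset) across the archive;
        # an offset of -600 (roughly the lowest score seen in Lunar Lander)
        # keeps each term non-negative.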
    )

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    # An EvolutionStrategyEmitter with the "2imp" ranker replicates the
    # original ImprovementEmitter from CMA-ME.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler


def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        "  - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

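    # "Archive Size" and "QD Score" start at (0, 0) because the archive begins
    # empty; "Max Score" starts empty since it is undefined until the first
    # solutions are inserted.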
    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
        "QD Score": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Evaluate the models and record the objectives and measures.
        objs, meas = [], []

        # Ask the Dask client to distribute the simulations among the Dask
        # workers, then gather the results of the simulations.
        futures = client.map(lambda model: simulate(model, env_seed), sols)
        results = client.gather(futures)

        # Process the results.
        for obj, impact_x_pos, impact_y_vel in results:
            objs.append(obj)
            meas.append([impact_x_pos, impact_y_vel])

        # Send the results back to the scheduler.
        scheduler.tell(objs, meas)

        # Logging.
        if itr % log_freq == 0 or itr == iterations:
            elapsed_time = time.time() - start_time
            metrics["Max Score"]["x"].append(itr)
            metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
            metrics["Archive Size"]["x"].append(itr)
            metrics["Archive Size"]["y"].append(len(scheduler.archive))
            metrics["QD Score"]["x"].append(itr)
            metrics["QD Score"]["y"].append(scheduler.archive.stats.qd_score)
            tqdm.tqdm.write(
                f"> {itr} itrs completed after {elapsed_time:.2f} s\n"
                f"  - Max Score: {metrics['Max Score']['y'][-1]}\n"
                f"  - Archive Size: {metrics['Archive Size']['y'][-1]}\n"
                f"  - QD Score: {metrics['QD Score']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the scheduler's archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)


def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help
    when comparing CCDFs among archives with different amounts of coverage
    (i.e., when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.data("objective"),
        50,  # Number of bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objectives")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objectives")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = ArchiveDataFrame(pd.read_csv(outdir / "archive.csv"))
    solutions = df.get_field("solution")
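    # Pick 10 solutions at random (without replacement) from the archive.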
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v2", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,
    )

    for idx in indices:
        model = solutions[idx]
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=52,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics
            and saving heatmap.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Make the directory here so that it is not made when running eval.
    outdir.mkdir(exist_ok=True)

    # Set up Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many machines, we would set up a more complicated cluster and
    # connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)
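    # For reference, scaling beyond one machine would look roughly like this
    # (a sketch; see the Dask distributed docs for details):
    #     $ dask-scheduler                       # on the head node
    #     $ dask-worker tcp://HEAD_IP:8786       # on each worker machine
    #     client = Client("tcp://HEAD_IP:8786")  # in this script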

    # CMA-ME.
    scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.data(return_type="pandas").to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
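    # fire generates the CLI from lunar_lander_main's signature: each parameter
    # becomes a positional arg or flag (e.g., NUM_WORKERS above is `workers`).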
    fire.Fire(lunar_lander_main)