Lunar Lander Relanded: Using Dask to Distribute Evaluations
This example extends the Lunar Lander tutorial by using Dask to distribute evaluations and thus speed things up. It also adds more logging and a command-line interface (CLI).
"""Uses CMA-ME to train agents with linear policies in Lunar Lander.

Install the following dependencies before running this example (the quotes
stop the shell from treating `>=` as an output redirection and `[...]` as a
glob pattern):
    pip install "ribs[visualize]" tqdm fire "gymnasium[box2d]==0.27.0" "moviepy>=1.0.0" "dask>=2.0.0" "distributed>=2.0.0" "bokeh>=2.0.0"

This script uses the same setup as the tutorial, but it also uses Dask to
parallelize evaluations on a single machine and adds a CLI. Refer to the
tutorial here: https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for
more info.

You should not need much familiarity with Dask to read this example. However, if
you would like to know more about Dask, we recommend referring to the quickstart
for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.

This script creates an output directory (defaults to `lunar_lander_output/`, see
the --outdir flag) with the following files:

    - archive.csv: The CSV representation of the final archive, obtained with
      as_pandas().
    - archive_ccdf.png: A plot showing the (unnormalized) complementary
      cumulative distribution function of objectives in the archive. For
      each objective p on the x-axis, this plot shows the number of
      solutions that had an objective of at least p.
    - heatmap.png: A heatmap showing the performance of solutions in the
      archive.
    - metrics.json: Metrics about the run, saved as a mapping from the metric
      name to a list of x values (iteration number) and a list of y values
      (metric value) for that metric.
    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
      `max_score`.

In evaluation mode (--run-eval flag), the script will read in the archive from
the output directory and simulate 10 random solutions from the archive. It will
write videos of these simulations to a `videos/` subdirectory in the output
directory.

Usage:
    # Basic usage - should take ~1 hour with 4 cores.
    python lunar_lander.py NUM_WORKERS
    # Now open the Dask dashboard at http://localhost:8787 to view worker
    # status.

    # Evaluation mode. If you passed a different outdir and/or env_seed when
    # running the algorithm with the command above, you must pass the same
    # outdir and/or env_seed here.
    python lunar_lander.py --run-eval

Help:
    python lunar_lander.py --help
"""
50import json
51import time
52from pathlib import Path
53
54import fire
55import gymnasium as gym
56import matplotlib.pyplot as plt
57import numpy as np
58import pandas as pd
59import tqdm
60from dask.distributed import Client, LocalCluster
61
62from ribs.archives import GridArchive
63from ribs.emitters import EvolutionStrategyEmitter
64from ribs.schedulers import Scheduler
65from ribs.visualize import grid_archive_heatmap
66
67
def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Note that this function has some slight differences from the simulate()
    function in the lunar lander tutorial; e.g., it creates an environment
    rather than taking the environment as an argument.

    Args:
        model (np.ndarray): The array of weights for the linear policy. It is
            reshaped to (action_dim, obs_dim), so it must contain exactly
            action_dim * obs_dim values.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation. The caller owns this env; it is not closed here.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time. If the lander never touches the ground,
            this is the x position at the final timestep.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time. If the lander never touches the ground,
            this is the most negative y velocity (fastest descent) seen during
            the episode.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates their own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure."
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    try:
        action_dim = env.action_space.n
        obs_dim = env.observation_space.shape[0]
        model = model.reshape((action_dim, obs_dim))

        total_reward = 0.0
        impact_x_pos = None
        impact_y_vel = None
        # Running minimum of the y-velocity; only used if the lander never
        # touches the ground.
        min_y_vel = None
        obs, _ = env.reset(seed=seed)
        done = False

        while not done:
            action = np.argmax(model @ obs)  # Linear policy.
            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

            # Refer to the definition of state here:
            # https://gymnasium.farama.org/environments/box2d/lunar_lander/
            x_pos = obs[0]
            y_vel = obs[3]
            leg0_touch = bool(obs[6])
            leg1_touch = bool(obs[7])
            min_y_vel = y_vel if min_y_vel is None else min(min_y_vel, y_vel)

            # Check if the lunar lander is impacting for the first time.
            if impact_x_pos is None and (leg0_touch or leg1_touch):
                impact_x_pos = x_pos
                impact_y_vel = y_vel

        # If the lunar lander did not land, set the x-pos to the one from the
        # final timestep, and set the y-vel to the fastest descent, i.e. the
        # minimum y-vel (the lander moves down, so velocities are negative).
        if impact_x_pos is None:
            impact_x_pos = x_pos
            impact_y_vel = min_y_vel
    finally:
        # Only close the env if we created it; a video env belongs to the
        # caller. Closing in `finally` avoids leaking it if a step raises.
        if video_env is None:
            env.close()

    return total_reward, impact_x_pos, impact_y_vel
139
140
def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on given configurations.

    Args:
        seed (int): Random seed for the pyribs components. If None, the archive
            and all emitters are left unseeded; otherwise emitter i receives
            seed + i.
        n_emitters (int): Number of emitters.
        sigma0 (float): Initial step size of each emitter.
        batch_size (int): Batch size of each emitter.
    Returns:
        A pyribs scheduler set up for CMA-ME (i.e. it has
        EvolutionStrategyEmitter's and a GridArchive).
    """
    # The environment is only needed to read the policy dimensions, so close it
    # right away instead of leaking it for the rest of the run.
    env = gym.make("LunarLander-v2")
    try:
        action_dim = env.action_space.n
        obs_dim = env.observation_space.shape[0]
    finally:
        env.close()

    # The linear policy has one row of weights per action (see simulate());
    # every emitter starts from the all-zeros policy.
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed)

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    # We use the EvolutionStrategyEmitter to create an ImprovementEmitter.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler
183
184
def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
            The final iteration is always recorded.
    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    Raises:
        ValueError: If log_freq is less than 1.
    """
    if log_freq < 1:
        raise ValueError(f"log_freq must be at least 1, got {log_freq}")

    print(
        "> Starting search.\n"
        " - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    # Archive Size is seeded with the empty archive at iteration 0; Max Score
    # is only recorded once iterations have run.
    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Ask the Dask client to distribute the simulations among the Dask
        # workers (one simulation per solution), then block until all of the
        # results have been gathered.
        futures = client.map(lambda model: simulate(model, env_seed), sols)
        results = client.gather(futures)

        # Each result is (objective, impact_x_pos, impact_y_vel); the last two
        # values are the measures.
        objs = [obj for obj, _, _ in results]
        meas = [[impact_x_pos, impact_y_vel]
                for _, impact_x_pos, impact_y_vel in results]

        # Send the results back to the scheduler.
        scheduler.tell(objs, meas)

        # Logging.
        if itr % log_freq == 0 or itr == iterations:
            elapsed_time = time.time() - start_time
            metrics["Max Score"]["x"].append(itr)
            metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
            metrics["Archive Size"]["x"].append(itr)
            metrics["Archive Size"]["y"].append(len(scheduler.archive))
            # tqdm.write prints above the progress bar; a plain print() would
            # be spliced into the bar's line and garble it.
            tqdm.tqdm.write(
                f"> {itr} itrs completed after {elapsed_time:.2f} s")
            tqdm.tqdm.write(f" - Max Score: {metrics['Max Score']['y'][-1]}")
            tqdm.tqdm.write(
                f" - Archive Size: {metrics['Archive Size']['y'][-1]}")

    return metrics
248
249
def save_heatmap(archive, filename):
    """Saves a heatmap of the archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    # The color scale is fixed to [-300, 300] regardless of the archive's
    # actual objective range.
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)
    # pyplot keeps every figure alive until it is explicitly closed.
    plt.close(fig)
263
264
def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    One plot is written per metric, named after the metric in lowercase with
    spaces replaced by underscores (e.g. "Max Score" -> max_score.png).

    Args:
        outdir (Path or str): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    outdir = Path(outdir)

    def _json_default(value):
        # Archive statistics may be numpy scalars, which json cannot serialize
        # on its own; convert them to the equivalent Python scalar.
        if isinstance(value, np.generic):
            return value.item()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable")

    # Plots.
    for name, series in metrics.items():
        fig, ax = plt.subplots()
        ax.plot(series["x"], series["y"])
        ax.set_title(name)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{name.lower().replace(' ', '_')}.png"))
        # Close each figure so they do not accumulate in pyplot.
        plt.close(fig)

    # JSON file.
    with (outdir / "metrics.json").open("w", encoding="utf-8") as file:
        json.dump(metrics, file, indent=2, default=_json_default)
283
284
def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help
    when comparing CCDF's among archives with different amounts of coverage
    (i.e. when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.as_pandas(include_solutions=False)["objective"],
        50,  # Number of histogram bins.
        histtype="step",
        density=False,  # Raw counts, i.e. the unnormalized CCDF.
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objectives")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objectives")
    fig.savefig(filename)
    # pyplot keeps every figure alive until it is explicitly closed.
    plt.close(fig)
309
310
def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos. If the archive has fewer than 10
    entries, every entry is simulated.

    Args:
        outdir (Path or str): Output directory from which to retrieve the
            archive and in which to save videos.
        env_seed (int): Seed for the environment.
    """
    outdir = Path(outdir)
    df = pd.read_csv(outdir / "archive.csv")
    # Up to 10 distinct rows chosen without replacement. read_csv gives df a
    # default RangeIndex, so these positions are also valid .loc labels.
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v2", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,
    )

    try:
        for idx in indices:
            # Label slice from "solution_0" to the last column; assumes the
            # solution columns come last in archive.csv -- TODO confirm if the
            # as_pandas() column layout changes.
            model = np.array(df.loc[idx, "solution_0":])
            reward, impact_x_pos, impact_y_vel = simulate(
                model, env_seed, video_env)
            print(f"=== Index {idx} ===\n"
                  "Model:\n"
                  f"{model}\n"
                  f"Reward: {reward}\n"
                  f"Impact x-pos: {impact_x_pos}\n"
                  f"Impact y-vel: {impact_y_vel}\n")
    finally:
        # Close the env even if a simulation fails so its resources are
        # released.
        video_env.close()
344
345
def lunar_lander_main(workers=4,
                      env_seed=52,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of Dask worker processes to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording and
            printing metrics.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output. Parent directories
            are created as needed.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Make the directory here so that it is not made when running eval.
    outdir.mkdir(parents=True, exist_ok=True)

    # Setup Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many workers, we would set up a more complicated cluster and
    # connect the client to it. Both are context managers, so the client and
    # the worker processes are shut down once the search ends, even on error.
    with LocalCluster(
            processes=True,  # Each worker is a process.
            n_workers=workers,  # Create this many worker processes.
            threads_per_worker=1,  # Each worker process is single-threaded.
    ) as cluster, Client(cluster) as client:
        # CMA-ME.
        scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
        metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.as_pandas().to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)
402
403
if __name__ == "__main__":
    # Python Fire turns each keyword argument of lunar_lander_main into a
    # command-line flag (e.g. --outdir, --run-eval).
    fire.Fire(component=lunar_lander_main)