Lunar Lander Relanded: Using Dask to Distribute Evaluations
This example extends the Lunar Lander tutorial by using Dask to distribute evaluations, which speeds up the search considerably. It also adds more logging capabilities and a CLI.
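Before diving into the full script, here is a minimal sketch of the Dask pattern it builds on: start a LocalCluster of worker processes, connect a Client, then use client.map() to fan evaluations out to the workers and client.gather() to collect the results. The evaluate() function and its inputs below are illustrative placeholders, not part of the example itself.

import numpy as np
from dask.distributed import Client, LocalCluster


def evaluate(solution):
    """Stand-in for an expensive simulation such as a lunar lander rollout."""
    return float(np.sum(solution ** 2))


if __name__ == "__main__":
    # Each worker is a single-threaded process, mirroring the setup in the
    # full script below.
    cluster = LocalCluster(processes=True, n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    solutions = [np.random.standard_normal(8) for _ in range(16)]
    futures = client.map(evaluate, solutions)  # One future per solution.
    results = client.gather(futures)  # Blocks until all evaluations finish.
    print(results)

    client.close()

The full example follows the same pattern, with simulate() as the evaluated function and the solutions supplied by a pyribs scheduler.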
1"""Uses CMA-ME to train agents with linear policies in Lunar Lander.
2
3Install the following dependencies before running this example:
4 pip install ribs[visualize] tqdm fire gymnasium[box2d]==0.27.0 moviepy>=1.0.0 dask>=2.0.0 distributed>=2.0.0 bokeh>=2.0.0
5
6This script uses the same setup as the tutorial, but it also uses Dask instead
7of Python's multiprocessing to parallelize evaluations on a single machine and
8adds in a CLI. Refer to the tutorial here:
9https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for more info.
10
11You should not need much familiarity with Dask to read this example. However, if
12you would like to know more about Dask, we recommend referring to the quickstart
13for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.
14
15This script creates an output directory (defaults to `lunar_lander_output/`, see
16the --outdir flag) with the following files:
17
18 - archive.csv: The CSV representation of the final archive, obtained with
19 as_pandas().
20 - archive_ccdf.png: A plot showing the (unnormalized) complementary
21 cumulative distribution function of objectives in the archive. For
22 each objective p on the x-axis, this plot shows the number of
23 solutions that had an objective of at least p.
24 - heatmap.png: A heatmap showing the performance of solutions in the
25 archive.
26 - metrics.json: Metrics about the run, saved as a mapping from the metric
27 name to a list of x values (iteration number) and a list of y values
28 (metric value) for that metric.
29 - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
30 `max_score`.
31
32In evaluation mode (--run-eval flag), the script will read in the archive from
33the output directory and simulate 10 random solutions from the archive. It will
34write videos of these simulations to a `videos/` subdirectory in the output
35directory.
36
37Usage:
38 # Basic usage - should take ~1 hour with 4 cores.
39 python lunar_lander.py NUM_WORKERS
40 # Now open the Dask dashboard at http://localhost:8787 to view worker
41 # status.
42
43 # Evaluation mode. If you passed a different outdir and/or env_seed when
44 # running the algorithm with the command above, you must pass the same
45 # outdir and/or env_seed here.
46 python lunar_lander.py --run-eval
47Help:
48 python lunar_lander.py --help
49"""
50import json
51import time
52from pathlib import Path
53
54import fire
55import gymnasium as gym
56import matplotlib.pyplot as plt
57import numpy as np
58import pandas as pd
59import tqdm
60from dask.distributed import Client, LocalCluster
61
62from ribs.archives import GridArchive
63from ribs.emitters import EvolutionStrategyEmitter
64from ribs.schedulers import Scheduler
65from ribs.visualize import grid_archive_heatmap
66
67
68def simulate(model, seed=None, video_env=None):
69 """Simulates the lunar lander model.
70
71 Args:
72 model (np.ndarray): The array of weights for the linear policy.
73 seed (int): The seed for the environment.
74 video_env (gym.Env): If passed in, this will be used instead of creating
75 a new env. This is used primarily for recording video during
76 evaluation.
77 Returns:
78 total_reward (float): The reward accrued by the lander throughout its
79 trajectory.
80 impact_x_pos (float): The x position of the lander when it touches the
81 ground for the first time.
82 impact_y_vel (float): The y velocity of the lander when it touches the
83 ground for the first time.
84 """
85 if video_env is None:
86 # Since we are using multiple processes, it is simpler if each worker
87 # just creates their own copy of the environment instead of trying to
88 # share the environment. This also makes the function "pure." However,
89 # we should use the video_env if it is passed in.
90 env = gym.make("LunarLander-v2")
91 else:
92 env = video_env
93
94 action_dim = env.action_space.n
95 obs_dim = env.observation_space.shape[0]
96 model = model.reshape((action_dim, obs_dim))
97
98 total_reward = 0.0
99 impact_x_pos = None
100 impact_y_vel = None
101 all_y_vels = []
102 obs, _ = env.reset(seed=seed)
103 done = False
104
105 while not done:
106 action = np.argmax(model @ obs) # Linear policy.
107 obs, reward, terminated, truncated, _ = env.step(action)
108 done = terminated or truncated
109 total_reward += reward
110
111 # Refer to the definition of state here:
112 # https://gymnasium.farama.org/environments/box2d/lunar_lander/
113 x_pos = obs[0]
114 y_vel = obs[3]
115 leg0_touch = bool(obs[6])
116 leg1_touch = bool(obs[7])
117 all_y_vels.append(y_vel)
118
119 # Check if the lunar lander is impacting for the first time.
120 if impact_x_pos is None and (leg0_touch or leg1_touch):
121 impact_x_pos = x_pos
122 impact_y_vel = y_vel
123
124 # If the lunar lander did not land, set the x-pos to the one from the final
125 # timestep, and set the y-vel to the max y-vel (we use min since the lander
126 # goes down).
127 if impact_x_pos is None:
128 impact_x_pos = x_pos
129 impact_y_vel = min(all_y_vels)
130
131 # Only close the env if it was not a video env.
132 if video_env is None:
133 env.close()
134
135 return total_reward, impact_x_pos, impact_y_vel
136
137
138def create_scheduler(seed, n_emitters, sigma0, batch_size):
139 """Creates the Scheduler based on given configurations.
140
141 See lunar_lander_main() for description of args.
142
143 Returns:
144 A pyribs scheduler set up for CMA-ME (i.e. it has
145 EvolutionStrategyEmitter's and a GridArchive).
146 """
147 env = gym.make("LunarLander-v2")
148 action_dim = env.action_space.n
149 obs_dim = env.observation_space.shape[0]
150 initial_model = np.zeros((action_dim, obs_dim))
151 archive = GridArchive(
152 solution_dim=initial_model.size,
153 dims=[50, 50], # 50 cells in each dimension.
154 # (-1, 1) for x-pos and (-3, 0) for y-vel.
155 ranges=[(-1.0, 1.0), (-3.0, 0.0)],
156 seed=seed)
157
158 # If we create the emitters with identical seeds, they will all output the
159 # same initial solutions. The algorithm should still work -- eventually, the
160 # emitters will produce different solutions because they get different
161 # responses when inserting into the archive. However, using different seeds
162 # avoids this problem altogether.
163 seeds = ([None] * n_emitters
164 if seed is None else [seed + i for i in range(n_emitters)])
165
166 # We use the EvolutionStrategyEmitter to create an ImprovementEmitter.
167 emitters = [
168 EvolutionStrategyEmitter(
169 archive,
170 x0=initial_model.flatten(),
171 sigma0=sigma0,
172 ranker="2imp",
173 batch_size=batch_size,
174 seed=s,
175 ) for s in seeds
176 ]
177
178 scheduler = Scheduler(archive, emitters)
179 return scheduler
180
181
182def run_search(client, scheduler, env_seed, iterations, log_freq):
183 """Runs the QD algorithm for the given number of iterations.
184
185 Args:
186 client (Client): A Dask client providing access to workers.
187 scheduler (Scheduler): pyribs scheduler.
188 env_seed (int): Seed for the environment.
189 iterations (int): Iterations to run.
190 log_freq (int): Number of iterations to wait before recording metrics.
191 Returns:
192 dict: A mapping from various metric names to a list of "x" and "y"
193 values where x is the iteration and y is the value of the metric. Think
194 of each entry as the x's and y's for a matplotlib plot.
195 """
196 print(
197 "> Starting search.\n"
198 " - Open Dask's dashboard at http://localhost:8787 to monitor workers."
199 )
200
201 metrics = {
202 "Max Score": {
203 "x": [],
204 "y": [],
205 },
206 "Archive Size": {
207 "x": [0],
208 "y": [0],
209 },
210 }
211
212 start_time = time.time()
213 for itr in tqdm.trange(1, iterations + 1):
214 # Request models from the scheduler.
215 sols = scheduler.ask()
216
217 # Evaluate the models and record the objectives and measures.
218 objs, meas = [], []
219
220 # Ask the Dask client to distribute the simulations among the Dask
221 # workers, then gather the results of the simulations.
222 futures = client.map(lambda model: simulate(model, env_seed), sols)
223 results = client.gather(futures)
224
225 # Process the results.
226 for obj, impact_x_pos, impact_y_vel in results:
227 objs.append(obj)
228 meas.append([impact_x_pos, impact_y_vel])
229
230 # Send the results back to the scheduler.
231 scheduler.tell(objs, meas)
232
233 # Logging.
234 if itr % log_freq == 0 or itr == iterations:
235 elapsed_time = time.time() - start_time
236 metrics["Max Score"]["x"].append(itr)
237 metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
238 metrics["Archive Size"]["x"].append(itr)
239 metrics["Archive Size"]["y"].append(len(scheduler.archive))
240 tqdm.tqdm.write(
241 f"> {itr} itrs completed after {elapsed_time:.2f} s\n"
242 f" - Max Score: {metrics['Max Score']['y'][-1]}\n"
243 f" - Archive Size: {metrics['Archive Size']['y'][-1]}")
244
245 return metrics
246
247
248def save_heatmap(archive, filename):
249 """Saves a heatmap of the scheduler's archive to the filename.
250
251 Args:
252 archive (GridArchive): Archive with results from an experiment.
253 filename (str): Path to an image file.
254 """
255 fig, ax = plt.subplots(figsize=(8, 6))
256 grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
257 ax.invert_yaxis() # Makes more sense if larger velocities are on top.
258 ax.set_ylabel("Impact y-velocity")
259 ax.set_xlabel("Impact x-position")
260 fig.savefig(filename)
261
262
263def save_metrics(outdir, metrics):
264 """Saves metrics to png plots and a JSON file.
265
266 Args:
267 outdir (Path): output directory for saving files.
268 metrics (dict): Metrics as output by run_search.
269 """
270 # Plots.
271 for metric in metrics:
272 fig, ax = plt.subplots()
273 ax.plot(metrics[metric]["x"], metrics[metric]["y"])
274 ax.set_title(metric)
275 ax.set_xlabel("Iteration")
276 fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))
277
278 # JSON file.
279 with (outdir / "metrics.json").open("w") as file:
280 json.dump(metrics, file, indent=2)
281
282
283def save_ccdf(archive, filename):
284 """Saves a CCDF showing the distribution of the archive's objectives.
285
286 CCDF = Complementary Cumulative Distribution Function (see
287 https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
288 The CCDF plotted here is not normalized to the range (0,1). This may help
289 when comparing CCDF's among archives with different amounts of coverage
290 (i.e. when one archive has more cells filled).
291
292 Args:
293 archive (GridArchive): Archive with results from an experiment.
294 filename (str): Path to an image file.
295 """
296 fig, ax = plt.subplots()
297 ax.hist(
298 archive.as_pandas(include_solutions=False)["objective"],
299 50, # Number of cells.
300 histtype="step",
301 density=False,
302 cumulative=-1) # CCDF rather than CDF.
303 ax.set_xlabel("Objectives")
304 ax.set_ylabel("Num. Entries")
305 ax.set_title("Distribution of Archive Objectives")
306 fig.savefig(filename)
307
308
309def run_evaluation(outdir, env_seed):
310 """Simulates 10 random archive solutions and saves videos of them.
311
312 Videos are saved to outdir / videos.
313
314 Args:
315 outdir (Path): Path object for the output directory from which to
316 retrieve the archive and save videos.
317 env_seed (int): Seed for the environment.
318 """
319 df = pd.read_csv(outdir / "archive.csv")
320 indices = np.random.permutation(len(df))[:10]
321
322 # Use a single env so that all the videos go to the same directory.
323 video_env = gym.wrappers.RecordVideo(
324 gym.make("LunarLander-v2", render_mode="rgb_array"),
325 video_folder=str(outdir / "videos"),
326 # This will ensure all episodes are recorded as videos.
327 episode_trigger=lambda idx: True,
328 )
329
330 for idx in indices:
331 model = np.array(df.loc[idx, "solution_0":])
332 reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
333 video_env)
334 print(f"=== Index {idx} ===\n"
335 "Model:\n"
336 f"{model}\n"
337 f"Reward: {reward}\n"
338 f"Impact x-pos: {impact_x_pos}\n"
339 f"Impact y-vel: {impact_y_vel}\n")
340
341 video_env.close()
342
343
344def lunar_lander_main(workers=4,
345 env_seed=52,
346 iterations=500,
347 log_freq=25,
348 n_emitters=5,
349 batch_size=30,
350 sigma0=1.0,
351 seed=None,
352 outdir="lunar_lander_output",
353 run_eval=False):
354 """Uses CMA-ME to train linear agents in Lunar Lander.
355
356 Args:
357 workers (int): Number of workers to use for simulations.
358 env_seed (int): Environment seed. The default gives the flat terrain
359 from the tutorial.
360 iterations (int): Number of iterations to run the algorithm.
361 log_freq (int): Number of iterations to wait before recording metrics
362 and saving heatmap.
363 n_emitters (int): Number of emitters.
364 batch_size (int): Batch size of each emitter.
365 sigma0 (float): Initial step size of each emitter.
366 seed (seed): Random seed for the pyribs components.
367 outdir (str): Directory for Lunar Lander output.
368 run_eval (bool): Pass this flag to run an evaluation of 10 random
369 solutions selected from the archive in the `outdir`.
370 """
371 outdir = Path(outdir)
372
373 if run_eval:
374 run_evaluation(outdir, env_seed)
375 return
376
377 # Make the directory here so that it is not made when running eval.
378 outdir.mkdir(exist_ok=True)
379
380 # Setup Dask. The client connects to a "cluster" running on this machine.
381 # The cluster simply manages several concurrent worker processes. If using
382 # Dask across many workers, we would set up a more complicated cluster and
383 # connect the client to it.
384 cluster = LocalCluster(
385 processes=True, # Each worker is a process.
386 n_workers=workers, # Create this many worker processes.
387 threads_per_worker=1, # Each worker process is single-threaded.
388 )
389 client = Client(cluster)
390
391 # CMA-ME.
392 scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
393 metrics = run_search(client, scheduler, env_seed, iterations, log_freq)
394
395 # Outputs.
396 scheduler.archive.as_pandas().to_csv(outdir / "archive.csv")
397 save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
398 save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
399 save_metrics(outdir, metrics)
400
401
402if __name__ == "__main__":
403 fire.Fire(lunar_lander_main)
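As the comment in lunar_lander_main() notes, running Dask across many machines only requires connecting the Client to a different cluster. A minimal sketch, assuming a scheduler has already been started with the real `dask-scheduler` CLI (with workers joined via `dask-worker`); the address below is a placeholder:

from dask.distributed import Client

# Connect to an existing scheduler instead of starting a LocalCluster.
client = Client("tcp://192.0.2.1:8786")  # Placeholder scheduler address.
# client.map() and client.gather() behave identically on any cluster, so
# run_search() needs no changes.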