Lunar Lander Relanded: Using Dask to Distribute Evaluations

This example extends the Lunar Lander tutorial by using Dask to distribute evaluations across multiple worker processes, which substantially speeds up the search. It also adds more logging capabilities and a command-line interface.
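
At its core, the speedup comes from Dask's map/gather pattern: the client submits one simulation per solution to a pool of local worker processes and collects the results in order. The short sketch below shows that pattern in isolation; the `square` function and the worker count here are placeholders for illustration, not part of the example itself.

    from dask.distributed import Client, LocalCluster

    # A LocalCluster manages worker processes on this machine; the Client
    # submits work to it.
    cluster = LocalCluster(processes=True, n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    def square(x):
        return x * x

    # map() submits one task per input and immediately returns futures;
    # gather() blocks until all results are ready, preserving input order.
    futures = client.map(square, [1, 2, 3, 4])
    print(client.gather(futures))  # [1, 4, 9, 16]

The full script follows.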

  1"""Uses CMA-ME to train agents with linear policies in Lunar Lander.
  2
  3Install the following dependencies before running this example -- swig must be
  4installed before box2d can be installed, hence it is a separate command:
  5    pip install swig
  6    pip install ribs[visualize] tqdm fire gymnasium[box2d]==0.29.1 moviepy>=1.0.0 dask>=2.0.0 distributed>=2.0.0 bokeh>=2.0.0
  7
  8This script uses the same setup as the tutorial, but it also uses Dask instead
  9of Python's multiprocessing to parallelize evaluations on a single machine and
 10adds in a CLI. Refer to the tutorial here:
 11https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for more info.
 12
 13You should not need much familiarity with Dask to read this example. However, if
 14you would like to know more about Dask, we recommend referring to the quickstart
 15for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.
 16
 17This script creates an output directory (defaults to `lunar_lander_output/`, see
 18the --outdir flag) with the following files:
 19
 20    - archive.csv: The CSV representation of the final archive, obtained with
 21      data().
 22    - archive_ccdf.png: A plot showing the (unnormalized) complementary
 23      cumulative distribution function of objectives in the archive. For
 24      each objective p on the x-axis, this plot shows the number of
 25      solutions that had an objective of at least p.
 26    - heatmap.png: A heatmap showing the performance of solutions in the
 27      archive.
 28    - metrics.json: Metrics about the run, saved as a mapping from the metric
 29      name to a list of x values (iteration number) and a list of y values
 30      (metric value) for that metric.
 31    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
 32      `max_score`.
 33
 34In evaluation mode (--run-eval flag), the script will read in the archive from
 35the output directory and simulate 10 random solutions from the archive. It will
 36write videos of these simulations to a `videos/` subdirectory in the output
 37directory.
 38
 39Usage:
 40    # Basic usage - should take ~1 hour with 4 cores.
 41    python lunar_lander.py NUM_WORKERS
 42    # Now open the Dask dashboard at http://localhost:8787 to view worker
 43    # status.
 44
 45    # Evaluation mode. If you passed a different outdir and/or env_seed when
 46    # running the algorithm with the command above, you must pass the same
 47    # outdir and/or env_seed here.
 48    python lunar_lander.py --run-eval
 49Help:
 50    python lunar_lander.py --help
 51"""
import json
import time
from pathlib import Path

import fire
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from dask.distributed import Client, LocalCluster

from ribs.archives import ArchiveDataFrame, GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates its own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure." However,
        # we should use the video_env if it is passed in.
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs, _ = env.reset(seed=seed)
    done = False

    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # Refer to the definition of state here:
        # https://gymnasium.farama.org/environments/box2d/lunar_lander/
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the max y-vel (we use min since the lander
    # goes down).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on given configurations.

    See lunar_lander_main() for description of args.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e. it has
        EvolutionStrategyEmitter's and a GridArchive).
    """
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed,
        # The QD score sums (objective - qd_score_offset) over all archive
        # solutions; -600 is roughly the lowest score a lander receives, so
        # this offset keeps the QD score non-negative.
        qd_score_offset=-600,
    )

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    # An EvolutionStrategyEmitter with the "2imp" (two-stage improvement)
    # ranker replicates the ImprovementEmitter from the original CMA-ME paper.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler


def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        "  - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
        "QD Score": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Evaluate the models and record the objectives and measures.
        objs, meas = [], []

        # Ask the Dask client to distribute the simulations among the Dask
        # workers, then gather the results of the simulations.
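        # (Dask serializes the lambda and each solution with cloudpickle in
        # order to ship these tasks to the worker processes.)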
        futures = client.map(lambda model: simulate(model, env_seed), sols)
        results = client.gather(futures)

        # Process the results.
        for obj, impact_x_pos, impact_y_vel in results:
            objs.append(obj)
            meas.append([impact_x_pos, impact_y_vel])

        # Send the results back to the scheduler.
        scheduler.tell(objs, meas)

        # Logging.
        if itr % log_freq == 0 or itr == iterations:
            elapsed_time = time.time() - start_time
            metrics["Max Score"]["x"].append(itr)
            metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
            metrics["Archive Size"]["x"].append(itr)
            metrics["Archive Size"]["y"].append(len(scheduler.archive))
            metrics["QD Score"]["x"].append(itr)
            metrics["QD Score"]["y"].append(scheduler.archive.stats.qd_score)
            tqdm.tqdm.write(
                f"> {itr} itrs completed after {elapsed_time:.2f} s\n"
                f"  - Max Score: {metrics['Max Score']['y'][-1]}\n"
                f"  - Archive Size: {metrics['Archive Size']['y'][-1]}\n"
                f"  - QD Score: {metrics['QD Score']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the scheduler's archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)


def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help
    when comparing CCDF's among archives with different amounts of coverage
    (i.e. when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.data("objective"),
        50,  # Number of histogram bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objectives")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objectives")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = ArchiveDataFrame(pd.read_csv(outdir / "archive.csv"))
    solutions = df.get_field("solution")
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v2", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,
    )

    for idx in indices:
        model = solutions[idx]
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=52,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Make the directory here so that it is not made when running eval.
    outdir.mkdir(exist_ok=True)

    # Set up Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many machines, we would set up a more complicated cluster and
    # connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)

    # CMA-ME.
    scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.data(return_type="pandas").to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
    fire.Fire(lunar_lander_main)
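
A closing note on scaling: because all the parallelism flows through the Dask client, running on multiple machines only requires pointing the Client at a remote scheduler instead of a LocalCluster. The snippet below is a minimal sketch assuming you have already started a scheduler and workers elsewhere (e.g., with Dask's `dask scheduler` and `dask worker` CLI commands); the address shown is a placeholder.

    from dask.distributed import Client

    # Connect to an existing scheduler rather than spinning up a LocalCluster.
    # The rest of the script (client.map / client.gather) stays the same.
    client = Client("tcp://192.0.2.1:8786")  # Placeholder scheduler address.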