Lunar Lander Relanded: Using Dask to Distribute Evaluations

This example extends the Lunar Lander tutorial by using Dask to distribute evaluations across several worker processes, which speeds up the search. It also adds more logging and a command-line interface (CLI).
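
Inside run_search(), each iteration hands a batch of solutions to Dask with `client.map()` and then blocks on `client.gather()`. Below is a minimal sketch of that round trip. It assumes `dask.distributed` is installed. `toy_simulate` and `evaluate_batch` are hypothetical stand-ins for `simulate()` and the loop body. `processes=False` (thread-based workers) is used only to keep the sketch light, whereas the script uses worker processes.

```python
from dask.distributed import Client, LocalCluster


def toy_simulate(x, seed):
    """Hypothetical stand-in for simulate(): cheap and deterministic."""
    return x * x + seed


def evaluate_batch(client, batch, seed):
    """Runs toy_simulate on every item of batch using the Dask workers."""
    # client.map() submits one task per item and returns futures immediately;
    # extra keyword arguments (here, seed) are passed through to the function.
    futures = client.map(toy_simulate, batch, seed=seed)
    # client.gather() blocks until all tasks finish and returns the results in
    # the same order as the futures.
    return client.gather(futures)


if __name__ == "__main__":
    # Thread-based workers keep this sketch lightweight; the script itself
    # uses processes=True so that simulations run in separate processes.
    with LocalCluster(processes=False, n_workers=2,
                      threads_per_worker=1) as cluster, Client(cluster) as client:
        print(evaluate_batch(client, [1, 2, 3, 4], seed=10))
```

Running it prints `[11, 14, 19, 26]`. Because `gather()` preserves input order, the script can pair each result with its solution without tracking indices.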

"""Uses CMA-ME to train agents with linear policies in Lunar Lander.

Install the following dependencies before running this example:
    pip install "ribs[visualize]" tqdm fire "gymnasium[box2d]==0.27.0" "moviepy>=1.0.0" "dask>=2.0.0" "distributed>=2.0.0" "bokeh>=2.0.0"

This script uses the same setup as the tutorial, but it also uses Dask to
parallelize evaluations on a single machine and adds in a CLI. Refer to the
tutorial here: https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for
more info.

You should not need much familiarity with Dask to read this example. However, if
you would like to know more about Dask, we recommend referring to the quickstart
for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.

This script creates an output directory (defaults to `lunar_lander_output/`, see
the --outdir flag) with the following files:

    - archive.csv: The CSV representation of the final archive, obtained with
      as_pandas().
    - archive_ccdf.png: A plot showing the (unnormalized) complementary
      cumulative distribution function of objectives in the archive. For
      each objective p on the x-axis, this plot shows the number of
      solutions that had an objective of at least p.
    - heatmap.png: A heatmap showing the performance of solutions in the
      archive.
    - metrics.json: Metrics about the run, saved as a mapping from each metric
      name to a dict holding a list of x values (iteration numbers) under "x"
      and a list of y values (metric values) under "y".
    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
      `max_score`.

In evaluation mode (--run-eval flag), the script will read in the archive from
the output directory and simulate 10 random solutions from the archive. It will
write videos of these simulations to a `videos/` subdirectory in the output
directory.

Usage:
    # Basic usage - should take ~1 hour with 4 cores.
    python lunar_lander.py NUM_WORKERS
    # Now open the Dask dashboard at http://localhost:8787 to view worker
    # status.

    # Evaluation mode. If you passed a different outdir and/or env_seed when
    # running the algorithm with the command above, you must pass the same
    # outdir and/or env_seed here.
    python lunar_lander.py --run-eval

Help:
    python lunar_lander.py --help
"""
import json
import time
from pathlib import Path

import fire
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from dask.distributed import Client, LocalCluster

from ribs.archives import GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Note that this function has some slight differences from the simulate()
    function in the lunar lander tutorial; e.g., it creates an environment
    rather than taking the environment as an argument.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates its own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure."
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs, _ = env.reset(seed=seed)
    done = False

    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # Refer to the definition of state here:
        # https://gymnasium.farama.org/environments/box2d/lunar_lander/
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the fastest downward velocity observed
    # (i.e., the minimum y-vel, since downward velocities are negative).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on given configurations.

    See lunar_lander_main() for description of args.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e. it has
        EvolutionStrategyEmitters and a GridArchive).
    """
    # This env is only used to read the action and observation sizes.
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    env.close()
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed)

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    # An EvolutionStrategyEmitter with the "2imp" (two-stage improvement)
    # ranker acts as the improvement emitter from CMA-ME.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler


def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
    Returns:
        dict: A mapping from each metric name to a dict with "x" and "y"
        lists, where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        "  - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Evaluate the models and record the objectives and measures.
        objs, meas = [], []

        # Ask the Dask client to distribute the simulations among the Dask
        # workers, then gather the results of the simulations.
        futures = client.map(lambda model: simulate(model, env_seed), sols)
        results = client.gather(futures)

        # Process the results.
        for obj, impact_x_pos, impact_y_vel in results:
            objs.append(obj)
            meas.append([impact_x_pos, impact_y_vel])

        # Send the results back to the scheduler.
        scheduler.tell(objs, meas)

        # Logging.
        if itr % log_freq == 0 or itr == iterations:
            elapsed_time = time.time() - start_time
            metrics["Max Score"]["x"].append(itr)
            metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
            metrics["Archive Size"]["x"].append(itr)
            metrics["Archive Size"]["y"].append(len(scheduler.archive))
            print(f"> {itr} itrs completed after {elapsed_time:.2f} s")
            print(f"  - Max Score: {metrics['Max Score']['y'][-1]}")
            print(f"  - Archive Size: {metrics['Archive Size']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the archive to the given filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Puts faster (more negative) impact velocities on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)


def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range [0, 1]. This may help
    when comparing CCDFs among archives with different amounts of coverage
    (i.e. when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.as_pandas(include_solutions=False)["objective"],
        50,  # Number of histogram bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objectives")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objectives")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = pd.read_csv(outdir / "archive.csv")
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v2", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,
    )

    for idx in indices:
        model = np.array(df.loc[idx, "solution_0":])
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=52,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Make the directory here so that it is not made when running eval.
    outdir.mkdir(exist_ok=True)

    # Set up Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many machines, we would set up a more complicated cluster and
    # connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)

    # CMA-ME.
    scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.as_pandas().to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
    fire.Fire(lunar_lander_main)
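
save_ccdf() obtains the tail counts from `ax.hist(..., cumulative=-1)`, which reports them only at its 50 bin edges. The quantity described in the module docstring (for each objective p, the number of archive entries with an objective of at least p) can also be computed exactly. The sketch below uses only the standard library. `unnormalized_ccdf` is a hypothetical helper, not part of pyribs, and the sample objectives are made up. In the script, the `objectives` argument could be the same `archive.as_pandas(include_solutions=False)["objective"]` column that save_ccdf() plots.

```python
from bisect import bisect_left


def unnormalized_ccdf(objectives, thresholds):
    """Returns, for each threshold p, how many objectives are >= p."""
    values = sorted(objectives)
    n = len(values)
    # bisect_left gives the index of the first value >= p in the sorted list,
    # so every value from that index onward satisfies value >= p.
    return [n - bisect_left(values, p) for p in thresholds]


if __name__ == "__main__":
    # Made-up objective values, purely for illustration.
    objs = [-120.0, 15.5, 200.0, 250.0, 250.0]
    print(unnormalized_ccdf(objs, [-200.0, 0.0, 200.0, 250.0, 300.0]))
    # Prints [5, 4, 3, 2, 0]
```

Sorting once and binary-searching each threshold costs O((n + k) log n) for n objectives and k thresholds, instead of O(nk) for counting each threshold directly. Because the counts are not divided by n, archives with different coverage stay directly comparable, as the save_ccdf() docstring notes.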