Lunar Lander Relanded: Using Dask to Distribute Evaluations

This example extends the Lunar Lander tutorial by using Dask to distribute evaluations and thus speed up the search. It also adds more logging capabilities and a CLI.
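Before the full script, here is a minimal, self-contained sketch of the Dask pattern the script relies on: `client.map` submits one task per input to the worker processes, and `client.gather` blocks until all results are back, in input order. The `evaluate` function below is a hypothetical stand-in for an expensive evaluation such as a lunar lander rollout.

from dask.distributed import Client, LocalCluster

def evaluate(x):
    # Hypothetical stand-in for an expensive evaluation, e.g., simulating a
    # lunar lander policy.
    return x * x

if __name__ == "__main__":
    # Start several single-threaded worker processes on this machine.
    cluster = LocalCluster(processes=True, n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    # One task per input; results come back in the same order as the inputs.
    futures = client.map(evaluate, range(8))
    print(client.gather(futures))  # [0, 1, 4, 9, 16, 25, 36, 49]

The full example below uses exactly this map/gather pattern inside its main loop, with the simulation function in place of `evaluate`.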

  1"""Uses CMA-ME to train agents with linear policies in Lunar Lander.
  2
  3Install the following dependencies before running this example:
  4    pip install ribs[visualize] tqdm fire gymnasium[box2d]==0.27.0 moviepy>=1.0.0 dask>=2.0.0 distributed>=2.0.0 bokeh>=2.0.0
  5
  6This script uses the same setup as the tutorial, but it also uses Dask instead
  7of Python's multiprocessing to parallelize evaluations on a single machine and
  8adds in a CLI. Refer to the tutorial here:
  9https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for more info.
 10
 11You should not need much familiarity with Dask to read this example. However, if
 12you would like to know more about Dask, we recommend referring to the quickstart
 13for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.
 14
 15This script creates an output directory (defaults to `lunar_lander_output/`, see
 16the --outdir flag) with the following files:
 17
 18    - archive.csv: The CSV representation of the final archive, obtained with
 19      as_pandas().
 20    - archive_ccdf.png: A plot showing the (unnormalized) complementary
 21      cumulative distribution function of objectives in the archive. For
 22      each objective p on the x-axis, this plot shows the number of
 23      solutions that had an objective of at least p.
 24    - heatmap.png: A heatmap showing the performance of solutions in the
 25      archive.
 26    - metrics.json: Metrics about the run, saved as a mapping from the metric
 27      name to a list of x values (iteration number) and a list of y values
 28      (metric value) for that metric.
 29    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
 30      `max_score`.
 31
 32In evaluation mode (--run-eval flag), the script will read in the archive from
 33the output directory and simulate 10 random solutions from the archive. It will
 34write videos of these simulations to a `videos/` subdirectory in the output
 35directory.
 36
 37Usage:
 38    # Basic usage - should take ~1 hour with 4 cores.
 39    python lunar_lander.py NUM_WORKERS
 40    # Now open the Dask dashboard at http://localhost:8787 to view worker
 41    # status.
 42
 43    # Evaluation mode. If you passed a different outdir and/or env_seed when
 44    # running the algorithm with the command above, you must pass the same
 45    # outdir and/or env_seed here.
 46    python lunar_lander.py --run-eval
 47Help:
 48    python lunar_lander.py --help
 49"""
import json
import time
from pathlib import Path

import fire
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm
from dask.distributed import Client, LocalCluster

from ribs.archives import GridArchive
from ribs.emitters import EvolutionStrategyEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of
            creating a new env. This is used primarily for recording video
            during evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates its own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure." However,
        # we should use the video_env if it is passed in.
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs, _ = env.reset(seed=seed)
    done = False

    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # Refer to the definition of state here:
        # https://gymnasium.farama.org/environments/box2d/lunar_lander/
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the
    # final timestep, and set the y-vel to the maximum downward y-vel (we take
    # the min since the lander moves in the negative y direction).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on given configurations.

    See lunar_lander_main() for a description of the args.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e., it has
        EvolutionStrategyEmitters and a GridArchive).
    """
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for x-pos and (-3, 0) for y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed)

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually,
    # the emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different
    # seeds avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    # EvolutionStrategyEmitter with the "2imp" (two-stage improvement) ranker
    # replicates the ImprovementEmitter from the original CMA-ME.
    emitters = [
        EvolutionStrategyEmitter(
            archive,
            x0=initial_model.flatten(),
            sigma0=sigma0,
            ranker="2imp",
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler


def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        "  - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    for itr in tqdm.trange(1, iterations + 1):
        # Request models from the scheduler.
        sols = scheduler.ask()

        # Evaluate the models and record the objectives and measures.
        objs, meas = [], []

        # Ask the Dask client to distribute the simulations among the Dask
        # workers, then gather the results of the simulations.
        futures = client.map(lambda model: simulate(model, env_seed), sols)
        results = client.gather(futures)

        # Process the results.
        for obj, impact_x_pos, impact_y_vel in results:
            objs.append(obj)
            meas.append([impact_x_pos, impact_y_vel])

        # Send the results back to the scheduler.
        scheduler.tell(objs, meas)

        # Logging.
        if itr % log_freq == 0 or itr == iterations:
            elapsed_time = time.time() - start_time
            metrics["Max Score"]["x"].append(itr)
            metrics["Max Score"]["y"].append(scheduler.archive.stats.obj_max)
            metrics["Archive Size"]["x"].append(itr)
            metrics["Archive Size"]["y"].append(len(scheduler.archive))
            tqdm.tqdm.write(
                f"> {itr} itrs completed after {elapsed_time:.2f} s\n"
                f"  - Max Score: {metrics['Max Score']['y'][-1]}\n"
                f"  - Archive Size: {metrics['Archive Size']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the scheduler's archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)


def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): Output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objectives.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1); this may help
    when comparing CCDFs among archives with different amounts of coverage
    (i.e. when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.as_pandas(include_solutions=False)["objective"],
        50,  # Number of histogram bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objectives")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objectives")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = pd.read_csv(outdir / "archive.csv")
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.RecordVideo(
        gym.make("LunarLander-v2", render_mode="rgb_array"),
        video_folder=str(outdir / "videos"),
        # This will ensure all episodes are recorded as videos.
        episode_trigger=lambda idx: True,
    )

    for idx in indices:
        model = np.array(df.loc[idx, "solution_0":])
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=52,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics
            and saving heatmap.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Make the directory here so that it is not made when running eval.
    outdir.mkdir(exist_ok=True)

    # Set up Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many machines, we would set up a more complicated cluster
    # and connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)

    # CMA-ME.
    scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.as_pandas().to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
    fire.Fire(lunar_lander_main)
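The `LocalCluster` above only manages worker processes on a single machine. To scale across machines, only the cluster setup needs to change: start a scheduler and workers on your machines (for example, with the `dask-scheduler` and `dask-worker` commands that ship with Dask distributed) and point the `Client` at the scheduler's address instead of at a `LocalCluster`. A minimal sketch, assuming a scheduler reachable at the hypothetical address `tcp://192.168.1.100:8786`:

from dask.distributed import Client

# Connect to an existing scheduler instead of starting a LocalCluster.
# The address here is hypothetical; substitute your scheduler's address.
client = Client("tcp://192.168.1.100:8786")

Everything else in the script, including run_search, works unchanged, since it only interacts with the client.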