Lunar Lander Relanded: Using Dask to Distribute Evaluations

This example extends the Lunar Lander tutorial by using Dask to distribute evaluations and thus speed things up. It also adds more detailed logging and a command-line interface (CLI).
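The heart of the script is an ask-evaluate-tell loop: the scheduler proposes a batch of solutions, the Dask client fans the simulations out to workers with `client.map` and blocks on `client.gather`, and the results go back via `scheduler.tell`. The sketch below illustrates that pattern with Python's standard `concurrent.futures` standing in for the Dask client and a toy objective standing in for the Lunar Lander rollout, so it runs without Dask or Gym; the names `simulate`, `objs`, and `bcs` mirror the script, everything else is illustrative.

```python
# Minimal sketch of the ask-evaluate-tell loop. The real script distributes
# `simulate` with dask.distributed's client.map/client.gather; here a
# ThreadPoolExecutor plays that role, and a toy objective stands in for the
# Lunar Lander rollout so the sketch is self-contained.
from concurrent.futures import ThreadPoolExecutor

def simulate(model, env_seed):
    # Toy stand-in for the real rollout: returns an objective plus two
    # "measures" (impact x-position and y-velocity in the actual script).
    return -sum(m * m for m in model), model[0], model[1]

solutions = [[0.1, 0.2], [0.3, -0.4], [-0.5, 0.6]]  # scheduler.ask() in the script.
with ThreadPoolExecutor(max_workers=4) as executor:
    # Equivalent to client.map(...) followed by client.gather(...) in Dask.
    results = list(executor.map(lambda model: simulate(model, 1339), solutions))

objs = [obj for obj, _, _ in results]
bcs = [[x, y] for _, x, y in results]  # Passed to scheduler.tell(objs, bcs).
```

Because each evaluation is independent and the function is "pure" (each worker builds its own environment), this map/gather pattern scales from a local thread pool to a multi-process or multi-machine Dask cluster without changing the loop.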

"""Uses CMA-ME to train linear agents in Lunar Lander.

This script uses the same setup as the tutorial, but it also uses Dask to
parallelize evaluations on a single machine and adds a CLI. Refer to the
tutorial here: https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for
more info.

You should not need much familiarity with Dask to read this example. However, if
you would like to know more about Dask, we recommend referring to the quickstart
for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.

This script creates an output directory (defaults to `lunar_lander_output/`, see
the --outdir flag) with the following files:

    - archive.csv: The CSV representation of the final archive, obtained with
      as_pandas().
    - archive_ccdf.png: A plot showing the (unnormalized) complementary
      cumulative distribution function of objective values in the archive. For
      each objective value p on the x-axis, this plot shows the number of
      solutions that had an objective value of at least p.
    - heatmap.png: A heatmap showing the performance of solutions in the
      archive.
    - metrics.json: Metrics about the run, saved as a mapping from the metric
      name to a list of x values (iteration number) and a list of y values
      (metric value) for that metric.
    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
      `max_score`.

In evaluation mode (--run-eval flag), the script will read in the archive from
the output directory and simulate 10 random solutions from the archive. It will
write videos of these simulations to a `videos/` subdirectory in the output
directory.

Usage:
    # Basic usage - should take ~1 hour with 4 cores.
    python lunar_lander.py NUM_WORKERS
    # Now open the Dask dashboard at http://localhost:8787 to view worker
    # status.

    # Evaluation mode. If you passed a different outdir and/or env_seed when
    # running the algorithm with the command above, you must pass the same
    # outdir and/or env_seed here.
    python lunar_lander.py --run-eval

Help:
    python lunar_lander.py --help
"""
import json
import time
from pathlib import Path

import fire
import gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from alive_progress import alive_bar
from dask.distributed import Client, LocalCluster

from ribs.archives import GridArchive
from ribs.emitters import ImprovementEmitter
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates its own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure."
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    if seed is not None:
        env.seed(seed)

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs = env.reset()
    done = False

    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, done, _ = env.step(action)
        total_reward += reward

        # Refer to the definition of state here:
        # https://github.com/openai/gym/blob/master/gym/envs/box2d/lunar_lander.py#L306
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the max y-vel (we use min since the lander
    # goes down).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_scheduler(seed, n_emitters, sigma0, batch_size):
    """Creates the Scheduler based on given configurations.

    See lunar_lander_main() for description of args.

    Returns:
        A pyribs scheduler set up for CMA-ME (i.e. it has ImprovementEmitters
        and a GridArchive).
    """
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    initial_model = np.zeros((action_dim, obs_dim))
    archive = GridArchive(
        solution_dim=initial_model.size,
        dims=[50, 50],  # 50 cells in each dimension.
        # (-1, 1) for impact x-pos and (-3, 0) for impact y-vel.
        ranges=[(-1.0, 1.0), (-3.0, 0.0)],
        seed=seed,
    )

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])

    emitters = [
        ImprovementEmitter(
            archive,
            initial_model.flatten(),
            sigma0=sigma0,
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    scheduler = Scheduler(archive, emitters)
    return scheduler


def run_search(client, scheduler, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        scheduler (Scheduler): pyribs scheduler.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.
    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        "  - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )

    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    with alive_bar(iterations) as progress:
        for itr in range(1, iterations + 1):
            # Request models from the scheduler.
            sols = scheduler.ask()

            # Evaluate the models and record the objectives and BCs.
            objs, bcs = [], []

            # Ask the Dask client to distribute the simulations among the Dask
            # workers, then gather the results of the simulations.
            futures = client.map(lambda model: simulate(model, env_seed), sols)
            results = client.gather(futures)

            # Process the results.
            for obj, impact_x_pos, impact_y_vel in results:
                objs.append(obj)
                bcs.append([impact_x_pos, impact_y_vel])

            # Send the results back to the scheduler.
            scheduler.tell(objs, bcs)

            # Logging.
            progress()
            if itr % log_freq == 0 or itr == iterations:
                elapsed_time = time.time() - start_time
                metrics["Max Score"]["x"].append(itr)
                metrics["Max Score"]["y"].append(
                    scheduler.archive.stats.obj_max)
                metrics["Archive Size"]["x"].append(itr)
                metrics["Archive Size"]["y"].append(len(scheduler.archive))
                print(f"> {itr} itrs completed after {elapsed_time:.2f} s")
                print(f"  - Max Score: {metrics['Max Score']['y'][-1]}")
                print(f"  - Archive Size: {metrics['Archive Size']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)

def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objective values.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help
    when comparing CCDF's among archives with different amounts of coverage
    (i.e. when one archive has more cells filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.as_pandas(include_solutions=False)["objective"],
        50,  # Number of histogram bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objective Value")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objective Values")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = pd.read_csv(outdir / "archive.csv")
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.Monitor(
        gym.make("LunarLander-v2"),
        str(outdir / "videos"),
        force=True,
        # Default is to write the video for "cubic" episodes -- 0,1,8,etc. (see
        # https://github.com/openai/gym/blob/master/gym/wrappers/monitor.py#L54).
        # This will ensure all the videos are written.
        video_callable=lambda idx: True,
    )

    for idx in indices:
        model = np.array(df.loc[idx, "solution_0":])
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=1339,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics
            and saving the heatmap.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)
    outdir.mkdir(exist_ok=True)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Set up Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many machines, we would set up a more complicated cluster and
    # connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)

    # CMA-ME.
    scheduler = create_scheduler(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, scheduler, env_seed, iterations, log_freq)

    # Outputs.
    scheduler.archive.as_pandas().to_csv(outdir / "archive.csv")
    save_ccdf(scheduler.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(scheduler.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
    fire.Fire(lunar_lander_main)