Lunar Lander Relanded: Using Dask to Distribute Evaluations
This example extends the Lunar Lander tutorial by using Dask to distribute evaluations and thus speed things up. It also adds more logging capabilities and a CLI.
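The heart of the speedup is a small Dask pattern: a LocalCluster of single-threaded worker processes, a Client connected to it, and client.map / client.gather to fan simulations out to the workers and collect the results. The sketch below illustrates the pattern on its own; it is not part of the script, and simulate, solutions, and env_seed stand in for the function and data defined in the full script that follows.

    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
    client = Client(cluster)

    # Fan out: each candidate solution is simulated by one of the worker
    # processes.
    futures = client.map(lambda sol: simulate(sol, env_seed), solutions)
    # Fan in: block until every simulation has finished and collect the
    # (reward, impact_x_pos, impact_y_vel) results.
    results = client.gather(futures)

The full script: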
"""Uses CMA-ME to train linear agents in Lunar Lander.

This script uses the same setup as the tutorial, but it also uses Dask to
parallelize evaluations on a single machine and adds in a CLI. Refer to the
tutorial here: https://docs.pyribs.org/en/stable/tutorials/lunar_lander.html for
more info.

You should not need much familiarity with Dask to read this example. However, if
you would like to know more about Dask, we recommend referring to the quickstart
for Dask distributed: https://distributed.dask.org/en/latest/quickstart.html.

This script creates an output directory (defaults to `lunar_lander_output/`, see
the --outdir flag) with the following files:

    - archive.csv: The CSV representation of the final archive, obtained with
      as_pandas().
    - archive_ccdf.png: A plot showing the (unnormalized) complementary
      cumulative distribution function of objective values in the archive. For
      each objective value p on the x-axis, this plot shows the number of
      solutions that had an objective value of at least p.
    - heatmap.png: A heatmap showing the performance of solutions in the
      archive.
    - metrics.json: Metrics about the run, saved as a mapping from the metric
      name to a list of x values (iteration number) and a list of y values
      (metric value) for that metric.
    - {metric_name}.png: Plots of the metrics, currently just `archive_size` and
      `max_score`.

In evaluation mode (--run-eval flag), the script will read in the archive from
the output directory and simulate 10 random solutions from the archive. It will
write videos of these simulations to a `videos/` subdirectory in the output
directory.

Usage:
    # Basic usage - should take ~1 hour with 4 cores.
    python lunar_lander.py NUM_WORKERS

    # Now open the Dask dashboard at http://localhost:8787 to view worker
    # status.

    # Evaluation mode. If you passed a different outdir and/or env_seed when
    # running the algorithm with the command above, you must pass the same
    # outdir and/or env_seed here.
    python lunar_lander.py --run-eval

Help:
    python lunar_lander.py --help
"""
import json
import time
from pathlib import Path

import fire
import gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from alive_progress import alive_bar
from dask.distributed import Client, LocalCluster

from ribs.archives import GridArchive
from ribs.emitters import ImprovementEmitter
from ribs.optimizers import Optimizer
from ribs.visualize import grid_archive_heatmap


def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation.

    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates their own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure."
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    if seed is not None:
        env.seed(seed)

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
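    # The model is a flat vector of weights for the linear policy; reshape it
    # into an (action_dim, obs_dim) matrix so an action can be chosen with
    # argmax(model @ obs) at each timestep.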
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs = env.reset()
    done = False

    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, done, _ = env.step(action)
        total_reward += reward

        # Refer to the definition of state here:
        # https://github.com/openai/gym/blob/master/gym/envs/box2d/lunar_lander.py#L306
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the max y-vel (we use min since the lander
    # goes down).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return total_reward, impact_x_pos, impact_y_vel


def create_optimizer(seed, n_emitters, sigma0, batch_size):
    """Creates the Optimizer based on given configurations.

    See lunar_lander_main() for description of args.

    Returns:
        A pyribs optimizer set up for CMA-ME (i.e. it has ImprovementEmitter's
        and a GridArchive).
    """
    env = gym.make("LunarLander-v2")
    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
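
    # For LunarLander-v2, action_dim is 4 and obs_dim is 8, so each solution is
    # a flat vector of 4 * 8 = 32 weights.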
    archive = GridArchive(
        [50, 50],  # 50 bins in each dimension.
        [(-1.0, 1.0), (-3.0, 0.0)],  # (-1, 1) for x-pos and (-3, 0) for y-vel.
        seed=seed,
    )

    # If we create the emitters with identical seeds, they will all output the
    # same initial solutions. The algorithm should still work -- eventually, the
    # emitters will produce different solutions because they get different
    # responses when inserting into the archive. However, using different seeds
    # avoids this problem altogether.
    seeds = ([None] * n_emitters
             if seed is None else [seed + i for i in range(n_emitters)])
    initial_model = np.zeros((action_dim, obs_dim))
    emitters = [
        ImprovementEmitter(
            archive,
            initial_model.flatten(),
            sigma0=sigma0,
            batch_size=batch_size,
            seed=s,
        ) for s in seeds
    ]

    optimizer = Optimizer(archive, emitters)
    return optimizer


def run_search(client, optimizer, env_seed, iterations, log_freq):
    """Runs the QD algorithm for the given number of iterations.

    Args:
        client (Client): A Dask client providing access to workers.
        optimizer (Optimizer): pyribs optimizer.
        env_seed (int): Seed for the environment.
        iterations (int): Iterations to run.
        log_freq (int): Number of iterations to wait before recording metrics.

    Returns:
        dict: A mapping from various metric names to a list of "x" and "y"
        values where x is the iteration and y is the value of the metric. Think
        of each entry as the x's and y's for a matplotlib plot.
    """
    print(
        "> Starting search.\n"
        " - Open Dask's dashboard at http://localhost:8787 to monitor workers."
    )
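
    # The archive starts out empty, so "Archive Size" is seeded with the point
    # (0, 0); "Max Score" is left empty because it is undefined until the
    # archive has entries.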
    metrics = {
        "Max Score": {
            "x": [],
            "y": [],
        },
        "Archive Size": {
            "x": [0],
            "y": [0],
        },
    }

    start_time = time.time()
    with alive_bar(iterations) as progress:
        for itr in range(1, iterations + 1):
            # Request models from the optimizer.
            sols = optimizer.ask()

            # Evaluate the models and record the objectives and BCs.
            objs, bcs = [], []

            # Ask the Dask client to distribute the simulations among the Dask
            # workers, then gather the results of the simulations.
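            # Note that client.gather blocks until all the futures have
            # completed, so each iteration waits for the whole batch of
            # simulations to finish before proceeding.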
            futures = client.map(lambda model: simulate(model, env_seed), sols)
            results = client.gather(futures)

            # Process the results.
            for obj, impact_x_pos, impact_y_vel in results:
                objs.append(obj)
                bcs.append([impact_x_pos, impact_y_vel])

            # Send the results back to the optimizer.
            optimizer.tell(objs, bcs)

            # Logging.
            progress()
            if itr % log_freq == 0 or itr == iterations:
                elapsed_time = time.time() - start_time
                metrics["Max Score"]["x"].append(itr)
                metrics["Max Score"]["y"].append(
                    optimizer.archive.stats.obj_max)
                metrics["Archive Size"]["x"].append(itr)
                metrics["Archive Size"]["y"].append(len(optimizer.archive))
                print(f"> {itr} itrs completed after {elapsed_time:.2f} s")
                print(f" - Max Score: {metrics['Max Score']['y'][-1]}")
                print(f" - Archive Size: {metrics['Archive Size']['y'][-1]}")

    return metrics


def save_heatmap(archive, filename):
    """Saves a heatmap of the optimizer's archive to the filename.

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
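    # vmin and vmax clip the color scale to [-300, 300], roughly the range of
    # scores seen in Lunar Lander, so cell colors stay comparable across
    # heatmaps.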
    grid_archive_heatmap(archive, vmin=-300, vmax=300, ax=ax)
    ax.invert_yaxis()  # Makes more sense if larger velocities are on top.
    ax.set_ylabel("Impact y-velocity")
    ax.set_xlabel("Impact x-position")
    fig.savefig(filename)


def save_metrics(outdir, metrics):
    """Saves metrics to png plots and a JSON file.

    Args:
        outdir (Path): output directory for saving files.
        metrics (dict): Metrics as output by run_search.
    """
    # Plots.
    for metric in metrics:
        fig, ax = plt.subplots()
        ax.plot(metrics[metric]["x"], metrics[metric]["y"])
        ax.set_title(metric)
        ax.set_xlabel("Iteration")
        fig.savefig(str(outdir / f"{metric.lower().replace(' ', '_')}.png"))

    # JSON file.
    with (outdir / "metrics.json").open("w") as file:
        json.dump(metrics, file, indent=2)


def save_ccdf(archive, filename):
    """Saves a CCDF showing the distribution of the archive's objective values.

    CCDF = Complementary Cumulative Distribution Function (see
    https://en.wikipedia.org/wiki/Cumulative_distribution_function#Complementary_cumulative_distribution_function_(tail_distribution)).
    The CCDF plotted here is not normalized to the range (0,1). This may help
    when comparing CCDF's among archives with different amounts of coverage
    (i.e. when one archive has more bins filled).

    Args:
        archive (GridArchive): Archive with results from an experiment.
        filename (str): Path to an image file.
    """
    fig, ax = plt.subplots()
    ax.hist(
        archive.as_pandas(include_solutions=False)["objective"],
        50,  # Number of bins.
        histtype="step",
        density=False,
        cumulative=-1)  # CCDF rather than CDF.
    ax.set_xlabel("Objective Value")
    ax.set_ylabel("Num. Entries")
    ax.set_title("Distribution of Archive Objective Values")
    fig.savefig(filename)


def run_evaluation(outdir, env_seed):
    """Simulates 10 random archive solutions and saves videos of them.

    Videos are saved to outdir / videos.

    Args:
        outdir (Path): Path object for the output directory from which to
            retrieve the archive and save videos.
        env_seed (int): Seed for the environment.
    """
    df = pd.read_csv(outdir / "archive.csv")
    indices = np.random.permutation(len(df))[:10]

    # Use a single env so that all the videos go to the same directory.
    video_env = gym.wrappers.Monitor(
        gym.make("LunarLander-v2"),
        str(outdir / "videos"),
        force=True,
        # Default is to write the video for "cubic" episodes -- 0,1,8,etc (see
        # https://github.com/openai/gym/blob/master/gym/wrappers/monitor.py#L54).
        # This will ensure all the videos are written.
        video_callable=lambda idx: True,
    )

    for idx in indices:
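        # The solution_0, solution_1, ... columns of the archive CSV hold the
        # flattened weights of the linear policy for this entry.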
        model = np.array(df.loc[idx, "solution_0":])
        reward, impact_x_pos, impact_y_vel = simulate(model, env_seed,
                                                      video_env)
        print(f"=== Index {idx} ===\n"
              "Model:\n"
              f"{model}\n"
              f"Reward: {reward}\n"
              f"Impact x-pos: {impact_x_pos}\n"
              f"Impact y-vel: {impact_y_vel}\n")

    video_env.close()


def lunar_lander_main(workers=4,
                      env_seed=1339,
                      iterations=500,
                      log_freq=25,
                      n_emitters=5,
                      batch_size=30,
                      sigma0=1.0,
                      seed=None,
                      outdir="lunar_lander_output",
                      run_eval=False):
    """Uses CMA-ME to train linear agents in Lunar Lander.

    Args:
        workers (int): Number of workers to use for simulations.
        env_seed (int): Environment seed. The default gives the flat terrain
            from the tutorial.
        iterations (int): Number of iterations to run the algorithm.
        log_freq (int): Number of iterations to wait before recording metrics.
        n_emitters (int): Number of emitters.
        batch_size (int): Batch size of each emitter.
        sigma0 (float): Initial step size of each emitter.
        seed (int): Random seed for the pyribs components.
        outdir (str): Directory for Lunar Lander output.
        run_eval (bool): Pass this flag to run an evaluation of 10 random
            solutions selected from the archive in the `outdir`.
    """
    outdir = Path(outdir)
    outdir.mkdir(exist_ok=True)

    if run_eval:
        run_evaluation(outdir, env_seed)
        return

    # Setup Dask. The client connects to a "cluster" running on this machine.
    # The cluster simply manages several concurrent worker processes. If using
    # Dask across many workers, we would set up a more complicated cluster and
    # connect the client to it.
    cluster = LocalCluster(
        processes=True,  # Each worker is a process.
        n_workers=workers,  # Create this many worker processes.
        threads_per_worker=1,  # Each worker process is single-threaded.
    )
    client = Client(cluster)

    # CMA-ME.
    optimizer = create_optimizer(seed, n_emitters, sigma0, batch_size)
    metrics = run_search(client, optimizer, env_seed, iterations, log_freq)

    # Outputs.
    optimizer.archive.as_pandas().to_csv(outdir / "archive.csv")
    save_ccdf(optimizer.archive, str(outdir / "archive_ccdf.png"))
    save_heatmap(optimizer.archive, str(outdir / "heatmap.png"))
    save_metrics(outdir, metrics)


if __name__ == "__main__":
    fire.Fire(lunar_lander_main)