Source code for ribs.archives._cqd_score

"""Utilities for computing CQD score."""

from __future__ import annotations

import dataclasses
from typing import Any, Literal

import numpy as np
from numpy.typing import ArrayLike

from ribs._utils import check_is_1d
from ribs.archives._archive_base import ArchiveBase
from ribs.typing import Float, Int


[docs] @dataclasses.dataclass class CQDScoreResult: """Stores the result of running :func:`cqd_score`.""" #: Number of times the score was computed. iterations: Int #: The mean score. This is the result most users will need. mean: Float #: Array of scores obtained on each iteration. scores: np.ndarray #: (iterations, n, measure_dim) array of target points passed into the function. target_points: np.ndarray #: 1D array of penalties used in the computation. If an array of penalties was #: passed in, this will be a copy of that array. penalties: np.ndarray #: Minimum objective passed into the function. obj_min: Float #: Maximum objective passed into the function. obj_max: Float #: Max distance passed into the function. dist_max: Float #: Order of the norm for distance that was passed into the function. Refer to the #: ``ord`` argument in :func:`numpy.linalg.norm` for type info. dist_ord: Any
[docs] def cqd_score( archive: ArchiveBase, *, iterations: Int, target_points: ArrayLike, penalties: Int | ArrayLike, obj_min: Float, obj_max: Float, dist_max: Float, dist_ord: Int | Float | Literal["fro", "nuc"] = None, ) -> CQDScoreResult: r"""Computes the CQD score of an archive. The Continuous Quality Diversity (CQD) score was introduced in `Kent 2022 <https://dl.acm.org/doi/10.1145/3520304.3534018>`_. Please see the :doc:`/examples/cqd_score` example for an example of how to call this function on an archive. Args: archive: Archive for which to compute the CQD score. The archive must implement the :meth:`~ribs.archives.ArchiveBase.data` method. iterations: Number of times to compute the CQD score. target_points: (iterations, n, measure_dim) array that lists n target points to use on each iteration. penalties: Number of penalty values over which to compute the score (the values are distributed evenly over the range [0,1]). Alternatively, this may be a 1D array that explicitly lists the penalty values. Known as :math:`\theta` in Kent 2022. obj_min: Minimum objective value, used when normalizing the objectives. obj_max: Maximum objective value, used when normalizing the objectives. dist_max: Maximum distance between points in measure space. dist_ord: Order of the norm to use for calculating measure space distance; this is passed to :func:`numpy.linalg.norm` as the ``ord`` argument. See :func:`numpy.linalg.norm` for possible values. The default is to use Euclidean distance (L2 norm). Returns: Object containing results of the CQD score calculations. Raises: ValueError: target_points or penalties is an array with the wrong shape. """ target_points = np.copy(target_points) # Copy since this is returned. if ( target_points.ndim != 3 or target_points.shape[0] != iterations or target_points.shape[2] != archive.measure_dim ): raise ValueError( "Expected target_points to be a 3D array with " f"shape ({iterations}, n, {archive.measure_dim}) " "(i.e. shape (iterations, n, measure_dim)) but it had " f"shape {target_points.shape}" ) if np.isscalar(penalties): penalties = np.linspace(0, 1, penalties) else: penalties = np.copy(penalties) # Copy since this is returned. check_is_1d(penalties, "penalties") objectives = archive.data("objective") measures = archive.data("measures") norm_objectives = objectives / (obj_max - obj_min) scores = np.zeros(iterations) for itr in range(iterations): # Distance calculation -- start by taking the difference between each measure i # and all the target points. distances = measures[:, None] - target_points[itr] # (len(archive), n_target_points) array of distances. distances = np.linalg.norm(distances, ord=dist_ord, axis=2) norm_distances = distances / dist_max for penalty in penalties: # Known as omega in Kent 2022 -- a (len(archive), n_target_points) array. values = norm_objectives[:, None] - penalty * norm_distances # (n_target_points,) array. max_values_per_target = np.max(values, axis=0) scores[itr] += np.sum(max_values_per_target) return CQDScoreResult( iterations=iterations, mean=np.mean(scores), scores=scores, target_points=target_points, penalties=penalties, obj_min=obj_min, obj_max=obj_max, dist_max=dist_max, dist_ord=dist_ord, )