Source code for predtuner.modeledapp

import abc
import json
import logging
import os
import pickle
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Tuple, Type, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch

from ._logging import PathLike
from .approxapp import ApproxApp, ApproxKnob, ApproxTuner, Config, KnobsT

msg_logger = logging.getLogger(__name__)


class ModeledApp(ApproxApp, abc.ABC):
    """Like `.approxapp.ApproxApp`, but uses a model for QoS/cost measurement.

    To use this class, inherit from it and implement `get_models`,
    `empirical_measure_qos_cost`, and `.approxapp.ApproxApp.name`.
    (This class provides an implementation of `.approxapp.ApproxApp.measure_qos_cost`.)

    :param op_knobs: a mapping from each operator (identified by str) to a list of
        applicable knobs.
    :type op_knobs: Dict[str, List[ApproxKnob]]
    :param target_device: the target device that this application should be tuned on.
        See `.approxapp.ApproxApp` constructor.
    :type target_device: Optional[str]
    """

    def __init__(
        self, op_knobs: Dict[str, List[ApproxKnob]], target_device: str = None
    ) -> None:
        super().__init__(op_knobs, target_device)
        models = self.get_models()
        self._name_to_model = {m.name: m for m in models}
        if len(self._name_to_model) != len(models):
            raise ValueError("Name conflict in models")
        self._cost_models = {
            model.name: model for model in models if isinstance(model, ICostModel)
        }
        self._qos_models = {
            model.name: model for model in models if isinstance(model, IQoSModel)
        }

    @abc.abstractmethod
    def get_models(self) -> List[Union["ICostModel", "IQoSModel"]]:
        """A list of QoS/cost prediction models for this application.

        Cost models should inherit from `ICostModel` while QoS models should
        inherit from `IQoSModel`.

        :rtype: List[Union[ICostModel, IQoSModel]]
        """
        pass

    @abc.abstractmethod
    def empirical_measure_qos_cost(
        self, with_approxes: KnobsT, is_test: bool
    ) -> Tuple[float, float]:
        """Empirically measures QoS and cost by actually running the program
        with approximation (as opposed to using a model).

        :param with_approxes: The approximation configuration to measure QoS
            and cost for.
        :param is_test: If True, uses a "test" dataset/mode that is held away
            from the tuner during tuning.
        """

    def measure_qos_cost(
        self,
        with_approxes: KnobsT,
        is_test: bool,
        qos_model: Optional[str] = None,
        cost_model: Optional[str] = None,
    ) -> Tuple[float, float]:
        """Returns the QoS and cost (time, energy, ...) of a given configuration,
        *potentially using models*.

        If either of `cost_model` or `qos_model` is None, this will perform
        empirical measurement once to get the one that is not using a model.
        Otherwise, no empirical measurement will be used.

        Note that when running on the test set (``is_test == True``), no modeling
        is allowed (this raises a `ValueError`).

        :param with_approxes: The approximation configuration to measure QoS
            and cost for.
        :param is_test: If True, uses a "test" dataset/mode that is held away
            from the tuner during tuning; otherwise uses the "tune" dataset.
        :param qos_model: The QoS model to use in this measurement, keyed by
            the model's name (see `IQoSModel.name`).
        :param cost_model: The cost model to use in this measurement, keyed by
            the model's name (see `ICostModel.name`).
        """
        # Test-set measurement is always empirical
        if is_test:
            if qos_model is not None or cost_model is not None:
                raise ValueError("Test dataset measurement is always empirical")
            return self.empirical_measure_qos_cost(with_approxes, is_test)
        # Run empirical measurement once if either cost or qos needs it
        qos, cost = None, None
        if qos_model is None or cost_model is None:
            qos, cost = self.empirical_measure_qos_cost(with_approxes, is_test)
        # If we're asked to use some qos_model, overwrite the `qos` value
        # even if we already got it from the empirical measurement
        # (i.e., even if cost_model is None)
        if qos_model is not None:
            if qos_model not in self._qos_models:
                raise ValueError(
                    f'"{qos_model}" is an invalid value for qos_model '
                    f"(choose from {list(self._qos_models.keys())})"
                )
            qos = self._qos_models[qos_model].measure_qos(with_approxes)
        # Same goes for cost
        if cost_model is not None:
            if cost_model not in self._cost_models:
                raise ValueError(
                    f'"{cost_model}" is an invalid value for cost_model '
                    f"(choose from {list(self._cost_models.keys())})"
                )
            cost = self._cost_models[cost_model].measure_cost(with_approxes)
        assert type(qos) is float and type(cost) is float
        return qos, cost

    def get_tuner(self) -> "ApproxModeledTuner":
        """Sets up an `ApproxModeledTuner` instance which the user can directly
        call `tune()` on with opentuner parameters.

        This returns an `ApproxModeledTuner`, unlike `.approxapp.ApproxApp.get_tuner`
        which returns an `ApproxTuner`.

        :rtype: ApproxModeledTuner
        """
        return ApproxModeledTuner(self)

    def init_model(self, model_name: str):
        self._name_to_model[model_name]._init()
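
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# A minimal `ModeledApp` subclass showing how the three required members fit
# together. Everything specific to it ("_SketchModeledApp", `run_and_score`,
# the "sketch_app" name, the "sketch_p2.json" file) is hypothetical and
# introduced only for this example.
class _SketchModeledApp(ModeledApp):
    """Documentation-only sketch; not used anywhere in the library."""

    def __init__(
        self,
        op_knobs: Dict[str, List[ApproxKnob]],
        op_costs: Dict[str, float],
        knob_speedups: Dict[str, float],
        run_and_score: Callable[[KnobsT, bool], Tuple[float, float]],
        target_device: str = None,
    ) -> None:
        # Attributes used by `get_models` must exist before calling the parent
        # constructor, because `ModeledApp.__init__` calls `self.get_models()`.
        self._op_costs = op_costs
        self._knob_speedups = knob_speedups
        self._run_and_score = run_and_score
        super().__init__(op_knobs, target_device)

    @property
    def name(self) -> str:
        return "sketch_app"

    def get_models(self) -> List[Union["ICostModel", "IQoSModel"]]:
        # One cost model and one QoS model, both defined later in this module.
        return [
            LinearCostModel(self, self._op_costs, self._knob_speedups),
            QoSModelP2(self, storage="sketch_p2.json"),
        ]

    def empirical_measure_qos_cost(
        self, with_approxes: KnobsT, is_test: bool
    ) -> Tuple[float, float]:
        # Delegate to a user-supplied callable that actually runs the program
        # with `with_approxes` applied and returns (qos, relative_cost).
        return self._run_and_score(with_approxes, is_test)
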

class ICostModel(abc.ABC):
    """Abstract base class for models that provide cost prediction."""

    def __init__(self) -> None:
        self._inited = False

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Name of model."""
        pass

    @abc.abstractmethod
    def measure_cost(self, with_approxes: KnobsT) -> float:
        """Predict the cost of application.

        :param with_approxes: The configuration to predict cost for.
        """
        pass

    def _init(self):
        """Initialize the model before the first prediction task (profiling, etc.)"""
        self._inited = True


class IQoSModel(abc.ABC):
    """Abstract base class for models that provide QoS prediction."""

    def __init__(self) -> None:
        self._inited = False

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Name of model."""
        pass

    @abc.abstractmethod
    def measure_qos(self, with_approxes: KnobsT) -> float:
        """Predict the QoS of application.

        :param with_approxes: The configuration to predict QoS for.
        """
        pass

    def _init(self):
        """Initialize the model before the first prediction task (profiling, etc.)"""
        self._inited = True
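
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# The smallest possible `IQoSModel`, spelling out the interface the ABC above
# requires: a `name` property and a `measure_qos` method. The class, its
# "qos_constant" name, and the fixed QoS value are hypothetical.
class _ConstantQoSModel(IQoSModel):
    def __init__(self, qos_value: float) -> None:
        super().__init__()
        self._qos_value = qos_value

    @property
    def name(self) -> str:
        return "qos_constant"

    def measure_qos(self, with_approxes: KnobsT) -> float:
        # A real model would inspect `with_approxes`; this sketch ignores it.
        return self._qos_value
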

class LinearCostModel(ICostModel):
    """Weighted linear cost predictor based on the cost of each operator.

    This predictor computes a weighted sum over the cost of each operator
    and the speedup of each knob on that operator.

    :param app: The `ModeledApp` to predict cost for.
    :param op_costs: A mapping from operator name to its (baseline) cost.
    :param knob_speedups: A mapping from knob name to its (expected) speedup.
    """

    def __init__(
        self,
        app: ModeledApp,
        op_costs: Dict[str, float],
        knob_speedups: Dict[str, float],
    ) -> None:
        super().__init__()
        self.app = app
        knob_cost_factor_v = 1 / np.array(list(knob_speedups.values()))
        layer_cost_v = np.array(list(op_costs.values()))
        costs = np.outer(layer_cost_v, knob_cost_factor_v)
        self.cost_df = pd.DataFrame(
            costs, index=op_costs.keys(), columns=knob_speedups.keys(), dtype=float
        )

    @property
    def name(self) -> str:
        return "cost_linear"

    def measure_cost(self, with_approxes: KnobsT) -> float:
        with_approxes = self.app.add_baseline_to_knobs(with_approxes)
        return float(
            sum(self.cost_df.loc[layer, knob] for layer, knob in with_approxes.items())
        )
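
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# How `LinearCostModel` is typically parameterized. The operator names, knob
# names, and numbers below are hypothetical; in practice they come from
# profiling the baseline program.
def _sketch_linear_cost_model(app: ModeledApp) -> LinearCostModel:
    op_costs = {"conv1": 10.0, "conv2": 20.0}  # baseline cost per operator
    # Expected speedup per knob. Because `measure_cost` fills in the baseline
    # knob via `add_baseline_to_knobs`, the table should also contain a column
    # for the baseline knob's name ("baseline" here is a placeholder).
    knob_speedups = {"baseline": 1.0, "fp16": 1.5, "perf2": 2.0}
    # The predicted cost of a config is then the sum over operators of
    # op_cost / speedup_of_the_knob_assigned_to_that_operator.
    return LinearCostModel(app, op_costs, knob_speedups)
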

class QoSModelP1(IQoSModel):
    """QoS model `P1` in ApproxTuner.

    :param app: The `ModeledApp` to predict QoS for.
    :param tensor_output_getter: A function that can run the tensor-based application
        with a config and return a single tensor result.

        Note that here we require the return value to be a PyTorch tensor.
    :param qos_metric: A function that computes a QoS level from the return value of
        `tensor_output_getter`.
    :param storage: A file of PyTorch format to store this model into, if the file
        doesn't exist, or load the model from if the file exists.
        If not given, the model will not be stored.
    """

    def __init__(
        self,
        app: ModeledApp,
        tensor_output_getter: Callable[[KnobsT], torch.Tensor],
        qos_metric: Callable[[torch.Tensor], float],
        storage: PathLike = None,
    ) -> None:
        super().__init__()
        self.app = app
        self.output_f = tensor_output_getter
        self.qos_metric = qos_metric
        self.storage = Path(storage) if storage else None
        self.delta_tensors = {
            op: {k.name: None for k in self.app.knobs} for op in self.app.ops
        }
        self.baseline_tensor = None

    @property
    def name(self) -> str:
        return "qos_p1"

    def measure_qos(self, with_approxes: KnobsT) -> float:
        assert self.baseline_tensor is not None
        with_approxes = self.app.add_baseline_to_knobs(with_approxes)
        delta_sum = sum(
            [self.delta_tensors[op][knob] for op, knob in with_approxes.items()]
        )
        ret = delta_sum + self.baseline_tensor
        return float(self.qos_metric(ret))

    def _init(self):
        if self.storage and self.storage.is_file():
            self.delta_tensors, self.baseline_tensor = torch.load(self.storage)
        dt = self.delta_tensors
        btensor = self.baseline_tensor = self.output_f({})
        updated = False
        for op, knob in barred_ravel_knobs(self.app):
            if dt[op][knob] is not None:
                continue
            updated = True
            delta_tensor = self.output_f({op: knob}) - btensor
            dt[op][knob] = delta_tensor
        if self.storage and updated:
            os.makedirs(self.storage.parent, exist_ok=True)
            torch.save((dt, btensor), self.storage)
        super()._init()
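
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# How `QoSModelP1` is typically wired up for a PyTorch classifier.
# `run_tensor_app` (a user function mapping a config to the stacked output
# logits over the tune set), `labels`, and the "p1_model.pkl" file name are all
# hypothetical; only their signatures matter.
def _sketch_qos_p1(
    app: ModeledApp,
    run_tensor_app: Callable[[KnobsT], torch.Tensor],
    labels: torch.Tensor,
) -> QoSModelP1:
    def accuracy(output: torch.Tensor) -> float:
        # Turn the output tensor into a scalar QoS: here, top-1 accuracy.
        return float((output.argmax(dim=1) == labels).float().mean())

    # `storage` caches the baseline tensor and the per-(op, knob) delta tensors
    # (via torch.save), so the profiling pass in `_init` only runs once.
    return QoSModelP1(app, run_tensor_app, accuracy, storage="p1_model.pkl")
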

class QoSModelP2(IQoSModel):
    """QoS model `P2` in ApproxTuner.

    :param app: The `ModeledApp` to predict QoS for.
    :param storage: A JSON file to store this model into, if the file doesn't exist,
        or load the model from if the file exists.
        If not given, the model will not be stored.
    """

    def __init__(self, app: ModeledApp, storage: PathLike = None) -> None:
        super().__init__()
        self.app = app
        self.storage = Path(storage) if storage else None
        self.qos_df = None
        self.baseline_qos = None

    @property
    def name(self) -> str:
        return "qos_p2"

    def _empirical_measure_qos(self, with_approxes: KnobsT) -> float:
        """An internal QoS-measuring method.

        The point is that P2 queries some QoS results and caches them before tuning
        starts, and then defines a `measure_qos` that doesn't run the application
        during tuning (to reduce overhead).
        """
        qos, _ = self.app.empirical_measure_qos_cost(with_approxes, False)
        return qos

    def measure_qos(self, with_approxes: KnobsT) -> float:
        assert self.baseline_qos is not None and self.qos_df is not None
        with_approxes = self.app.add_baseline_to_knobs(with_approxes)
        delta_qoses = (
            np.array([self.qos_df.loc[kv] for kv in with_approxes.items()])
            - self.baseline_qos
        )
        ret = delta_qoses.sum() + self.baseline_qos
        assert not np.isnan(ret)
        return float(ret)

    def _init(self):
        if self.storage and self.storage.is_file():
            self._load(self.storage)
        else:
            knob_names = [k.name for k in self.app.knobs]
            self.qos_df = pd.DataFrame(index=self.app.ops, columns=knob_names)
            self.qos_df = self.qos_df.where(pd.notnull(self.qos_df), None)
            self.baseline_qos = self._empirical_measure_qos({})
        df = self.qos_df
        for op, knob in barred_ravel_knobs(self.app):
            if df.loc[op, knob] is not None:
                continue
            df.loc[op, knob] = self._empirical_measure_qos({op: knob})
        if self.storage and not self.storage.is_file():
            self._save(self.storage)
        super()._init()

    def _load(self, path: Path):
        with path.open() as f:
            data = json.load(f)
        if "app_name" in data:
            name = data["app_name"]
            if self.app.name != name:
                msg_logger.error(
                    f'Profile at {path} belongs to app "{name}" '
                    f"while our app is {self.app.name}"
                )
        else:
            msg_logger.warning("Loaded profile does not have app name identifier")
        msg_logger.info(f"Model {self.name} loaded saved model at {path}")
        self.qos_df = pd.DataFrame(data["df"])
        self.baseline_qos = float(data["bqos"])

    def _save(self, path: Path):
        if not path.parent.is_dir():
            os.makedirs(path.parent)
        with path.open("w") as f:
            json.dump(
                {
                    "app_name": self.app.name,
                    "df": self.qos_df.to_dict(),
                    "bqos": self.baseline_qos,
                },
                f,
            )
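
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# `QoSModelP2` only needs the app itself, since it profiles through
# `empirical_measure_qos_cost`; the "p2_model.json" file name is hypothetical.
def _sketch_qos_p2(app: ModeledApp) -> QoSModelP2:
    # On first use, `_init` measures QoS once per (op, knob) pair and writes the
    # table to the JSON file; later runs load the file instead of re-profiling.
    return QoSModelP2(app, storage="p2_model.json")
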

class ValConfig(Config):
    """An `.approxapp.Config` that also optionally stores the "validation QoS".

    Validation QoS is the empirically measured QoS in the "validation phase"
    at the end of tuning (see `ApproxModeledTuner.tune`).

    :param qos: The (possibly predicted) QoS of this config. If tuning is done
        empirically, this is the empirically measured, not predicted, QoS.
        This is in contrast to `Config.qos`, which is always empirically measured
        on the tuning dataset.
    :param cost: The *relative* cost (time, energy, etc.) of this config
        compared to the baseline config. This is essentially :math:`1 / speedup`.
    :param knobs: The op-knob mapping in this configuration.
    :param test_qos: The empirically measured QoS of this config on the test dataset.
    :param validated_qos: The empirically measured QoS of this config on the tuning
        dataset, in the validation phase. See `ApproxModeledTuner.tune`.
    """

    def __init__(
        self,
        qos: float,
        cost: float,
        knobs: KnobsT,
        test_qos: Optional[float] = None,
        validated_qos: Optional[float] = None,
    ) -> None:
        super().__init__(qos, cost, knobs, test_qos)
        self.validated_qos = validated_qos

class ApproxModeledTuner(ApproxTuner):
    app: ModeledApp

    def tune(
        self,
        max_iter: int,
        qos_tuner_threshold: float,
        qos_keep_threshold: Optional[float] = None,
        is_threshold_relative: bool = False,
        take_best_n: Optional[int] = None,
        test_configs: bool = True,
        validate_configs: Optional[bool] = None,
        cost_model: Optional[str] = None,
        qos_model: Optional[str] = None,
    ) -> List[ValConfig]:
        """Runs a tuning session.

        :param max_iter: Number of iterations to use in tuning.
        :param qos_tuner_threshold: The QoS threshold that the tuner should aim for.
            QoS is assumed to be a higher-better quantity.
            This should be slightly tighter than `qos_keep_threshold`
            to account for extra error when running on the test dataset.
        :param qos_keep_threshold: The QoS threshold beyond which we will keep the
            configuration. By default it is equal to `qos_tuner_threshold`.
        :param is_threshold_relative: If True, the actual thresholds are considered
            to be ``baseline_qos - given_threshold``.
            This applies to `qos_tuner_threshold` and `qos_keep_threshold`.
        :param take_best_n: Take the best :math:`n` configurations after tuning.
            "Best" is defined as the configurations closest to the Pareto curve
            of the QoS-cost tradeoff space.
            If `take_best_n` is None, only the configurations strictly on the
            Pareto curve are taken.
        :param test_configs: If True, runs the configs on the test dataset,
            filters the taken configs by `qos_keep_threshold`,
            and fills the `test_qos` field of `ValConfig`.
        :param validate_configs: If True, runs a validation step that empirically
            measures the QoS of configs, filters the taken configs by
            `qos_keep_threshold`, and fills the `validated_qos` field of `ValConfig`.
        :param cost_model: The cost model to use for this tuning session.
        :param qos_model: The QoS model to use for this tuning session.
            This and `cost_model` are relayed down the line
            to `ModeledApp.measure_qos_cost`.
        """
        qos_desc = (
            "no model for qos" if qos_model is None else f'qos model "{qos_model}"'
        )
        cost_desc = (
            "no model for cost" if cost_model is None else f'cost model "{cost_model}"'
        )
        msg_logger.info("Starting tuning with %s and %s", qos_desc, cost_desc)
        if qos_model is not None:
            msg_logger.info("Initializing qos model %s", qos_model)
            self.app.init_model(qos_model)
        if cost_model is not None:
            msg_logger.info("Initializing cost model %s", cost_model)
            self.app.init_model(cost_model)
        super().tune(
            max_iter=max_iter,
            qos_tuner_threshold=qos_tuner_threshold,
            qos_keep_threshold=qos_keep_threshold,
            is_threshold_relative=is_threshold_relative,
            take_best_n=take_best_n,
            test_configs=False,  # Test configs below by ourselves
            app_kwargs={"cost_model": cost_model, "qos_model": qos_model},
        )
        if validate_configs is None and qos_model is not None:
            msg_logger.info(
                'Validating configurations due to using qos model "%s"', qos_model
            )
            self._update_configs_(self.best_configs_prefilter, False)
        elif validate_configs:
            msg_logger.info("Validating configurations as user requested")
            self._update_configs_(self.best_configs_prefilter, False)
        if test_configs:
            msg_logger.info("Calibrating configurations on test inputs")
            self._update_configs_(self.best_configs_prefilter, True)
        self.best_configs = self._filter_configs(self.best_configs_prefilter)
        return self.best_configs

    def _update_configs_(self, configs: List[ValConfig], test_mode: bool):
        from tqdm import tqdm

        if not configs:
            msg_logger.info("No configurations found.")
            return
        total_error = 0
        for cfg in tqdm(configs, leave=False):
            qos, _ = self.app.measure_qos_cost(cfg.knobs, test_mode)
            if test_mode:
                assert cfg.test_qos is None
                cfg.test_qos = qos
                msg_logger.debug(f"Test: {cfg.qos} (mean) -> {qos} (mean)")
            else:
                assert cfg.validated_qos is None
                cfg.validated_qos = qos
                msg_logger.debug(f"Validation: {cfg.qos} (mean) -> {qos} (mean)")
            total_error += abs(cfg.qos - qos)
        mean_err = total_error / len(configs)
        dataset_name = "test" if test_mode else "tune"
        msg_logger.info(
            "QoS changed by %f on %s dataset (mean abs diff)", mean_err, dataset_name
        )

    def _filter_configs(self, configs: List[ValConfig]):
        ret_configs = [
            cfg
            for cfg in configs
            if (not cfg.validated_qos or cfg.validated_qos >= self.tune_keep_threshold)
            and cfg.test_qos >= self.test_keep_threshold
        ]
        msg_logger.info(
            "%d of %d configs remain after validation and testing",
            len(ret_configs),
            len(configs),
        )
        return ret_configs

    def plot_configs(
        self,
        show_qos_loss: bool = False,
        connect_best_points: bool = False,
    ) -> plt.Figure:
        """Plots 1 to 3 QoS-vs-speedup scatter plots of configurations.

        All kept configurations and all "best" configurations (before test-set
        filtering, if any) are always plotted in the first subplot.

        If there was a validation phase during tuning, the second subplot contains
        the "best" configurations plotted twice, with predicted and empirically
        measured QoS (on the tune set) respectively.

        If both validation and test-set filtering were used, the last subplot
        contains the "best" configurations with *empirically measured* tune-set
        and test-set QoS loss respectively.

        :param show_qos_loss: If True, uses the loss of QoS (compared to the baseline)
            instead of the absolute QoS in the first 2 graphs.
            *This does not apply to the third graph* if it exists,
            which always uses QoS loss for ease of comparison.
        :param connect_best_points: If True, connects the plotted points of the
            "best" configurations with a line instead of drawing a pure scatter plot.
        """
        if not self.tuned:
            raise RuntimeError(
                "No tuning session has been run; call self.tune() first."
            )
        dot_format = "-o" if connect_best_points else "o"
        # Without the `ax` argument, these calls only return whether the
        # second/third plot can be drawn.
        # plot_test_phase returning True implies plot_validation_phase returns True.
        val_phase = self.plot_validation_phase()
        test_phase = self.plot_test_phase()
        n_subplots = 1 + int(val_phase) + int(test_phase)
        fig, axes = plt.subplots(
            1, n_subplots, squeeze=False, figsize=(6 + 4 * n_subplots, 6), dpi=300
        )
        i = 1
        self.plot_kept_and_best(axes[0, 0], show_qos_loss)
        if val_phase:
            ax = axes[0, i]
            self.plot_validation_phase(ax, show_qos_loss, dot_format)
            i += 1
        if test_phase:
            ax = axes[0, i]
            tuneset_key = "validated_qos" if val_phase else "qos"
            self.plot_test_phase(ax, dot_format, tuneset_key)
            i += 1
        fig.tight_layout()
        return fig

    def plot_validation_phase(
        self, ax: plt.Axes = None, show_qos_loss: bool = False, dot_format: str = "o"
    ):
        configs = self.best_configs_prefilter
        validated = [conf.validated_qos is not None for conf in configs]
        can_plot = all(validated)
        if not ax:
            return can_plot
        assert can_plot
        pred_x, pred_y = self._config_qos_speedups(configs, "qos", show_qos_loss, False)
        measured_x, measured_y = self._config_qos_speedups(
            configs, "validated_qos", show_qos_loss, False
        )
        ax.plot(pred_x, pred_y, dot_format, label="Predicted QoS")
        ax.plot(measured_x, measured_y, dot_format, label="Validated QoS")
        self._set_xy_limit(ax, show_qos_loss)
        if show_qos_loss:
            ax.set_xlabel("QoS Loss (tune dataset)")
            rthres = self.baseline_tune_qos - self.tune_keep_threshold
            self._draw_qos_line(ax, rthres, f"Relative threshold: {rthres:.2f}")
        else:
            ax.set_xlabel("QoS (tune dataset)")
        ax.set_ylabel("Speedup (x)")
        ax.legend()

    @classmethod
    def _get_config_class(cls) -> Type[Config]:
        return ValConfig
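
# --- Illustrative sketch (documentation aid, not part of predtuner) ----------
# End-to-end use of `ApproxModeledTuner`: obtain it from a `ModeledApp`, tune
# against the models registered in `get_models`, then plot the results.
# The iteration count, thresholds, and output file name are hypothetical;
# "cost_linear" and "qos_p2" are the `name`s of `LinearCostModel` and
# `QoSModelP2` defined above.
def _sketch_tuning_session(app: ModeledApp) -> List[ValConfig]:
    tuner = app.get_tuner()
    configs = tuner.tune(
        max_iter=1000,
        qos_tuner_threshold=2.1,  # tuner aims for QoS >= baseline_qos - 2.1 ...
        qos_keep_threshold=3.0,  # ... and keeps configs with QoS >= baseline_qos - 3.0
        is_threshold_relative=True,  # both thresholds are relative to baseline QoS
        take_best_n=20,
        cost_model="cost_linear",
        qos_model="qos_p2",
    )
    fig = tuner.plot_configs(show_qos_loss=True)
    fig.savefig("configs.png", dpi=300)
    return configs
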

def barred_ravel_knobs(app: ApproxApp) -> Iterator[Tuple[str, str]]:
    """Flattens op_knobs of the app into a stream of (layer, knob) pairs
    while showing 2 levels of progress bars."""

    from tqdm import tqdm

    bar1 = tqdm(app.op_knobs.items(), leave=None)
    for op_name, knobs in bar1:
        bar1.set_postfix(op=op_name)
        bar2 = tqdm(knobs, leave=None)
        for knob in bar2:
            bar2.set_postfix(knob=knob.name)
            yield op_name, knob.name