Source code for orion.benchmark

# -*- coding: utf-8 -*-

import argparse
import ast
import concurrent
import json
import logging
import os
import pickle
import uuid
import warnings
from copy import deepcopy
from datetime import datetime
from functools import partial
from glob import glob
from pathlib import Path

import numpy as np
import pandas as pd
import tqdm
from scipy import signal as scipy_signal

from orion.analysis import _load_pipeline, analyze
from orion.data import load_anomalies, load_signal
from orion.evaluation import CONTEXTUAL_METRICS as METRICS
from orion.evaluation import contextual_confusion_matrix
from orion.progress import TqdmLogger, progress

warnings.simplefilter('ignore')

LOGGER = logging.getLogger(__name__)

BUCKET = 'sintel-orion'
S3_URL = 'https://{}.s3.amazonaws.com/{}'

BENCHMARK_PATH = os.path.join(os.path.join(
    os.path.dirname(os.path.abspath(__file__)), '..'),
    'benchmark'
)

BENCHMARK_DATA = pd.read_csv(S3_URL.format(
    BUCKET, 'datasets.csv'), index_col=0, header=None).applymap(ast.literal_eval).to_dict()[1]
BENCHMARK_PARAMS = pd.read_csv(S3_URL.format(
    BUCKET, 'parameters.csv'), index_col=0, header=None).applymap(ast.literal_eval).to_dict()[1]

PIPELINE_DIR = os.path.join(os.path.dirname(__file__), 'pipelines', 'verified')

VERIFIED_PIPELINES = [
    'arima', 'lstm_dynamic_threshold', 'azure', 'tadgan',
    'aer', 'lstm_autoencoder', 'dense_autoencoder', 'vae'
]

VERIFIED_PIPELINES_GPU = {
    'arima': 'arima',
    'lstm_dynamic_threshold': 'lstm_dynamic_threshold',
    'azure': 'azure',
    'tadgan': 'tadgan_without_dropout_gpu',
    'aer': 'aer',
    'lstm_autoencoder': 'lstm_autoencoder',
    'dense_autoencoder': 'dense_autoencoder',
    'vae': 'vae'
}


def _load_signal(signal, test_split):
    if isinstance(test_split, float):
        train, test = load_signal(signal, test_size=test_split)
    elif test_split:
        train = load_signal(signal + '-train')
        test = load_signal(signal + '-test')
    else:
        train = test = load_signal(signal)

    return train, test


def _detrend_signal(df, value_column):
    df[value_column] = scipy_signal.detrend(df[value_column])
    return df


def _get_pipeline_hyperparameter(hyperparameters, dataset_name, pipeline_name):
    hyperparameters_ = deepcopy(hyperparameters)

    if hyperparameters:
        hyperparameters_ = hyperparameters_.get(dataset_name) or hyperparameters_
        hyperparameters_ = hyperparameters_.get(pipeline_name) or hyperparameters_

    if hyperparameters_ is None and dataset_name and pipeline_name:
        file_path = os.path.join(
            PIPELINE_DIR, pipeline_name, pipeline_name + '_' + dataset_name.lower() + '.json')
        if os.path.exists(file_path):
            hyperparameters_ = file_path

    if isinstance(hyperparameters_, str) and os.path.exists(hyperparameters_):
        with open(hyperparameters_) as f:
            hyperparameters_ = json.load(f)

    return hyperparameters_


def _parse_confusion_matrix(scores, truth):
    columns = ["tn", "fp", "fn", "tp"]
    metric_ = 'confusion_matrix'

    values = scores[metric_]
    if values == 0:
        fn = len(truth)
        values = (None, 0, fn, 0)  # (tn, fp, fn, tp)

    # formating output
    for metric_name, score in zip(columns, list(values)):
        scores[metric_name] = score

    del scores[metric_]


def _sort_leaderboard(df, rank, metrics):
    if rank not in df.columns:
        rank_ = list(metrics.keys())[0]
        LOGGER.exception("Rank %s is not in %s, using %s instead.",
                         rank, df.columns, rank_)
        rank = rank_

    df.sort_values(rank, ascending=False, inplace=True)
    df.reset_index(drop=True, inplace=True)
    df.index.name = 'rank'
    df.reset_index(drop=False, inplace=True)
    df['rank'] += 1

    pipeline_col = df.pop('pipeline')
    df.insert(0, 'pipeline', pipeline_col)
    return df


def _evaluate_signal(pipeline, signal, hyperparameter, metrics, test_split=False,
                     detrend=False, pipeline_path=None, anomaly_path=None):

    train, test = _load_signal(signal, test_split)
    truth = load_anomalies(signal)

    if detrend:
        train = _detrend_signal(train, 'value')
        test = _detrend_signal(test, 'value')

    try:
        LOGGER.info("Scoring pipeline %s on signal %s (test split: %s)",
                    pipeline, signal, test_split)

        start = datetime.utcnow()
        pipeline = _load_pipeline(pipeline, hyperparameter)
        anomalies = analyze(pipeline, train, test)
        elapsed = datetime.utcnow() - start

        scores = {
            name: scorer(truth, anomalies, test)
            for name, scorer in metrics.items()
        }

        status = 'OK'

    except Exception as ex:
        LOGGER.exception("Exception scoring pipeline %s on signal %s (test split: %s), error %s.",
                         pipeline, signal, test_split, ex)

        elapsed = datetime.utcnow() - start
        anomalies = pd.DataFrame([], columns=['start', 'end', 'score'])
        scores = {
            name: 0 for name in metrics.keys()
        }

        status = 'ERROR'

    if 'confusion_matrix' in metrics.keys():
        _parse_confusion_matrix(scores, truth)

    scores['status'] = status
    scores['elapsed'] = elapsed.total_seconds()
    scores['split'] = test_split

    if pipeline_path:
        with open(pipeline_path, 'wb') as f:
            pickle.dump(pipeline, f)

    if anomaly_path:
        anomalies.to_csv(anomaly_path, index=False)

    return scores


def _run_job(args):
    # Reset random seed
    np.random.seed()

    (pipeline, pipeline_name, dataset, signal, hyperparameter, metrics, test_split, detrend,
        iteration, cache_dir, pipeline_dir, anomaly_dir, run_id) = args

    pipeline_path = pipeline_dir
    if pipeline_dir:
        base_path = str(pipeline_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}')
        pipeline_path = base_path + '_pipeline.pkl'

    anomaly_path = anomaly_dir
    if anomaly_dir:
        base_path = str(anomaly_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}')
        anomaly_path = base_path + '_anomalies.csv'

    LOGGER.info('Evaluating pipeline %s on signal %s dataset %s (test split: %s); iteration %s',
                pipeline_name, signal, dataset, test_split, iteration)

    output = _evaluate_signal(
        pipeline,
        signal,
        hyperparameter,
        metrics,
        test_split,
        detrend,
        pipeline_path,
        anomaly_path
    )
    scores = pd.DataFrame.from_records([output], columns=output.keys())

    scores.insert(0, 'dataset', dataset)
    scores.insert(1, 'pipeline', pipeline_name)
    scores.insert(2, 'signal', signal)
    scores.insert(3, 'iteration', iteration)
    scores['run_id'] = run_id

    if cache_dir:
        base_path = str(cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}_{run_id}')
        scores.to_csv(base_path + '_scores.csv', index=False)

    return scores


def _run_on_dask(jobs, verbose):
    """Run the tasks in parallel using dask."""
    try:
        import dask
    except ImportError as ie:
        ie.msg += (
            '\n\nIt seems like `dask` is not installed.\n'
            'Please install `dask` and `distributed` using:\n'
            '\n    pip install dask distributed'
        )
        raise

    scorer = dask.delayed(_run_job)
    persisted = dask.persist(*[scorer(args) for args in jobs])
    if verbose:
        try:
            progress(persisted)
        except ValueError:
            pass

    return dask.compute(*persisted)


[docs]def benchmark(pipelines=None, datasets=None, hyperparameters=None, metrics=METRICS, rank='f1', test_split=False, detrend=False, iterations=1, workers=1, show_progress=False, cache_dir=None, resume=False, output_path=None, pipeline_dir=None, anomaly_dir=None): """Run pipelines on the given datasets and evaluate the performance. The pipelines are used to analyze the given signals and later on the detected anomalies are scored against the known anomalies using the indicated metrics. Finally, the scores obtained with each metric are averaged accross all the signals, ranked by the indicated metric and returned on a ``pandas.DataFrame``. Args: pipelines (dict or list): dictionary with pipeline names as keys and their JSON paths as values. If a list is given, it should be of JSON paths, and the paths themselves will be used as names. If not give, all verified pipelines will be used for evaluation. datasets (dict or list): dictionary of dataset name as keys and list of signals as values. If a list is given then it will be under a generic name ``dataset``. If not given, all benchmark datasets will be used used. hyperparameters (dict or list): dictionary with pipeline names as keys and their hyperparameter JSON paths or dictionaries as values. If a list is given, it should be of corresponding order to pipelines. metrics (dict or list): dictionary with metric names as keys and scoring functions as values. If a list is given, it should be of scoring functions, and they ``__name__`` value will be used as the metric name. If not given, all the available metrics will be used. rank (str): Sort and rank the pipelines based on the given metric. If not given, rank using the first metric. test_split (bool or float): Whether to use the prespecified train-test split. If float, then it should be between 0.0 and 1.0 and represent the proportion of the signal to include in the test split. If not given, use ``False``. detrend (bool): Whether to use ``scipy.detrend``. If not given, use ``False``. iterations (int): Number of iterations to perform over each signal and pipeline. Defaults to 1. workers (int or str): If ``workers`` is given as an integer value other than 0 or 1, a multiprocessing Pool is used to distribute the computation across the indicated number of workers. If the string ``dask`` is given, the computation is distributed using ``dask``. In this case, setting up the ``dask`` cluster and client is expected to be handled outside of this function. show_progress (bool): Whether to use tqdm to keep track of the progress. Defaults to ``True``. cache_dir (str): If a ``cache_dir`` is given, intermediate results are stored in the indicated directory as CSV files as they get computted. This allows inspecting results while the benchmark is still running and also recovering results in case the process does not finish properly. Defaults to ``None``. resume (bool): Whether to continue running the experiments in the benchmark from the current progress in ``cache_dir``. output_path (str): Location to save the intermediatry results. If not given, intermediatry results will not be saved. pipeline_dir (str): If a ``pipeline_dir`` is given, pipelines will get dumped in the specificed directory as pickle files. Defaults to ``None``. anomaly_dir (str): If a ``anomaly_dir`` is given, detected anomalies will get dumped in the specificed directory as csv files. Defaults to ``None``. Returns: pandas.DataFrame: A table containing the scores obtained with each scoring function accross all the signals for each pipeline. """ pipelines = pipelines or VERIFIED_PIPELINES datasets = datasets or BENCHMARK_DATA run_id = os.getenv('RUN_ID') or str(uuid.uuid4())[:10] if isinstance(pipelines, list): pipelines = {pipeline: pipeline for pipeline in pipelines} if isinstance(datasets, list): datasets = {'dataset': datasets} if isinstance(hyperparameters, list): hyperparameters = {pipeline: hyperparameter for pipeline, hyperparameter in zip(pipelines.keys(), hyperparameters)} if isinstance(metrics, list): metrics_ = dict() for metric in metrics: if callable(metric): metrics_[metric.__name__] = metric elif metric in METRICS: metrics_[metric] = METRICS[metric] else: raise ValueError('Unknown metric: {}'.format(metric)) metrics = metrics_ if cache_dir: cache_dir = Path(cache_dir) os.makedirs(cache_dir, exist_ok=True) if pipeline_dir: pipeline_dir = Path(pipeline_dir) os.makedirs(pipeline_dir, exist_ok=True) if anomaly_dir: anomaly_dir = Path(anomaly_dir) os.makedirs(anomaly_dir, exist_ok=True) jobs = list() for dataset, signals in datasets.items(): for pipeline_name, pipeline in pipelines.items(): hyperparameter = _get_pipeline_hyperparameter(hyperparameters, dataset, pipeline_name) parameters = BENCHMARK_PARAMS.get(dataset) if parameters is not None: detrend, test_split = parameters.values() for signal in signals: for iteration in range(iterations): if resume: experiment = str( cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}' ) if len(glob(experiment + '*.csv')) > 0: LOGGER.warning(f'skipping {experiment}') continue args = ( pipeline, pipeline_name, dataset, signal, hyperparameter, metrics, test_split, detrend, iteration, cache_dir, pipeline_dir, anomaly_dir, run_id, ) jobs.append(args) if workers == 'dask': scores = _run_on_dask(jobs, show_progress) else: if workers in (0, 1): scores = map(_run_job, jobs) else: pool = concurrent.futures.ProcessPoolExecutor(workers) scores = pool.map(_run_job, jobs) scores = tqdm.tqdm(scores, total=len(jobs), file=TqdmLogger()) if show_progress: scores = tqdm.tqdm(scores, total=len(jobs)) if scores: scores = pd.concat(scores) if output_path: LOGGER.info('Saving benchmark report to %s', output_path) scores.to_csv(output_path, index=False) return _sort_leaderboard(scores, rank, metrics) LOGGER.info('No scores to be recorded.') return pd.DataFrame()
def main(pipelines, datasets, resume, workers, output_path, cache_dir, pipeline_dir, anomaly_dir, **kwargs): # output path output_path = os.path.join(BENCHMARK_PATH, 'results', output_path) # metrics del METRICS['accuracy'] METRICS['confusion_matrix'] = contextual_confusion_matrix metrics = {k: partial(fun, weighted=False) for k, fun in METRICS.items()} results = benchmark( pipelines=pipelines, datasets=datasets, metrics=metrics, output_path=output_path, workers=workers, resume=resume, pipeline_dir=pipeline_dir, cache_dir=cache_dir, anomaly_dir=anomaly_dir ) return results if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-p', '--pipelines', nargs='+', type=str, default=VERIFIED_PIPELINES_GPU) parser.add_argument('-d', '--datasets', nargs='+', type=str, default=BENCHMARK_DATA) parser.add_argument('-g', '--gpu', type=bool, default=False) parser.add_argument('-r', '--resume', type=bool, default=False) parser.add_argument('-w', '--workers', default=1) parser.add_argument('-o', '--output_path', type=str, default='results.csv') parser.add_argument('-c', '--cache_dir', type=str, default='cache') parser.add_argument('-pd', '--pipeline_dir', type=str, default='pipeline_dir') parser.add_argument('-ad', '--anomaly_dir', type=str, default='anomaly_dir') config = parser.parse_args() if config.gpu: config.pipelines = dict(zip(config.pipelines, config.pipelines)) if 'tadgan' in config.pipelines.keys(): config.pipelines['tadgan'] = 'tadgan_without_dropout_gpu' if any([dataset in BENCHMARK_DATA.keys() for dataset in config.datasets]): config.datasets = dict((dataset, BENCHMARK_DATA[dataset]) for dataset in config.datasets) results = main(**vars(config))