Source code for clustering_metrics.skutils

"""
This module contains code pasted verbatim from scikit-learn
so that this package does not need to depend on scikit-learn.
"""

import numpy as np
import scipy.sparse as sp
import warnings
from .fixes import array_equal


class UndefinedMetricWarning(UserWarning):
    """Warning category emitted when a metric cannot be meaningfully computed.

    Within this module it is issued by ``roc_curve`` when ``y_true`` holds no
    negative samples or no positive samples.
    """
class DataConversionWarning(UserWarning):
    """Warning category signalling that input data was implicitly converted.

    Within this module it is issued by ``column_or_1d`` when a column vector
    is flattened to one dimension and ``warn=True`` was requested.
    """
# Module-level side effect: install an "always" filter so that every
# DataConversionWarning is reported, instead of being subject to Python's
# default handling, which shows a given warning only once per code location.
warnings.simplefilter("always", DataConversionWarning)
def stable_cumsum(arr, rtol=1e-05, atol=1e-08):
    """Cumulatively sum ``arr`` in float64 and validate the final value.

    The cumulative sum is accumulated with ``dtype=np.float64`` and its last
    element is compared against ``np.sum`` of the same data.

    Parameters
    ----------
    arr : array-like
        Values to be cumulatively summed; the input is treated as flat.
    rtol : float
        Relative tolerance, see ``np.allclose``.
    atol : float
        Absolute tolerance, see ``np.allclose``.

    Returns
    -------
    out : ndarray of float64
        The flat cumulative sum. An empty input yields an empty array.

    Raises
    ------
    RuntimeError
        If the last element of the cumulative sum does not match the total
        sum within the given tolerances.
    """
    out = np.cumsum(arr, dtype=np.float64)
    if out.size == 0:
        # Nothing was accumulated, so there is no final element to validate;
        # indexing ``out[-1]`` below would raise IndexError.
        return out

    expected = np.sum(arr, dtype=np.float64)
    if not np.allclose(out[-1], expected, rtol=rtol, atol=atol):
        raise RuntimeError('cumsum was found to be unstable: '
                           'its last element does not correspond to sum')
    return out
def _assert_all_finite(X):
    """Raise ValueError if a float array holds NaN or infinity (ndarray only).

    Non-floating dtypes are accepted without inspection.
    """
    data = np.asanyarray(X)

    # Only floating-point dtypes are checked here.
    if data.dtype.char not in np.typecodes['AllFloat']:
        return

    # Cheap check first: a finite total (O(n) time, O(1) extra space) means
    # every element is finite, which is the common case.
    if np.isfinite(data.sum()):
        return

    # The total may be non-finite merely because the summation overflowed,
    # so confirm element-wise (O(n) space) before reporting a problem.
    if np.isfinite(data).all():
        return

    raise ValueError("Input contains NaN, infinity"
                     " or a value too large for %r." % data.dtype)
def assert_all_finite(X):
    """Raise a ValueError if X contains NaN or infinity.

    Input MUST be an np.ndarray instance or a scipy.sparse matrix. For a
    sparse matrix only the explicitly stored values (``X.data``) are checked.
    """
    if sp.issparse(X):
        values = X.data
    else:
        values = X
    _assert_all_finite(values)
def _num_samples(x):
    """Return the number of samples (size of the first axis) of array-like x.

    Raises
    ------
    TypeError
        If ``x`` looks like an estimator (has a ``fit`` attribute), is neither
        sized nor convertible to an array, or is a zero-dimensional array.
    """
    if hasattr(x, 'fit'):
        # Don't get num_samples from an ensembles length!
        raise TypeError('Expected sequence or array-like, got '
                        'estimator %s' % x)

    has_extent = hasattr(x, '__len__') or hasattr(x, 'shape')
    if not has_extent:
        if not hasattr(x, '__array__'):
            raise TypeError("Expected sequence or array-like, got %s" %
                            type(x))
        # Objects exposing only the array protocol are materialised first.
        x = np.asarray(x)

    if not hasattr(x, 'shape'):
        return len(x)

    dims = x.shape
    if len(dims) == 0:
        raise TypeError("Singleton array %r cannot be considered"
                        " a valid collection." % x)
    return dims[0]
def check_consistent_length(*arrays):
    """Check that all arrays have consistent first dimensions.

    The number of samples of every non-``None`` argument is collected and the
    check fails if more than one distinct value is found.

    Parameters
    ----------
    *arrays : list or tuple of input objects.
        Objects that will be checked for consistent length. ``None`` entries
        are skipped.

    Raises
    ------
    ValueError
        If the objects do not all have the same number of samples.
    """
    sample_counts = []
    for candidate in arrays:
        if candidate is None:
            continue
        sample_counts.append(_num_samples(candidate))

    distinct = np.unique(sample_counts)
    if len(distinct) > 1:
        raise ValueError("Found arrays with inconsistent numbers of samples: "
                         "%s" % str(distinct))
def column_or_1d(y, warn=False):
    """Flatten a 1d array or a single-column 2d array; reject anything else.

    Parameters
    ----------
    y : array-like
        Input of shape ``(n,)`` or ``(n, 1)``.

    warn : boolean, default False
        If True, emit a ``DataConversionWarning`` when a column vector of
        shape ``(n, 1)`` is flattened.

    Returns
    -------
    y : array
        The raveled, one-dimensional data.

    Raises
    ------
    ValueError
        If ``y`` is neither one-dimensional nor a single-column 2d array.
    """
    dims = np.shape(y)
    is_flat = len(dims) == 1
    is_column = len(dims) == 2 and dims[1] == 1

    if not (is_flat or is_column):
        raise ValueError("bad input shape {0}".format(dims))

    if is_column and warn:
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn("A column-vector y was passed when a 1d array was"
                      " expected. Please change the shape of y to "
                      "(n_samples, ), for example using ravel().",
                      DataConversionWarning, stacklevel=2)

    return np.ravel(y)
def auc(x, y, reorder=False):
    """Compute Area Under the Curve (AUC) using the trapezoidal rule

    This is a general function, given points on a curve.

    Parameters
    ----------
    x : array, shape = [n]
        x coordinates.

    y : array, shape = [n]
        y coordinates.

    reorder : boolean, optional (default=False)
        If True, the points are sorted by ``x`` with ties broken by ``y``
        before integrating, i.e. the curve is assumed to be ascending in the
        case of ties, as for an ROC curve. If the curve is non-ascending, the
        result will be wrong.

    Returns
    -------
    auc : float

    Raises
    ------
    ValueError
        If fewer than two points are given, or if ``reorder`` is False and
        ``x`` is neither non-decreasing nor non-increasing.

    Examples
    --------
    >>> import numpy as np
    >>> y = np.array([1, 1, 2, 2])
    >>> pred = np.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = roc_curve(y, pred, pos_label=2)
    >>> auc(fpr, tpr)
    0.75

    See also
    --------
    roc_curve : Compute the points of the ROC curve
    """
    check_consistent_length(x, y)
    x = column_or_1d(x)
    y = column_or_1d(y)

    if x.shape[0] < 2:
        raise ValueError('At least 2 points are needed to compute'
                         ' area under curve, but x.shape = %s' % x.shape)

    direction = 1
    if reorder:
        # reorder the data points according to the x axis and using y to
        # break ties
        order = np.lexsort((y, x))
        x, y = x[order], y[order]
    else:
        dx = np.diff(x)
        if np.any(dx < 0):
            if np.all(dx <= 0):
                # x runs right-to-left: integrate and flip the sign.
                direction = -1
            else:
                raise ValueError("Reordering is not turned on, and "
                                 "the x array is not increasing: %s" % x)

    # NumPy 2.0 renamed ``np.trapz`` to ``np.trapezoid`` and deprecated the
    # old name; prefer the new one and fall back on older NumPy releases.
    integrate = getattr(np, "trapezoid", None)
    if integrate is None:
        integrate = np.trapz

    area = direction * integrate(y, x)
    if isinstance(area, np.memmap):
        # Reductions such as .sum used internally in the trapezoidal rule do
        # not return a scalar by default for numpy.memmap instances contrary
        # to regular numpy.ndarray instances.
        area = area.dtype.type(area)
    return area
def _binary_clf_curve(y_true, y_score, pos_label=None, sample_weight=None):
    """Calculate true and false positives per binary classification threshold.

    Parameters
    ----------
    y_true : array, shape = [n_samples]
        True targets of binary classification

    y_score : array, shape = [n_samples]
        Estimated probabilities or decision function

    pos_label : int, optional (default=None)
        The label of the positive class. When omitted, the labels must form
        one of {0, 1}, {-1, 1}, {0}, {-1} or {1}, and 1 is taken as positive.

    sample_weight : array-like of shape = [n_samples], optional
        Sample weights.

    Returns
    -------
    fps : array, shape = [n_thresholds]
        A count of false positives, at index i being the number of negative
        samples assigned a score >= thresholds[i]. The total number of
        negative samples is equal to fps[-1] (thus true negatives are given by
        fps[-1] - fps).

    tps : array, shape = [n_thresholds <= len(np.unique(y_score))]
        An increasing count of true positives, at index i being the number
        of positive samples assigned a score >= thresholds[i]. The total
        number of positive samples is equal to tps[-1] (thus false negatives
        are given by tps[-1] - tps).

    thresholds : array, shape = [n_thresholds]
        Decreasing score values.

    Raises
    ------
    ValueError
        If the inputs (including ``sample_weight`` when given) have
        inconsistent lengths, or if ``pos_label`` is not specified and the
        labels are not one of the accepted binary label sets.
    """
    # sample_weight takes part in the length check (None is skipped by
    # check_consistent_length): a weight vector that is too short would
    # otherwise fail later with an IndexError, and one that is too long would
    # be silently truncated by the fancy indexing below.
    check_consistent_length(y_true, y_score, sample_weight)
    y_true = column_or_1d(y_true)
    y_score = column_or_1d(y_score)
    assert_all_finite(y_true)
    assert_all_finite(y_score)

    if sample_weight is not None:
        sample_weight = column_or_1d(sample_weight)

    # ensure binary classification if pos_label is not specified
    classes = np.unique(y_true)
    if (pos_label is None and
        not (array_equal(classes, [0, 1]) or
             array_equal(classes, [-1, 1]) or
             array_equal(classes, [0]) or
             array_equal(classes, [-1]) or
             array_equal(classes, [1]))):
        raise ValueError("Data is not binary and pos_label is not specified")
    elif pos_label is None:
        pos_label = 1.

    # make y_true a boolean vector
    y_true = (y_true == pos_label)

    # sort scores and corresponding truth values; mergesort is stable, so
    # tied scores keep a deterministic relative order
    desc_score_indices = np.argsort(y_score, kind="mergesort")[::-1]
    y_score = y_score[desc_score_indices]
    y_true = y_true[desc_score_indices]
    if sample_weight is not None:
        weight = sample_weight[desc_score_indices]
    else:
        weight = 1.

    # y_score typically has many tied values. Here we extract
    # the indices associated with the distinct values. We also
    # concatenate a value for the end of the curve.
    distinct_value_indices = np.where(np.diff(y_score))[0]
    threshold_idxs = np.r_[distinct_value_indices, y_true.size - 1]

    # accumulate the true positives with decreasing threshold
    tps = stable_cumsum(y_true * weight)[threshold_idxs]
    if sample_weight is not None:
        # total weight seen so far minus the positive weight
        fps = stable_cumsum(weight)[threshold_idxs] - tps
    else:
        # unweighted: number of samples seen so far minus the positives
        fps = 1 + threshold_idxs - tps
    return fps, tps, y_score[threshold_idxs]
def roc_curve(y_true, y_score, pos_label=None, sample_weight=None,
              drop_intermediate=True):
    """Compute Receiver operating characteristic (ROC)

    Note: this implementation is restricted to the binary classification task.

    Parameters
    ----------
    y_true : array, shape = [n_samples]
        True binary labels in range {0, 1} or {-1, 1}. If labels are not
        binary, pos_label should be explicitly given.

    y_score : array, shape = [n_samples]
        Target scores, can either be probability estimates of the positive
        class, confidence values, or non-thresholded measure of decisions
        (as returned by "decision_function" on some classifiers).

    pos_label : int
        Label considered as positive and others are considered negative.

    sample_weight : array-like of shape = [n_samples], optional
        Sample weights.

    drop_intermediate : boolean, optional (default=True)
        Whether to drop some suboptimal thresholds which would not appear
        on a plotted ROC curve. This is useful in order to create lighter
        ROC curves.

    Returns
    -------
    fpr : array, shape = [>2]
        Increasing false positive rates such that element i is the false
        positive rate of predictions with score >= thresholds[i]. Filled
        with NaN (and an ``UndefinedMetricWarning`` is issued) when there
        are no negative samples.

    tpr : array, shape = [>2]
        Increasing true positive rates such that element i is the true
        positive rate of predictions with score >= thresholds[i]. Filled
        with NaN (and an ``UndefinedMetricWarning`` is issued) when there
        are no positive samples.

    thresholds : array, shape = [n_thresholds]
        Decreasing thresholds on the decision function used to compute
        fpr and tpr. When a starting point is prepended, `thresholds[0]`
        represents no instances being predicted and is arbitrarily set to
        `max(y_score) + 1`.

    References
    ----------
    .. [1] `Wikipedia entry for the Receiver operating characteristic
            <https://en.wikipedia.org/wiki/Receiver_operating_characteristic>`_

    Examples
    --------
    >>> import numpy as np
    >>> y = np.array([1, 1, 2, 2])
    >>> scores = np.array([0.1, 0.4, 0.35, 0.8])
    >>> fpr, tpr, thresholds = roc_curve(y, scores, pos_label=2)
    >>> fpr
    array([ 0. ,  0.5,  0.5,  1. ])
    >>> tpr
    array([ 0.5,  0.5,  1. ,  1. ])
    >>> thresholds
    array([ 0.8 ,  0.4 ,  0.35,  0.1 ])
    """
    false_pos, true_pos, cutoffs = _binary_clf_curve(
        y_true, y_score, pos_label=pos_label, sample_weight=sample_weight)

    # Thin out thresholds whose points lie between, and collinear with, their
    # neighbours: they never show up on a plotted ROC curve and so leave the
    # AUC untouched. A second-order difference acts as a "second derivative"
    # that is non-zero exactly where the curve bends. Both counts have to be
    # inspected because _binary_clf_curve merges tied scores into a single
    # threshold. Some droppable points survive (e.g. fps = [1, 3, 7],
    # tps = [1, 2, 4]), which is harmless.
    if drop_intermediate and len(false_pos) > 2:
        fp_bend = np.diff(false_pos, 2)
        tp_bend = np.diff(true_pos, 2)
        is_corner = np.logical_or(fp_bend, tp_bend)
        # The two end points are always kept.
        keep = np.where(np.r_[True, is_corner, True])[0]
        false_pos = false_pos[keep]
        true_pos = true_pos[keep]
        cutoffs = cutoffs[keep]

    if true_pos.size == 0 or false_pos[0] != 0:
        # Prepend a starting point with zero counts, using a threshold one
        # above the current first threshold.
        true_pos = np.r_[0, true_pos]
        false_pos = np.r_[0, false_pos]
        cutoffs = np.r_[cutoffs[0] + 1, cutoffs]

    total_negatives = false_pos[-1]
    if total_negatives <= 0:
        warnings.warn("No negative samples in y_true, "
                      "false positive value should be meaningless",
                      UndefinedMetricWarning)
        fpr = np.repeat(np.nan, false_pos.shape)
    else:
        fpr = false_pos / total_negatives

    total_positives = true_pos[-1]
    if total_positives <= 0:
        warnings.warn("No positive samples in y_true, "
                      "true positive value should be meaningless",
                      UndefinedMetricWarning)
        tpr = np.repeat(np.nan, true_pos.shape)
    else:
        tpr = true_pos / total_positives

    return fpr, tpr, cutoffs