Skip to content

INNE

ikpykit.anomaly.INNE

INNE(
    n_estimators=200,
    max_samples="auto",
    contamination="auto",
    random_state=None,
)

Bases: OutlierMixin, BaseEstimator

Isolation-based anomaly detection using nearest-neighbor ensembles.

The INNE algorithm uses the nearest neighbour ensemble to isolate anomalies. It partitions the data space into regions using a subsample and determines an isolation score for each region. As each region adapts to local distribution, the calculated isolation score is a local measure that is relative to the local neighbourhood, enabling it to detect both global and local anomalies. INNE has linear time complexity to efficiently handle large and high-dimensional datasets with complex distributions.

Parameters:

Name Type Description Default
n_estimators int

The number of base estimators in the ensemble.

200
max_samples int

The number of samples to draw from X to train each base estimator.

- If int, then draw `max_samples` samples.
- If float, then draw `max_samples` * X.shape[0]` samples.
- If "auto", then `max_samples=min(8, n_samples)`.
"auto"
contamination auto or float

The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the scores of the samples.

- If "auto", the threshold is determined as in the original paper.
- If float, the contamination should be in the range (0, 0.5].
"auto"
random_state int, RandomState instance or None

Controls the pseudo-randomness of the selection of the feature and split values for each branching step and each tree in the forest.

Pass an int for reproducible results across multiple function calls. See :term:Glossary <random_state>.

None
References

.. [1] T. R. Bandaragoda, K. Ming Ting, D. Albrecht, F. T. Liu, Y. Zhu, and J. R. Wells. "Isolation-based anomaly detection using nearest-neighbor ensembles." In Computational Intelligence, vol. 34, 2018, pp. 968-998.

Examples:

>>> from ikpykit.anomaly import INNE
>>> import numpy as np
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = INNE(contamination=0.25).fit(X)
>>> clf.predict([[0.1, 0.3], [0, 0.7], [90, 85]])
array([ 1,  1, -1])
Source code in ikpykit/anomaly/_inne.py
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    n_estimators=200,
    max_samples="auto",
    contamination="auto",
    random_state=None,
):
    self.n_estimators = n_estimators
    self.max_samples = max_samples
    self.random_state = random_state
    self.contamination = contamination

fit

fit(X, y=None)

Fit estimator.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

The input samples. Use dtype=np.float32 for maximum efficiency.

required
y Ignored

Not used, present for API consistency by convention.

None

Returns:

Name Type Description
self object

Fitted estimator.

Source code in ikpykit/anomaly/_inne.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def fit(self, X, y=None):
    """
    Fit estimator.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The input samples. Use ``dtype=np.float32`` for maximum
        efficiency.

    y : Ignored
        Not used, present for API consistency by convention.

    Returns
    -------
    self : object
        Fitted estimator.
    """

    # Check data
    X = check_array(X, accept_sparse=False)

    n_samples = X.shape[0]
    if isinstance(self.max_samples, str):
        if self.max_samples == "auto":
            max_samples = min(16, n_samples)
        else:
            raise ValueError(
                "max_samples (%s) is not supported."
                'Valid choices are: "auto", int or'
                "float" % self.max_samples
            )

    elif isinstance(self.max_samples, numbers.Integral):
        if self.max_samples > n_samples:
            warn(
                "max_samples (%s) is greater than the "
                "total number of samples (%s). max_samples "
                "will be set to n_samples for estimation."
                % (self.max_samples, n_samples)
            )
            max_samples = n_samples
        else:
            max_samples = self.max_samples
    else:  # float
        if not 0.0 < self.max_samples <= 1.0:
            raise ValueError(
                "max_samples must be in (0, 1], got %r" % self.max_samples
            )
        max_samples = int(self.max_samples * X.shape[0])

    self.max_samples_ = max_samples
    self._fit(X)
    self.is_fitted_ = True

    if self.contamination != "auto":
        if not (0.0 < self.contamination <= 0.5):
            raise ValueError(
                "contamination must be in (0, 0.5], got: %f" % self.contamination
            )

    if self.contamination == "auto":
        # 0.5 plays a special role as described in the original paper.
        # we take the opposite as we consider the opposite of their score.
        self.offset_ = -0.5
    else:
        # else, define offset_ wrt contamination parameter
        self.offset_ = np.percentile(
            self.score_samples(X), 100.0 * self.contamination
        )

    return self

predict

predict(X)

Predict if a particular sample is an outlier or not.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

The input samples. Internally, it will be converted to dtype=np.float32 and if a sparse matrix is provided to a sparse csr_matrix.

required

Returns:

Name Type Description
is_inlier ndarray of shape (n_samples,)

For each observation, tells whether or not (+1 or -1) it should be considered as an inlier according to the fitted model.

Source code in ikpykit/anomaly/_inne.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def predict(self, X):
    """
    Predict if a particular sample is an outlier or not.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The input samples. Internally, it will be converted to
        ``dtype=np.float32`` and if a sparse matrix is provided
        to a sparse ``csr_matrix``.

    Returns
    -------
    is_inlier : ndarray of shape (n_samples,)
        For each observation, tells whether or not (+1 or -1) it should
        be considered as an inlier according to the fitted model.
    """

    check_is_fitted(self)
    decision_func = self.decision_function(X)
    is_inlier = np.ones_like(decision_func, dtype=int)
    is_inlier[decision_func < 0] = -1
    return is_inlier

decision_function

decision_function(X)

Average anomaly score of X of the base classifiers.

The anomaly score of an input sample is computed as the mean anomaly score of the .

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

The input samples. Internally, it will be converted to dtype=np.float32.

required

Returns:

Name Type Description
scores ndarray of shape (n_samples,)

The anomaly score of the input samples. The lower, the more abnormal. Negative scores represent outliers, positive scores represent inliers.

Source code in ikpykit/anomaly/_inne.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def decision_function(self, X):
    """
    Average anomaly score of X of the base classifiers.

    The anomaly score of an input sample is computed as
    the mean anomaly score of the .

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The input samples. Internally, it will be converted to
        ``dtype=np.float32``.

    Returns
    -------
    scores : ndarray of shape (n_samples,)
        The anomaly score of the input samples.
        The lower, the more abnormal. Negative scores represent outliers,
        positive scores represent inliers.
    """
    # We subtract self.offset_ to make 0 be the threshold value for being
    # an outlier.

    return self.score_samples(X) - self.offset_

score_samples

score_samples(X)

Opposite of the anomaly score defined in the original paper. The anomaly score of an input sample is computed as the mean anomaly score of the trees in the forest.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

The input samples.

required

Returns:

Name Type Description
scores ndarray of shape (n_samples,)

The anomaly score of the input samples. The lower, the more abnormal.

Source code in ikpykit/anomaly/_inne.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def score_samples(self, X):
    """
    Opposite of the anomaly score defined in the original paper.
    The anomaly score of an input sample is computed as
    the mean anomaly score of the trees in the forest.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The input samples.

    Returns
    -------
    scores : ndarray of shape (n_samples,)
        The anomaly score of the input samples.
        The lower, the more abnormal.
    """

    check_is_fitted(self, "is_fitted_")
    # Check data
    X = check_array(X, accept_sparse=False)

    isolation_scores = np.ones([self.n_estimators, X.shape[0]])
    # each test instance is evaluated against n_estimators sets of hyperspheres
    for i in range(self.n_estimators):
        x_dists = euclidean_distances(X, self._centroids[i], squared=True)
        # find instances that are covered by at least one hypersphere.
        cover_radius = np.where(
            x_dists <= self._centroids_radius[i], self._centroids_radius[i], np.nan
        )
        x_covered = np.where(~np.isnan(cover_radius).all(axis=1))
        # the centroid of the hypersphere covering x and having the smallest radius
        cnn_x = np.nanargmin(cover_radius[x_covered], axis=1)
        isolation_scores[i][x_covered] = self._ratio[i][cnn_x]
    # the isolation scores are averaged to produce the anomaly score
    scores = np.mean(isolation_scores, axis=0)
    return -scores