
IDKD

ikpykit.anomaly.IDKD

IDKD(
    n_estimators=200,
    max_samples="auto",
    contamination="auto",
    method="inne",
    random_state=None,
)

Bases: OutlierMixin, BaseEstimator

Isolation Distributional Kernel for anomaly detection.

IDKD measures the similarity between distributions to identify anomalies. An observation is considered anomalous when its Dirac measure has low similarity to the reference distribution from which the dataset was generated.

This implementation follows the algorithm described in [1].
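
Schematically (a restatement of [1] with normalization constants omitted; Φ denotes the feature map of the isolation kernel), the anomaly score of a point x with respect to a dataset D = {x_1, ..., x_n} is its similarity to the kernel mean map of D:

    \hat{K}(\delta_x, \mathcal{P}_D) = \left\langle \Phi(x),\ \frac{1}{n}\sum_{i=1}^{n} \Phi(x_i) \right\rangle

Points whose Dirac measure lies far from the bulk of the data therefore receive low scores and are flagged as anomalies.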

Parameters:

n_estimators : int, default=200
    Number of base estimators in the ensemble.

max_samples : "auto", int or float, default="auto"
    Number of samples to draw from X to train each base estimator.

      • If "auto", then max_samples=min(16, n_samples).
      • If int, then draw max_samples samples.
      • If float, then draw max_samples * X.shape[0] samples.

method : {"inne", "anne", "auto"}, default="inne"
    Isolation method to use. The original algorithm described in [1] uses "inne".

contamination : "auto" or float, default="auto"
    The proportion of outliers in the data set. Used to define the threshold on the decision function (an illustrative configuration follows this table).

      • If "auto", the threshold is determined as in [1].
      • If float, the contamination should be in the range (0, 0.5].

random_state : int, RandomState instance or None, default=None
    Controls the randomness of the estimator. Pass an int for reproducible results across multiple function calls.
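
As an illustration only (the parameter values below are arbitrary, not recommendations), a detector with a fixed sample size, an explicit contamination level and a seeded random state is configured as follows; with contamination=0.1, fit will place offset_ at the 10th percentile of the training scores (see fit below):

>>> from ikpykit.anomaly import IDKD
>>> clf = IDKD(n_estimators=100, max_samples=8, contamination=0.1, random_state=42)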

Attributes:

offset_ : float
    Offset used to define the decision function from the raw scores.

max_samples_ : int
    Actual number of samples used.

iso_kernel_ : IsoKernel
    The fitted isolation kernel.

References

[1] Kai Ming Ting, Bi-Cun Xu, Takashi Washio, Zhi-Hua Zhou (2022). "Isolation Distributional Kernel: A new tool for kernel based point and group anomaly detections." IEEE Transactions on Knowledge and Data Engineering.

Examples:

>>> from ikpykit.anomaly import IDKD
>>> import numpy as np
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25).fit(X)
>>> clf.predict([[0.1, 0.3], [0, 0.7], [90, 85]])
array([ 1,  1, -1])
Source code in ikpykit/anomaly/_idkd.py
def __init__(
    self,
    n_estimators=200,
    max_samples="auto",
    contamination="auto",
    method="inne",
    random_state=None,
):
    self.n_estimators = n_estimators
    self.max_samples = max_samples
    self.random_state = random_state
    self.contamination = contamination
    self.method = method

fit

fit(X, y=None)

Fit the IDKD model.

Parameters:

X : array-like of shape (n_samples, n_features)
    Training data. Use dtype=np.float32 for maximum efficiency.

y : Ignored, default=None
    Not used, present for API consistency.

Returns:

self : object
    Fitted estimator.
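
A minimal sketch of how a float contamination interacts with fit, relying only on the behavior visible in the source below (toy data from the class example, seed arbitrary): offset_ ends up at the corresponding percentile of the training scores.

>>> import numpy as np
>>> from ikpykit.anomaly import IDKD
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25, random_state=0).fit(X)
>>> bool(np.isclose(clf.offset_, np.percentile(clf.score_samples(X), 25.0)))
True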

Source code in ikpykit/anomaly/_idkd.py
def fit(self, X, y=None):
    """Fit the IDKD model.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training data. Use ``dtype=np.float32`` for maximum efficiency.

    y : Ignored
        Not used, present for API consistency.

    Returns
    -------
    self : object
        Fitted estimator.
    """

    # Check data
    X = check_array(X, accept_sparse=False)

    n_samples = X.shape[0]
    if isinstance(self.max_samples, str):
        if self.max_samples == "auto":
            max_samples = min(16, n_samples)
        else:
            raise ValueError(
                "max_samples (%s) is not supported."
                'Valid choices are: "auto", int or'
                "float" % self.max_samples
            )

    elif isinstance(self.max_samples, numbers.Integral):
        if self.max_samples > n_samples:
            warn(
                "max_samples (%s) is greater than the "
                "total number of samples (%s). max_samples "
                "will be set to n_samples for estimation."
                % (self.max_samples, n_samples)
            )
            max_samples = n_samples
        else:
            max_samples = self.max_samples
    else:  # float
        if not 0.0 < self.max_samples <= 1.0:
            raise ValueError(
                "max_samples must be in (0, 1], got %r" % self.max_samples
            )
        max_samples = int(self.max_samples * X.shape[0])

    self.max_samples_ = max_samples
    self._fit(X)
    self.is_fitted_ = True

    if self.contamination != "auto":
        if not (0.0 < self.contamination <= 0.5):
            raise ValueError(
                "contamination must be in (0, 0.5], got: %f" % self.contamination
            )

    if self.contamination == "auto":
        # 0.5 plays a special role as described in the original paper.
        # we take the opposite as we consider the opposite of their score.
        self.offset_ = -0.5
    else:
        # else, define offset_ wrt contamination parameter
        self.offset_ = np.percentile(
            self.score_samples(X), 100.0 * self.contamination
        )

    return self

predict

predict(X)

Predict if samples are outliers or not.

Parameters:

X : array-like of shape (n_samples, n_features)
    The query samples.

Returns:

is_inlier : ndarray of shape (n_samples,)
    Returns +1 for inliers and -1 for outliers.
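
As the source below shows, the prediction is simply the sign of the decision function; a minimal illustrative check (toy data from the class example, seed arbitrary):

>>> import numpy as np
>>> from ikpykit.anomaly import IDKD
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25, random_state=0).fit(X)
>>> X_new = [[0.1, 0.3], [90, 85]]
>>> bool(np.array_equal(clf.predict(X_new), np.where(clf.decision_function(X_new) < 0, -1, 1)))
True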

Source code in ikpykit/anomaly/_idkd.py
def predict(self, X):
    """Predict if samples are outliers or not.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The query samples.

    Returns
    -------
    is_inlier : ndarray of shape (n_samples,)
        Returns +1 for inliers and -1 for outliers.
    """
    check_is_fitted(self)
    decision_func = self.decision_function(X)
    is_inlier = np.ones_like(decision_func, dtype=int)
    is_inlier[decision_func < 0] = -1
    return is_inlier

decision_function

decision_function(X)

Compute the decision function for each sample.

The decision function is defined as score_samples(X) - offset_. Negative values are considered outliers and positive values are considered inliers.

Parameters:

X : array-like of shape (n_samples, n_features)
    The query samples.

Returns:

scores : ndarray of shape (n_samples,)
    Decision function values for each sample. Negative values represent outliers, positive values represent inliers.
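
Because the decision function is just score_samples shifted by offset_ (see the source below), a custom threshold can also be applied to the raw scores directly; a minimal check (toy data, seed arbitrary):

>>> import numpy as np
>>> from ikpykit.anomaly import IDKD
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25, random_state=0).fit(X)
>>> bool(np.allclose(clf.decision_function(X), clf.score_samples(X) - clf.offset_))
True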

Source code in ikpykit/anomaly/_idkd.py
def decision_function(self, X):
    """Compute the decision function for each sample.

    The decision function is defined as score_samples(X) - offset_.
    Negative values are considered outliers and positive values are considered inliers.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The query samples.

    Returns
    -------
    scores : ndarray of shape (n_samples,)
        Decision function values for each sample.
        Negative values represent outliers, positive values represent inliers.
    """
    # We subtract self.offset_ to make 0 be the threshold value for being
    # an outlier.
    return self.score_samples(X) - self.offset_

score_samples

score_samples(X)

Compute the anomaly scores for each sample.

Parameters:

X : array-like of shape (n_samples, n_features)
    The query samples.

Returns:

scores : ndarray of shape (n_samples,)
    The anomaly score of each input sample. The lower the score, the more anomalous the sample.
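
A minimal usage sketch (toy data from the class example, seed arbitrary); sorting by ascending score puts the most anomalous samples first:

>>> import numpy as np
>>> from ikpykit.anomaly import IDKD
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25, random_state=0).fit(X)
>>> scores = clf.score_samples(X)
>>> scores.shape
(4,)
>>> ranking = np.argsort(scores)  # ascending order: most anomalous samples come first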

Source code in ikpykit/anomaly/_idkd.py
def score_samples(self, X):
    """Compute the anomaly scores for each sample.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        The query samples.

    Returns
    -------
    scores : ndarray of shape (n_samples,)
        The anomaly score of each input sample.
        The lower the score, the more anomalous the sample.
    """
    check_is_fitted(self, "is_fitted_")
    # Check data
    X = check_array(X, accept_sparse=False)

    X_trans = self.iso_kernel_.transform(X)
    # Kernel mean embedding of the data: average of the mapped feature vectors.
    kme = np.average(X_trans.toarray(), axis=0) / self.max_samples_
    # Score each sample by the similarity of its feature map to that embedding.
    scores = safe_sparse_dot(X_trans, kme.T).flatten()

    return scores
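
For completeness, the scores can be reproduced from the public iso_kernel_ and max_samples_ attributes; this mirrors the source above step by step, so the equality holds by construction (toy data, seed arbitrary):

>>> import numpy as np
>>> from ikpykit.anomaly import IDKD
>>> X = np.array([[-1.1, 0.2], [0.3, 0.5], [0.5, 1.1], [100, 90]])
>>> clf = IDKD(max_samples=2, contamination=0.25, random_state=0).fit(X)
>>> X_trans = clf.iso_kernel_.transform(X)
>>> kme = np.average(X_trans.toarray(), axis=0) / clf.max_samples_
>>> bool(np.allclose(X_trans @ kme, clf.score_samples(X)))
True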