103 lines
3.5 KiB
Python
103 lines
3.5 KiB
Python
"""Demo 2 - Parameter-free anomaly detection.
|
|
|
|
Split a dataset into 'reference' (known-normal) and 'query' (a mix of
|
|
normal and anomalous), and score each query by its similarity to the
|
|
reference set. No labels touched on the query side, no thresholds
|
|
set by hand, no training step.
|
|
|
|
We compare against sklearn's IsolationForest (with default settings)
|
|
on the same data.
|
|
|
|
Run:
|
|
python 02_anomaly_detection.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
from sem_cython12 import wrapper as cy
|
|
|
|
|
|
def _print_banner(title: str) -> None:
    """Print *title* framed above and below by a 60-character '=' rule."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def _top_k_hits(scores: np.ndarray, labels: np.ndarray, k: int) -> int:
    """Count how many of the *k* highest-scoring entries are labelled 1."""
    top_k = np.argsort(scores)[::-1][:k]
    return int(np.sum(labels[top_k] == 1))


def main() -> int:
    """Run the anomaly-detection demo and print a comparison report.

    Builds a synthetic dataset (standard-normal points plus a small
    cluster shifted by +8 in every coordinate), scores the query points
    with ``cy.batch_max_similarity`` against a known-normal reference
    set, and, when scikit-learn is installed, compares the result with
    an ``IsolationForest`` fitted on the same reference set.

    Returns:
        1 if the ``sem_cython12`` compiled extension is unavailable,
        otherwise 0 (including when scikit-learn is missing and the
        baseline comparison is skipped).
    """
    if not cy.available():
        print("ERROR: sem_cython12 compiled extension did not load.")
        return 1

    rng = np.random.default_rng(0)
    N_NORMAL = 500
    N_ANOMALY = 10
    D = 5

    # Generate data. The order of the rng calls is kept fixed so the
    # seeded output stays reproducible.
    normal = rng.standard_normal((N_NORMAL, D))
    anomalies = rng.standard_normal((N_ANOMALY, D)) + 8.0

    # Split: 80% of normals are 'reference' (known good), 20% are
    # query. Queries also include all 10 anomalies.
    perm = rng.permutation(N_NORMAL)
    n_ref = int(0.8 * N_NORMAL)
    ref_idx = perm[:n_ref]
    query_normal_idx = perm[n_ref:]

    reference = normal[ref_idx]
    query_normal = normal[query_normal_idx]
    queries = np.vstack([query_normal, anomalies])
    # Ground truth for evaluation only: 0 = normal query, 1 = anomaly.
    y_query = np.concatenate([
        np.zeros(len(query_normal_idx), dtype=int),
        np.ones(N_ANOMALY, dtype=int),
    ])

    # Auto-derive scale from the reference set's geometry: the median of
    # the finite values returned by cy.nn_distances (presumably
    # nearest-neighbour distances within the reference set -- confirm
    # against the sem_cython12 wrapper).
    nn = cy.nn_distances(reference)
    lam = float(np.median(nn[np.isfinite(nn)]))

    # Score each query by similarity to the reference.
    # Lower similarity = farther from anything known = anomaly.
    sim = cy.batch_max_similarity(queries, reference, lam=lam)
    scores_sem = -sim  # higher score = more anomalous

    correct_sem = _top_k_hits(scores_sem, y_query, N_ANOMALY)

    _print_banner("SEM (sem_cython12 - one batch_max_similarity call)")
    print(f" Top-{N_ANOMALY} retrieved as anomalous: precision = {correct_sem}/{N_ANOMALY}")

    # scikit-learn is optional. Only the imports are guarded, so an
    # ImportError raised by anything else is not misreported as
    # "scikit-learn is not installed".
    try:
        from sklearn.ensemble import IsolationForest
        from sklearn.metrics import roc_auc_score
    except ImportError:
        print("\n(Install scikit-learn to see the IsolationForest comparison.)")
        return 0

    auc_sem = roc_auc_score(y_query, scores_sem)
    print(f" ROC AUC = {auc_sem:.4f}")

    iso = IsolationForest(random_state=0, contamination='auto')
    iso.fit(reference)
    # score_samples is lower for more abnormal points; negate it so that,
    # like scores_sem, a higher score means more anomalous.
    scores_iso = -iso.score_samples(queries)
    correct_iso = _top_k_hits(scores_iso, y_query, N_ANOMALY)
    auc_iso = roc_auc_score(y_query, scores_iso)

    print()
    _print_banner("Baseline: sklearn IsolationForest (default settings)")
    print(f" Top-{N_ANOMALY} retrieved as anomalous: precision = {correct_iso}/{N_ANOMALY}")
    print(f" ROC AUC = {auc_iso:.4f}")
    print()
    print("=" * 60)

    # A 0.01 AUC shortfall is still reported as a match "within noise".
    if auc_sem >= auc_iso - 0.01:
        margin = auc_sem - auc_iso
        # The '+' format flag already emits the sign; prepending another
        # one would print e.g. "++0.0012".
        print(f"SEM matches IsolationForest within noise"
              f" ({margin:+.4f} AUC),")
        print("with one function call and zero tuning.")
    else:
        print(f"IsolationForest leads by {auc_iso - auc_sem:.4f} AUC; "
              "SEM is competitive without parameters.")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|