# File: sem_cython12-demos/02_anomaly_detection.py (Python, 103 lines, 3.5 KiB)

"""Demo 2 - Parameter-free anomaly detection.
Split a dataset into 'reference' (known-normal) and 'query' (a mix of
normal and anomalous), and score each query by its similarity to the
reference set. No labels touched on the query side, no thresholds
set by hand, no training step.
We compare against sklearn's IsolationForest (with default settings)
on the same data.
Run:

    python 02_anomaly_detection.py
"""
from __future__ import annotations
import numpy as np
from sem_cython12 import wrapper as cy
def _count_top_k_hits(scores: np.ndarray, labels: np.ndarray, k: int) -> int:
    """Return how many of the ``k`` highest-scoring entries have label 1."""
    top_k = np.argsort(scores)[::-1][:k]
    return int(np.sum(labels[top_k] == 1))


def _print_banner(title: str) -> None:
    """Print ``title`` framed by two 60-character rules of ``=``."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def main() -> int:
    """Run the anomaly-detection demo and print a comparison report.

    Builds a synthetic dataset (standard-normal points plus a small cluster
    shifted by +8.0 in every dimension), scores each query point with
    ``cy.batch_max_similarity`` against a reference set, and reports top-k
    precision. When scikit-learn is importable it also reports ROC AUC and
    the same metrics for an ``IsolationForest`` fitted on the reference set.

    Returns:
        1 if ``cy.available()`` is false, otherwise 0 (including the case
        where scikit-learn is not installed and the comparison is skipped).
    """
    if not cy.available():
        print("ERROR: sem_cython12 compiled extension did not load.")
        return 1

    rng = np.random.default_rng(0)
    n_normal = 500
    n_anomaly = 10
    dim = 5

    # Generate data. The order of the RNG calls is kept fixed so the seeded
    # run stays reproducible.
    normal = rng.standard_normal((n_normal, dim))
    anomalies = rng.standard_normal((n_anomaly, dim)) + 8.0

    # Split: 80% of normals are 'reference' (known good), 20% are
    # query. Queries also include all anomalies.
    perm = rng.permutation(n_normal)
    n_ref = int(0.8 * n_normal)
    reference = normal[perm[:n_ref]]
    query_normal = normal[perm[n_ref:]]
    queries = np.vstack([query_normal, anomalies])
    # Ground-truth labels, used only for evaluation: 0 = normal, 1 = anomaly.
    y_query = np.concatenate([
        np.zeros(len(query_normal), dtype=int),
        np.ones(n_anomaly, dtype=int),
    ])

    # Auto-derive scale from the reference set's geometry: the median of
    # the finite values returned by cy.nn_distances.
    # NOTE(review): presumably these are per-point nearest-neighbour
    # distances - confirm against the sem_cython12 wrapper.
    nn = cy.nn_distances(reference)
    lam = float(np.median(nn[np.isfinite(nn)]))

    # Score each query by similarity to the reference.
    # Lower similarity = farther from anything known = anomaly.
    sim = cy.batch_max_similarity(queries, reference, lam=lam)
    scores_sem = -sim  # higher score = more anomalous
    correct_sem = _count_top_k_hits(scores_sem, y_query, n_anomaly)

    _print_banner("SEM (sem_cython12 - one batch_max_similarity call)")
    print(f" Top-{n_anomaly} retrieved as anomalous: precision = {correct_sem}/{n_anomaly}")

    # Only the imports are guarded, so an ImportError raised later (inside
    # scoring code) is not misreported as "scikit-learn is not installed".
    try:
        from sklearn.ensemble import IsolationForest
        from sklearn.metrics import roc_auc_score
    except ImportError:
        print("\n(Install scikit-learn to see the IsolationForest comparison.)")
        return 0

    auc_sem = roc_auc_score(y_query, scores_sem)
    print(f" ROC AUC = {auc_sem:.4f}")

    iso = IsolationForest(random_state=0, contamination='auto')
    iso.fit(reference)
    # Negate score_samples so that, as for SEM, higher = more anomalous.
    scores_iso = -iso.score_samples(queries)
    correct_iso = _count_top_k_hits(scores_iso, y_query, n_anomaly)
    auc_iso = roc_auc_score(y_query, scores_iso)

    print()
    _print_banner("Baseline: sklearn IsolationForest (default settings)")
    print(f" Top-{n_anomaly} retrieved as anomalous: precision = {correct_iso}/{n_anomaly}")
    print(f" ROC AUC = {auc_iso:.4f}")
    print()
    print("=" * 60)

    # Treat a deficit of up to 0.01 AUC as a tie.
    if auc_sem >= auc_iso - 0.01:
        margin = auc_sem - auc_iso
        # The '+' format flag already emits the sign; adding one by hand
        # used to print a doubled sign such as "++0.0012".
        print(f"SEM matches IsolationForest within noise"
              f" ({margin:+.4f} AUC),")
        print("with one function call and zero tuning.")
    else:
        print(f"IsolationForest leads by {auc_iso - auc_sem:.4f} AUC; "
              f"SEM is competitive without parameters.")
    return 0
# Script entry point: run the demo and pass main()'s return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)