Coverage for src/onorm/winsorize.py: 100%
36 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 20:22 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 20:22 +0000
1import json
2from typing import Any, Dict, List, Tuple
4import numpy as np
5from fastdigest import TDigest
7from .normalization_base import Normalizer
class Winsorizer(Normalizer):
    r"""
    Online winsorizer for robust outlier clipping using TDigest quantiles.

    Clips extreme values to specified quantiles, replacing outliers with the
    values at the quantile boundaries. Uses TDigest for efficient online
    quantile estimation without storing all historical data.

    For each feature $i$, the transformation is:

    $$x_{\text{clip},i} = \begin{cases}
    Q_{\text{lower},i} & \text{if } x_i < Q_{\text{lower},i} \\
    x_i & \text{if } Q_{\text{lower},i} \leq x_i \leq Q_{\text{upper},i} \\
    Q_{\text{upper},i} & \text{if } x_i > Q_{\text{upper},i}
    \end{cases}$$

    where $Q_{\text{lower},i}$ and $Q_{\text{upper},i}$ are the estimated quantiles.

    Parameters
    ----------
    n_dim : int
        Number of dimensions/features to normalize
    clip_q : tuple of float, default=(0, 1)
        Lower and upper quantiles for clipping, in range [0, 1].
        For example, (0.1, 0.9) clips values below the 10th quantile
        and above the 90th quantile. The lower quantile must not exceed
        the upper quantile.
    max_centroids : int, default=1000
        Maximum number of centroids for TDigest. Higher values increase precision
        but use more memory.

    Attributes
    ----------
    digests : List[TDigest]
        List of TDigest objects for tracking quantiles per feature.

    Raises
    ------
    ValueError
        If ``clip_q`` does not contain exactly two values satisfying
        ``0 <= lower <= upper <= 1``.

    Examples
    --------
    ```{python}
    from onorm import Winsorizer
    import numpy as np
    winsorizer = Winsorizer(n_dim=3, clip_q=(0.1, 0.9))
    X = np.random.normal(size=(100, 3))
    for x in X:
        winsorizer.partial_fit(x)
    x_new = np.array([10.0, 10.0, 10.0])  # Outlier
    x_clipped = winsorizer.transform(x_new.copy())  # Clips to 90th quantile
    ```

    References
    ----------
    [Computing Extremely Accurate Quantiles Using t-Digests](https://arxiv.org/abs/1902.04023)

    Notes
    -----
    - Winsorization is robust to outliers, unlike min-max scaling
    - TDigest provides approximate quantiles with bounded memory
    - Clipping is applied independently to each feature
    - ``transform`` writes the clipped values back into the array it is given;
      pass a copy if the original values must be preserved
    """

    def __init__(
        self, n_dim: int, clip_q: Tuple[float, float] = (0, 1), max_centroids: int = 1000
    ) -> None:
        # Reject malformed quantile pairs up front: a reversed pair would make
        # np.clip receive min > max in transform() and silently return garbage.
        if len(clip_q) != 2:
            raise ValueError(
                f"clip_q must contain exactly two quantiles (lower, upper), got {clip_q!r}"
            )
        lower_q, upper_q = clip_q
        if not (0 <= lower_q <= upper_q <= 1):
            raise ValueError(
                f"clip_q must satisfy 0 <= lower <= upper <= 1, got {clip_q!r}"
            )

        self.clip_q = clip_q
        self.n_dim = n_dim
        self.max_centroids = max_centroids
        # reset() creates the per-feature digests from n_dim and max_centroids.
        self.reset()

    def partial_fit(self, x: np.ndarray) -> None:
        """
        Update quantile estimates for each feature.

        Each element of ``x`` is converted to a Python scalar with ``.item()``
        and added to the digest at the same index.

        Parameters
        ----------
        x : np.ndarray
            A 1-D array of shape (n_dim,) representing a new observation.
        """
        for i, xi in enumerate(x):
            self.digests[i].update(xi.item())

    def transform(self, x: np.ndarray) -> np.ndarray:
        """
        Clip extreme values to learned quantile boundaries.

        The clipped values are written back into ``x`` element by element, so
        the input array is modified in place and the same object is returned.

        Parameters
        ----------
        x : np.ndarray
            A 1-D array of shape (n_dim,) to clip.

        Returns
        -------
        np.ndarray
            Clipped array where values below the lower quantile are set to the
            lower quantile value, and values above the upper quantile are set to
            the upper quantile value. This is the same array object as ``x``.
        """
        lower_q = self.clip_q[0]
        upper_q = self.clip_q[1]
        for i in range(self.n_dim):
            digest = self.digests[i]
            x[i] = np.clip(x[i], digest.quantile(lower_q), digest.quantile(upper_q))
        return x

    def reset(self) -> None:
        """
        Reset the winsorizer to initial state.

        Reinitializes TDigest objects for all features, clearing quantile estimates.
        """
        self.digests: List[TDigest] = [
            TDigest(max_centroids=self.max_centroids) for _ in range(self.n_dim)
        ]

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the winsorizer state to a dictionary.

        Returns
        -------
        dict
            Dictionary with JSON-serializable metadata and TDigest states.

        Notes
        -----
        TDigest objects are serialized using their native to_dict() method,
        which returns a JSON-serializable dictionary containing centroids,
        min/max values, and max_centroids configuration.
        """
        return {
            "version": "1.0",
            "class": "Winsorizer",
            "config": {
                "n_dim": self.n_dim,
                # Stored as a list so the payload round-trips through JSON.
                "clip_q": list(self.clip_q),
                "max_centroids": self.max_centroids,
            },
            "state": {"digests": [digest.to_dict() for digest in self.digests]},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Winsorizer":
        """
        Deserialize a winsorizer from a dictionary.

        Parameters
        ----------
        data : dict
            Dictionary created by to_dict().

        Returns
        -------
        Winsorizer
            Deserialized winsorizer instance.

        Raises
        ------
        ValueError
            If ``data["class"]`` is not ``"Winsorizer"``, or if the number of
            serialized digests does not match ``config["n_dim"]``.
        """
        if data.get("class") != "Winsorizer":
            raise ValueError(f"Cannot deserialize {data.get('class')} as Winsorizer")

        config = data["config"]
        instance = cls(
            n_dim=config["n_dim"],
            clip_q=tuple(config["clip_q"]),
            max_centroids=config["max_centroids"],
        )

        digest_dicts = data["state"]["digests"]
        # transform() indexes digests by feature position, so a payload with the
        # wrong number of digests must be rejected here rather than failing later.
        if len(digest_dicts) != instance.n_dim:
            raise ValueError(
                f"Expected {instance.n_dim} digests for n_dim={instance.n_dim}, "
                f"got {len(digest_dicts)}"
            )
        instance.digests = [TDigest.from_dict(digest_dict) for digest_dict in digest_dicts]

        return instance

    def to_json(self) -> str:
        """Serialize the winsorizer to a JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> "Winsorizer":
        """Deserialize a winsorizer from a JSON string."""
        return cls.from_dict(json.loads(json_str))