Coverage for src/onorm/mvnorm.py: 100%
57 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 20:22 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 20:22 +0000
1import base64
2import json
3from typing import Any, Dict
5import numpy as np
6from scipy.linalg import cholesky, inv
8from .normalization_base import Normalizer
11class MultivariateNormalizer(Normalizer):
12 r"""
13 Online multivariate normalizer with decorrelation via covariance estimation.
15 Transforms multivariate data to have zero mean, unit variance, and zero
16 correlation (decorrelation). Uses online estimation of the covariance matrix
17 and computes the inverse square root for transformation.
19 The transformation is:
21 $$\Sigma^{-1/2} (x - \mu)$$
23 where $\mu$ is the estimated mean and $\Sigma$ is the estimated covariance matrix.
25 Parameters
26 ----------
27 n_dim : int
28 Number of dimensions/features in the data.
30 Attributes
31 ----------
32 n : int
33 Number of observations seen so far.
34 muhat : np.ndarray
35 Estimated mean vector, shape (n_dim,).
36 Sigmahat : np.ndarray
37 Estimated covariance matrix (computed from _M), shape (n_dim, n_dim).
38 invsqrtSigmahat : np.ndarray
39 Inverse square root of covariance matrix ($\Sigma^{-1/2}$), shape (n_dim, n_dim).
41 Examples
42 --------
43 ```{python}
44 from onorm import MultivariateNormalizer
45 import numpy as np
46 normalizer = MultivariateNormalizer(n_dim=3)
47 # Generate correlated data
48 cov = np.array([[1, 0.5, 0.3], [0.5, 1, 0.4], [0.3, 0.4, 1]])
49 X = np.random.multivariate_normal([0, 0, 0], cov, size=100)
50 for x in X:
51 normalizer.partial_fit(x)
52 x_new = np.array([1.0, 1.0, 1.0])
53 x_normalized = normalizer.transform(x_new.copy())
54 # x_normalized will be decorrelated with zero mean and unit variance
55 ```
57 Notes
58 -----
59 - Time complexity: O(d²) per observation for fitting, O(d²) for transformation
60 - Space complexity: O(d²) for storing covariance matrix
61 - The covariance matrix is computed using Welford's online algorithm
62 - Transformation uses Cholesky decomposition of the inverse covariance
64 References
65 ----------
66 [Welford's online algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm)
67 """
69 def __init__(self, n_dim: int) -> None:
70 self.n_dim = n_dim
71 self.reset()
73 def partial_fit(self, x: np.ndarray) -> None:
74 """
75 Update mean and covariance estimates with a new observation.
77 Uses Welford's online algorithm to incrementally update the mean
78 and covariance matrix.
80 Parameters
81 ----------
82 x : np.ndarray
83 A 1-D array of shape (n_dim,) representing a new observation.
85 See Also
86 --------
87 Normalizer.partial_fit : Base class method for incremental fitting.
89 References
90 ----------
91 [Welford's online algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm)
92 """
93 delta = x - self.muhat
94 self.n += 1
95 self._update_muhat(delta)
96 self._update_Sigmahat(delta)
98 def _update_muhat(self, delta: np.ndarray) -> None:
99 """Update running mean estimate using Welford's algorithm."""
100 self.muhat += delta / self.n
102 def _update_Sigmahat(self, delta: np.ndarray) -> None:
103 """
104 Update covariance matrix using Welford's algorithm.
106 Accumulates the sum of outer products in _M, which is used to
107 compute the covariance matrix.
108 """
109 delta = delta.reshape(-1, 1)
110 frac = (self.n - 1) / self.n
111 self._M += frac * delta @ delta.T
113 @property
114 def Sigmahat(self) -> np.ndarray:
115 """
116 Compute the sample covariance matrix.
118 Returns
119 -------
120 np.ndarray
121 Sample covariance matrix, shape (n_dim, n_dim). Returns identity
122 matrix if n <= 1.
123 """
124 if self.n <= 1:
125 return np.eye(self.n_dim, dtype=np.float64)
126 return self._M / (self.n - 1)
128 @property
129 def invsqrtSigmahat(self) -> np.ndarray:
130 r"""
131 Compute the inverse square root of the covariance matrix.
133 Returns:
135 $$\Sigma^{-1/2}$$
137 Computed via Cholesky decomposition of the inverse covariance matrix.
139 Returns
140 -------
141 np.ndarray
142 Inverse square root of covariance matrix, shape (n_dim, n_dim).
143 Returns identity matrix if covariance is not positive definite.
144 """
145 try:
146 sqrtinvSigma = cholesky(inv(self.Sigmahat, check_finite=False), check_finite=False)
147 except np.linalg.LinAlgError:
148 sqrtinvSigma = np.eye(self.n_dim, dtype=np.float64)
149 return sqrtinvSigma
151 def transform(self, x: np.ndarray) -> np.ndarray:
152 """
153 Apply multivariate normalization (decorrelation and standardization).
155 Parameters
156 ----------
157 x : np.ndarray
158 A 1-D array of shape (n_dim,) to normalize.
160 Returns
161 -------
162 np.ndarray
163 Decorrelated and standardized array of shape (n_dim,).
164 """
165 return (self.invsqrtSigmahat @ (x - self.muhat)).reshape(-1)
167 def to_dict(self) -> Dict[str, Any]:
168 """
169 Serialize the normalizer state to a dictionary.
171 Returns
172 -------
173 dict
174 Dictionary with JSON-serializable metadata and base64-encoded arrays.
175 """
176 return {
177 "version": "1.0",
178 "class": "MultivariateNormalizer",
179 "config": {"n_dim": self.n_dim},
180 "state": {
181 "n": self.n,
182 "muhat": base64.b64encode(self.muhat.tobytes()).decode("ascii"),
183 "_M": base64.b64encode(self._M.tobytes()).decode("ascii"),
184 },
185 }
187 @classmethod
188 def from_dict(cls, data: Dict[str, Any]) -> "MultivariateNormalizer":
189 """
190 Deserialize a normalizer from a dictionary.
192 Parameters
193 ----------
194 data : dict
195 Dictionary created by to_dict().
197 Returns
198 -------
199 MultivariateNormalizer
200 Deserialized normalizer instance.
201 """
202 if data.get("class") != "MultivariateNormalizer":
203 raise ValueError(f"Cannot deserialize {data.get('class')} as MultivariateNormalizer")
205 config = data["config"]
206 instance = cls(n_dim=config["n_dim"])
208 state = data["state"]
209 instance.n = state["n"]
210 instance.muhat = np.frombuffer(base64.b64decode(state["muhat"]), dtype=np.float64)
211 instance._M = np.frombuffer(base64.b64decode(state["_M"]), dtype=np.float64).reshape(
212 (config["n_dim"], config["n_dim"])
213 )
215 return instance
217 def to_json(self) -> str:
218 """Serialize the normalizer to a JSON string."""
219 return json.dumps(self.to_dict(), indent=2)
221 @classmethod
222 def from_json(cls, json_str: str) -> "MultivariateNormalizer":
223 """Deserialize a normalizer from a JSON string."""
224 return cls.from_dict(json.loads(json_str))
226 def reset(self) -> None:
227 """
228 Reset the normalizer to initial state.
230 Reinitializes observation count, mean, and covariance accumulator (_M)
231 to zeros.
232 """
233 self.n = 0
234 self.muhat = np.zeros(self.n_dim, dtype=np.float64)
235 self._M = np.zeros((self.n_dim, self.n_dim), dtype=np.float64)