Coverage for src/onorm/mvnorm.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-07 20:22 +0000

1import base64 

2import json 

3from typing import Any, Dict 

4 

5import numpy as np 

6from scipy.linalg import cholesky, inv 

7 

8from .normalization_base import Normalizer 

9 

10 

11class MultivariateNormalizer(Normalizer): 

12 r""" 

13 Online multivariate normalizer with decorrelation via covariance estimation. 

14 

15 Transforms multivariate data to have zero mean, unit variance, and zero 

16 correlation (decorrelation). Uses online estimation of the covariance matrix 

17 and computes the inverse square root for transformation. 

18 

19 The transformation is: 

20 

21 $$\Sigma^{-1/2} (x - \mu)$$ 

22 

23 where $\mu$ is the estimated mean and $\Sigma$ is the estimated covariance matrix. 

24 

25 Parameters 

26 ---------- 

27 n_dim : int 

28 Number of dimensions/features in the data. 

29 

30 Attributes 

31 ---------- 

32 n : int 

33 Number of observations seen so far. 

34 muhat : np.ndarray 

35 Estimated mean vector, shape (n_dim,). 

36 Sigmahat : np.ndarray 

37 Estimated covariance matrix (computed from _M), shape (n_dim, n_dim). 

38 invsqrtSigmahat : np.ndarray 

39 Inverse square root of covariance matrix ($\Sigma^{-1/2}$), shape (n_dim, n_dim). 

40 

41 Examples 

42 -------- 

43 ```{python} 

44 from onorm import MultivariateNormalizer 

45 import numpy as np 

46 normalizer = MultivariateNormalizer(n_dim=3) 

47 # Generate correlated data 

48 cov = np.array([[1, 0.5, 0.3], [0.5, 1, 0.4], [0.3, 0.4, 1]]) 

49 X = np.random.multivariate_normal([0, 0, 0], cov, size=100) 

50 for x in X: 

51 normalizer.partial_fit(x) 

52 x_new = np.array([1.0, 1.0, 1.0]) 

53 x_normalized = normalizer.transform(x_new.copy()) 

54 # x_normalized will be decorrelated with zero mean and unit variance 

55 ``` 

56 

57 Notes 

58 ----- 

59 - Time complexity: O(d²) per observation for fitting, O(d²) for transformation 

60 - Space complexity: O(d²) for storing covariance matrix 

61 - The covariance matrix is computed using Welford's online algorithm 

62 - Transformation uses Cholesky decomposition of the inverse covariance 

63 

64 References 

65 ---------- 

66 [Welford's online algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm) 

67 """ 

68 

69 def __init__(self, n_dim: int) -> None: 

70 self.n_dim = n_dim 

71 self.reset() 

72 

73 def partial_fit(self, x: np.ndarray) -> None: 

74 """ 

75 Update mean and covariance estimates with a new observation. 

76 

77 Uses Welford's online algorithm to incrementally update the mean 

78 and covariance matrix. 

79 

80 Parameters 

81 ---------- 

82 x : np.ndarray 

83 A 1-D array of shape (n_dim,) representing a new observation. 

84 

85 See Also 

86 -------- 

87 Normalizer.partial_fit : Base class method for incremental fitting. 

88 

89 References 

90 ---------- 

91 [Welford's online algorithm](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm) 

92 """ 

93 delta = x - self.muhat 

94 self.n += 1 

95 self._update_muhat(delta) 

96 self._update_Sigmahat(delta) 

97 

98 def _update_muhat(self, delta: np.ndarray) -> None: 

99 """Update running mean estimate using Welford's algorithm.""" 

100 self.muhat += delta / self.n 

101 

102 def _update_Sigmahat(self, delta: np.ndarray) -> None: 

103 """ 

104 Update covariance matrix using Welford's algorithm. 

105 

106 Accumulates the sum of outer products in _M, which is used to 

107 compute the covariance matrix. 

108 """ 

109 delta = delta.reshape(-1, 1) 

110 frac = (self.n - 1) / self.n 

111 self._M += frac * delta @ delta.T 

112 

113 @property 

114 def Sigmahat(self) -> np.ndarray: 

115 """ 

116 Compute the sample covariance matrix. 

117 

118 Returns 

119 ------- 

120 np.ndarray 

121 Sample covariance matrix, shape (n_dim, n_dim). Returns identity 

122 matrix if n <= 1. 

123 """ 

124 if self.n <= 1: 

125 return np.eye(self.n_dim, dtype=np.float64) 

126 return self._M / (self.n - 1) 

127 

128 @property 

129 def invsqrtSigmahat(self) -> np.ndarray: 

130 r""" 

131 Compute the inverse square root of the covariance matrix. 

132 

133 Returns: 

134 

135 $$\Sigma^{-1/2}$$ 

136 

137 Computed via Cholesky decomposition of the inverse covariance matrix. 

138 

139 Returns 

140 ------- 

141 np.ndarray 

142 Inverse square root of covariance matrix, shape (n_dim, n_dim). 

143 Returns identity matrix if covariance is not positive definite. 

144 """ 

145 try: 

146 sqrtinvSigma = cholesky(inv(self.Sigmahat, check_finite=False), check_finite=False) 

147 except np.linalg.LinAlgError: 

148 sqrtinvSigma = np.eye(self.n_dim, dtype=np.float64) 

149 return sqrtinvSigma 

150 

151 def transform(self, x: np.ndarray) -> np.ndarray: 

152 """ 

153 Apply multivariate normalization (decorrelation and standardization). 

154 

155 Parameters 

156 ---------- 

157 x : np.ndarray 

158 A 1-D array of shape (n_dim,) to normalize. 

159 

160 Returns 

161 ------- 

162 np.ndarray 

163 Decorrelated and standardized array of shape (n_dim,). 

164 """ 

165 return (self.invsqrtSigmahat @ (x - self.muhat)).reshape(-1) 

166 

167 def to_dict(self) -> Dict[str, Any]: 

168 """ 

169 Serialize the normalizer state to a dictionary. 

170 

171 Returns 

172 ------- 

173 dict 

174 Dictionary with JSON-serializable metadata and base64-encoded arrays. 

175 """ 

176 return { 

177 "version": "1.0", 

178 "class": "MultivariateNormalizer", 

179 "config": {"n_dim": self.n_dim}, 

180 "state": { 

181 "n": self.n, 

182 "muhat": base64.b64encode(self.muhat.tobytes()).decode("ascii"), 

183 "_M": base64.b64encode(self._M.tobytes()).decode("ascii"), 

184 }, 

185 } 

186 

187 @classmethod 

188 def from_dict(cls, data: Dict[str, Any]) -> "MultivariateNormalizer": 

189 """ 

190 Deserialize a normalizer from a dictionary. 

191 

192 Parameters 

193 ---------- 

194 data : dict 

195 Dictionary created by to_dict(). 

196 

197 Returns 

198 ------- 

199 MultivariateNormalizer 

200 Deserialized normalizer instance. 

201 """ 

202 if data.get("class") != "MultivariateNormalizer": 

203 raise ValueError(f"Cannot deserialize {data.get('class')} as MultivariateNormalizer") 

204 

205 config = data["config"] 

206 instance = cls(n_dim=config["n_dim"]) 

207 

208 state = data["state"] 

209 instance.n = state["n"] 

210 instance.muhat = np.frombuffer(base64.b64decode(state["muhat"]), dtype=np.float64) 

211 instance._M = np.frombuffer(base64.b64decode(state["_M"]), dtype=np.float64).reshape( 

212 (config["n_dim"], config["n_dim"]) 

213 ) 

214 

215 return instance 

216 

217 def to_json(self) -> str: 

218 """Serialize the normalizer to a JSON string.""" 

219 return json.dumps(self.to_dict(), indent=2) 

220 

221 @classmethod 

222 def from_json(cls, json_str: str) -> "MultivariateNormalizer": 

223 """Deserialize a normalizer from a JSON string.""" 

224 return cls.from_dict(json.loads(json_str)) 

225 

226 def reset(self) -> None: 

227 """ 

228 Reset the normalizer to initial state. 

229 

230 Reinitializes observation count, mean, and covariance accumulator (_M) 

231 to zeros. 

232 """ 

233 self.n = 0 

234 self.muhat = np.zeros(self.n_dim, dtype=np.float64) 

235 self._M = np.zeros((self.n_dim, self.n_dim), dtype=np.float64)