Coverage for src/onorm/pipeline.py: 98%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-07 20:22 +0000

1import json 

2from typing import Any, Dict, List 

3 

4import numpy as np 

5 

6from .normalization_base import Normalizer 

7 

8 

class Pipeline(Normalizer):
    """
    Chain several normalizers into a single sequential normalizer.

    The pipeline feeds each observation through its stages in order: the
    output of stage *i* is the input of stage *i + 1*. This makes it easy
    to compose strategies, e.g. winsorize first, then standardize.

    While fitting (``partial_fit``), every stage is updated with the data
    as already transformed by the stages before it, so each stage learns
    the distribution it will actually receive at inference time.

    Parameters
    ----------
    normalizers : List[Normalizer]
        Normalizer instances applied in order. May be empty, in which
        case the pipeline is the identity transformation.

    Attributes
    ----------
    normalizers : List[Normalizer]
        The stages of the pipeline, in application order.

    Examples
    --------
    ```{python}
    from onorm import Pipeline, Winsorizer, StandardScaler, MinMaxScaler
    import numpy as np
    # Clip outliers, standardize, then rescale to [0, 1]
    pipeline = Pipeline(
        [
            Winsorizer(n_dim=3, clip_q=(0.05, 0.95)),
            StandardScaler(n_dim=3),
            MinMaxScaler(n_dim=3),
        ]
    )
    X = np.random.normal(size=(100, 3))
    for x in X:
        pipeline.partial_fit(x)
    x_new = np.array([2.0, -1.0, 0.5])
    x_normalized = pipeline.transform(x_new.copy())

    # Empty pipeline (identity transformation)
    identity_pipeline = Pipeline([])
    ```

    Notes
    -----
    - Stage order matters: Pipeline([A, B]) differs from Pipeline([B, A])
    - Fitting and transforming both traverse the chain in order
    - An empty pipeline returns its input unchanged
    """

    def __init__(self, normalizers: List[Normalizer]) -> None:
        self.normalizers = normalizers

    def partial_fit(self, x: np.ndarray) -> None:
        """
        Update every stage with one observation, propagated through the chain.

        Stage 1 is fitted on ``x`` itself; stage 2 on ``transform_1(x)``;
        stage 3 on ``transform_2(transform_1(x))``; and so on, so each
        stage sees the distribution it will encounter at transform time.

        Parameters
        ----------
        x : np.ndarray
            A 1-D observation whose length matches the stages' n_dim.
        """
        # Work on a copy so stages that transform in place never touch
        # the caller's array.
        current = x.copy()
        for stage in self.normalizers:
            stage.partial_fit(current)
            current = stage.transform(current)

    def transform(self, x: np.ndarray) -> np.ndarray:
        """
        Run ``x`` through every stage in order and return the result.

        Parameters
        ----------
        x : np.ndarray
            A 1-D array to normalize.

        Returns
        -------
        np.ndarray
            The array after all stages have been applied; the input
            unchanged when the pipeline is empty.

        Notes
        -----
        Individual stages may modify the array in place for efficiency.
        Pass a copy if the original must be preserved:
        ``transform(x.copy())``.
        """
        result = x
        for stage in self.normalizers:
            result = stage.transform(result)
        return result

    def reset(self) -> None:
        """
        Return every stage to its initial state.

        Invokes ``reset()`` on each stage, discarding learned statistics.
        """
        for stage in self.normalizers:
            stage.reset()

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the pipeline to a plain dictionary.

        Returns
        -------
        dict
            Dictionary whose state holds each stage serialized via its
            own ``to_dict()``.
        """
        serialized_stages = [stage.to_dict() for stage in self.normalizers]
        return {
            "version": "1.0",
            "class": "Pipeline",
            "config": {},
            "state": {"normalizers": serialized_stages},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Pipeline":
        """
        Rebuild a pipeline from a dictionary produced by ``to_dict()``.

        Parameters
        ----------
        data : dict
            Serialized pipeline dictionary.

        Returns
        -------
        Pipeline
            Reconstructed pipeline instance.

        Raises
        ------
        ValueError
            If ``data`` does not describe a Pipeline, or a stage names a
            class this method does not know how to deserialize.

        Notes
        -----
        Each stage's class is looked up by name and deserialized with
        that class's own ``from_dict()``.
        """
        if data.get("class") != "Pipeline":
            raise ValueError(f"Cannot deserialize {data.get('class')} as Pipeline")

        # Deferred import keeps the package import graph acyclic.
        from . import (
            MinMaxScaler,
            MultivariateNormalizer,
            QuantileTransformer,
            StandardScaler,
            Winsorizer,
        )

        registry = {
            "MinMaxScaler": MinMaxScaler,
            "StandardScaler": StandardScaler,
            "MultivariateNormalizer": MultivariateNormalizer,
            "Winsorizer": Winsorizer,
            "QuantileTransformer": QuantileTransformer,
            "Pipeline": cls,
        }

        def _build(entry: Dict[str, Any]) -> Normalizer:
            # Resolve the stage class by name, then delegate to it.
            klass = registry.get(entry["class"])
            if klass is None:
                raise ValueError(f"Unknown normalizer class: {entry['class']}")
            return klass.from_dict(entry)

        return cls([_build(entry) for entry in data["state"]["normalizers"]])

    def to_json(self) -> str:
        """Serialize the pipeline to a JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> "Pipeline":
        """Deserialize a pipeline from a JSON string."""
        return cls.from_dict(json.loads(json_str))