Coverage for src/onorm/pipeline.py: 98%
40 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 20:22 +0000
1import json
2from typing import Any, Dict, List
4import numpy as np
6from .normalization_base import Normalizer
class Pipeline(Normalizer):
    """
    Chain several normalizers so they run one after another.

    The output of each stage feeds the next stage, which lets different
    strategies be combined (e.g. clip outliers, then standardize, then
    scale to [0, 1]). During ``partial_fit`` every stage is updated with
    the data as transformed by the stages before it, so each stage learns
    the distribution it will actually receive at inference time.

    Parameters
    ----------
    normalizers : List[Normalizer]
        Normalizer instances applied in order. May be empty, in which
        case the pipeline acts as the identity transformation.

    Attributes
    ----------
    normalizers : List[Normalizer]
        The stages of the pipeline, in application order.

    Examples
    --------
    ```{python}
    from onorm import Pipeline, Winsorizer, StandardScaler, MinMaxScaler
    import numpy as np
    pipeline = Pipeline(
        [
            Winsorizer(n_dim=3, clip_q=(0.05, 0.95)),
            StandardScaler(n_dim=3),
            MinMaxScaler(n_dim=3),
        ]
    )
    X = np.random.normal(size=(100, 3))
    for x in X:
        pipeline.partial_fit(x)
    x_new = np.array([2.0, -1.0, 0.5])
    x_normalized = pipeline.transform(x_new.copy())
    # Empty pipeline (identity transformation)
    identity_pipeline = Pipeline([])
    ```

    Notes
    -----
    - Order matters: Pipeline([A, B]) differs from Pipeline([B, A]).
    - Fitting and transforming both walk the chain sequentially.
    - An empty pipeline returns its input unchanged.
    """

    def __init__(self, normalizers: List[Normalizer]) -> None:
        self.normalizers = normalizers

    def partial_fit(self, x: np.ndarray) -> None:
        """
        Update every stage with the observation as seen by that stage.

        Stage 1 is fitted on ``x`` itself, stage 2 on ``transform_1(x)``,
        stage 3 on ``transform_2(transform_1(x))``, and so on, so each
        stage learns from the distribution it will encounter at
        transform time.

        Parameters
        ----------
        x : np.ndarray
            A 1-D observation; its length should match the ``n_dim``
            used by the stages.
        """
        # Work on a copy so in-place stage transforms never touch the
        # caller's array.
        staged = x.copy()
        for stage in self.normalizers:
            stage.partial_fit(staged)
            staged = stage.transform(staged)

    def transform(self, x: np.ndarray) -> np.ndarray:
        """
        Run ``x`` through every stage in order and return the result.

        Parameters
        ----------
        x : np.ndarray
            A 1-D array to normalize.

        Returns
        -------
        np.ndarray
            The array after all stages have been applied; the input
            itself when the pipeline has no stages.

        Notes
        -----
        Individual stages may transform in place for efficiency; pass a
        copy (``transform(x.copy())``) if the original values must be
        preserved.
        """
        result = x
        for stage in self.normalizers:
            result = stage.transform(result)
        return result

    def reset(self) -> None:
        """
        Reset every stage to its initial state.

        Calls ``reset()`` on each normalizer, clearing all learned
        statistics.
        """
        for stage in self.normalizers:
            stage.reset()

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the pipeline (and, recursively, its stages) to a dict.

        Returns
        -------
        dict
            Versioned payload whose state holds each stage's own
            ``to_dict()`` output.
        """
        serialized_stages = []
        for stage in self.normalizers:
            serialized_stages.append(stage.to_dict())
        return {
            "version": "1.0",
            "class": "Pipeline",
            "config": {},
            "state": {"normalizers": serialized_stages},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Pipeline":
        """
        Rebuild a pipeline from a dictionary produced by ``to_dict``.

        Parameters
        ----------
        data : dict
            Serialized pipeline payload.

        Returns
        -------
        Pipeline
            A pipeline whose stages were each deserialized by their own
            class's ``from_dict``.

        Raises
        ------
        ValueError
            If ``data`` does not describe a Pipeline, or a stage names
            an unknown normalizer class.
        """
        if data.get("class") != "Pipeline":
            raise ValueError(f"Cannot deserialize {data.get('class')} as Pipeline")

        # Imported here rather than at module scope to avoid circular imports.
        from . import (
            MinMaxScaler,
            MultivariateNormalizer,
            QuantileTransformer,
            StandardScaler,
            Winsorizer,
        )

        registry = {
            "MinMaxScaler": MinMaxScaler,
            "StandardScaler": StandardScaler,
            "MultivariateNormalizer": MultivariateNormalizer,
            "Winsorizer": Winsorizer,
            "QuantileTransformer": QuantileTransformer,
            "Pipeline": cls,  # supports nested pipelines
        }

        stages = []
        for payload in data["state"]["normalizers"]:
            class_name = payload["class"]
            stage_cls = registry.get(class_name)
            if stage_cls is None:
                raise ValueError(f"Unknown normalizer class: {class_name}")
            stages.append(stage_cls.from_dict(payload))
        return cls(stages)

    def to_json(self) -> str:
        """Serialize the pipeline to a pretty-printed JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> "Pipeline":
        """Rebuild a pipeline from a JSON string produced by ``to_json``."""
        return cls.from_dict(json.loads(json_str))