1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
- from functools import cached_property
- import lightgbm as lgb
- from sklearn.base import TransformerMixin
- from sklearn.metrics import *
- from yspecies.partition import ExpressionPartitions
- from yspecies.utils import *
@dataclass(frozen=True)
class BasicMetrics:
    """
    Immutable container for the three LightGBM regression losses
    reported for a single boosting iteration.
    """
    MAE: float    # mean absolute error (LightGBM key "l1")
    MSE: float    # mean squared error (LightGBM key "l2")
    huber: float  # huber loss (LightGBM key "huber")

    @staticmethod
    def from_dict(dict: Dict, row: int = None):
        """
        Builds BasicMetrics from an evals_result-style dict with "l1", "l2"
        and "huber" keys.

        :param dict: metric dict; values are scalars when row is None,
            otherwise per-iteration lists
        :param row: iteration index to extract, or None for scalar values
        :return: BasicMetrics instance

        NOTE(review): the original defined from_dict twice; the second
        (row-indexed) overload shadowed the first, making the scalar variant
        unreachable. Both behaviours are merged here behind a default row.
        """
        if row is None:
            return BasicMetrics(dict["l1"], dict["l2"], dict["huber"])
        return BasicMetrics(dict["l1"][row], dict["l2"][row], dict["huber"][row])

    @staticmethod
    def parse_eval(evals_result: Dict):
        """
        Converts a LightGBM evals_result dict (first dataset only) into a
        list of per-iteration BasicMetrics.
        """
        per_metric = list(evals_result.values())[0]
        iterations = len(per_metric["l1"])
        return [BasicMetrics.from_dict(per_metric, i) for i in range(iterations)]
@dataclass(frozen=True)
class Metrics:
    '''
    Class to store metrics
    '''
    # NOTE(review): this string sat mid-class in the original, where it was a
    # stray statement rather than the docstring; moved to the top so __doc__ works.

    # regression metrics for one evaluated model
    R2: float
    MAE: float
    MSE: float
    huber: float
    #MSLE: float

    @staticmethod
    def from_numpy(arr: np.ndarray) -> 'Metrics':
        """Rebuilds a Metrics instance from the 4-element array produced by to_numpy."""
        return Metrics(arr[0], arr[1], arr[2], arr[3])

    @staticmethod
    def average(metrics: List['Metrics']) -> 'Metrics':
        """Element-wise mean of several Metrics instances."""
        return Metrics.from_numpy(np.average([m.to_numpy for m in metrics], axis=0))

    @staticmethod
    def to_dataframe(metrics: Union[List['Metrics'], 'Metrics']) -> pd.DataFrame:
        """One row per Metrics instance, columns R^2 / MAE / MSE / huber."""
        metrics = [metrics] if isinstance(metrics, Metrics) else metrics
        mts = pd.DataFrame(np.zeros([len(metrics), 4]), columns=["R^2", "MAE", "MSE", "huber"]) #, "MSLE"
        for i, m in enumerate(metrics):
            mts.iloc[i] = m.to_numpy
        return mts

    @staticmethod
    def calculate(ground_truth, prediction, huber: float = None) -> 'Metrics':
        """
        Calculates metrics while getting huber from outside
        :param ground_truth:
        :param prediction:
        :param huber:
        :return:
        """
        return Metrics(
            r2_score(ground_truth, prediction),
            mean_absolute_error(ground_truth, prediction),
            mean_squared_error(ground_truth, prediction),
            huber=huber
            #mean_squared_log_error(ground_truth, prediction)
        )

    @cached_property
    def to_numpy(self) -> np.ndarray:
        """Metrics as a 4-element array: [R2, MAE, MSE, huber]."""
        return np.array([self.R2, self.MAE, self.MSE, self.huber])
@dataclass(frozen=True)
class ResultsCV:
    """
    Holds the outcome of one LightGBM cross-validation run: the parameter
    dict that was used and the per-iteration evaluation history.
    """

    parameters: Dict
    evaluation: Dict

    @staticmethod
    def take_best(results: List['ResultsCV'], metrics: str = "huber", last: bool = False):
        """Smallest value of the given metric across all results (final-iteration
        value when last=True, otherwise each result's minimum)."""
        best = None
        for candidate in results:
            current = candidate.last(metrics) if last else candidate.min(metrics)
            if best is None or current < best:
                best = current
        return best

    @cached_property
    def keys(self):
        # metric names present in the evaluation history
        return list(self.evaluation.keys())

    @cached_property
    def mins(self):
        # per-metric minimum over all iterations
        return {name: np.array(history).min() for name, history in self.evaluation.items()}

    @cached_property
    def latest(self):
        # per-metric value at the final iteration
        return {name: np.array(history)[-1] for name, history in self.evaluation.items()}

    def min(self, metrics: str) -> float:
        """Minimum of the metric, falling back to its '<name>-mean' column."""
        key = metrics if metrics in self.mins else metrics + "-mean"
        return self.mins[key]

    def last(self, metrics: str) -> float:
        """Final-iteration value of the metric, falling back to '<name>-mean'."""
        key = metrics if metrics in self.latest else metrics + "-mean"
        return self.latest[key]

    def _repr_html_(self):
        # Jupyter display hook: render the evaluation history as an HTML table.
        first = self.evaluation[self.keys[0]]
        header = "".join([f'<th>{k}</th>' for k in self.keys])
        rows = "".join(["<tr>" + "".join([f"<td>{self.evaluation[k][i]}</td>" for k in self.keys]) + "</tr>" for i in range(0, len(first))])
        return f"""<table border='2'>
<caption><h3>CrossValidation results</h3><caption>
<tr style='text-align:center'>{header}</tr>
{rows}
</table>"""
@dataclass
class BasicCrossValidator(TransformerMixin):
    """
    sklearn-style transformer that runs LightGBM cross-validation over
    pre-computed partition folds and exposes the evaluation history.
    """
    evaluation: ResultsCV = None        # populated by fit()
    num_iterations: int = 200           # fallback boosting rounds when parameters specify none
    early_stopping_rounds: int = 10

    def num_boost_round(self, parameters: Dict) -> int:
        """Boosting rounds: 'num_iterations' parameter if set, else
        'num_boost_round' parameter, else the instance default."""
        for key in ("num_iterations", "num_boost_round"):
            value = parameters.get(key)
            if value is not None:
                return value
        return self.num_iterations

    def fit(self, to_fit: Tuple[ExpressionPartitions, Dict], y=None) -> 'BasicCrossValidator':
        """
        Runs lgb.cv over the partition folds and stores the result.

        :param to_fit: (partitions, lightgbm parameter dict) tuple
        :param y: unused, present for sklearn API compatibility
        :return: self (original was annotated -> Dict but always returned self)
        """
        partitions, parameters = to_fit
        cat = partitions.categorical_index if partitions.features.has_categorical else "auto"
        lgb_train = lgb.Dataset(partitions.X, partitions.Y, categorical_feature=cat, free_raw_data=False)
        num_boost_round = self.num_boost_round(parameters)
        # NOTE(review): the original also computed an `iterations` value here whose
        # conditional returned the same expression on both branches and was never
        # used — removed as dead code.
        stopping_callback = lgb.early_stopping(self.early_stopping_rounds)
        # NOTE(review): verbose_eval is deprecated in recent lightgbm — confirm
        # against the pinned lightgbm version before removing it.
        eval_hist = lgb.cv(parameters,
                           lgb_train,
                           folds=partitions.folds,
                           metrics=["mae", "mse", "huber"],
                           categorical_feature=cat,
                           show_stdv=True,
                           verbose_eval=num_boost_round,
                           seed=partitions.seed,
                           num_boost_round=num_boost_round,
                           #early_stopping_rounds=self.early_stopping_rounds,
                           callbacks=[stopping_callback]
                           )
        self.evaluation = ResultsCV(parameters, eval_hist)
        return self

    def transform(self, to_fit: Tuple[ExpressionPartitions, Dict]) -> ResultsCV:
        """Returns the evaluation stored by fit(); the input tuple is unused."""
        assert self.evaluation is not None, "Cross validation should be fitted before calling transform!"
        return self.evaluation
|