Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

workflow.py 3.7 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
  1. from dataclasses import *
  2. from functools import cached_property
  3. from sklearn.base import TransformerMixin
  4. from sklearn.pipeline import Pipeline
  5. from sklearn.preprocessing import LabelEncoder
  6. from yspecies.dataset import ExpressionDataset
  7. from yspecies.utils import *
  8. @dataclass(frozen=True)
  9. class SplitReduce(TransformerMixin):
  10. '''
  11. This class is a bit complicated,
  12. it is needed when you want to split parameters in several pieces and send them to different pipelines/transformers
  13. and then assemble (reduce) result together.
  14. '''
  15. outputs: List[Union[TransformerMixin, Pipeline]] #transformers/pipelines to which we split the output
  16. split: Callable[[Any], List[Any]] #function that distributes/splits the outputs, should return a list with the same dimension as outputs field
  17. reduce: Callable[[Any, List[Any]], Any] # when
  18. def fit(self, X, y=None):
  19. return self
  20. def transform(self, X):
  21. data = X
  22. inputs = self.split(data)
  23. outputs = self.outputs if isinstance(self.outputs, Iterable) else [self.outputs]
  24. assert len(inputs) == len(outputs), f"splitter should give one input per each output! Now len(inputs) {len(inputs)} and len(outputs) {len(outputs)}"
  25. results = [o.fit_transform(inputs[i]) for i, o in enumerate(outputs)]
  26. reduced_results = self.reduce(data, results)
  27. return reduced_results
  28. @dataclass(frozen=True)
  29. class Join(TransformerMixin):
  30. inputs: List[Union[TransformerMixin, Pipeline]]
  31. output: Union[Union[TransformerMixin, Pipeline], Callable[[List[Any]], Any]]
  32. def fit(self, X, y=None):
  33. return self
  34. def transform(self, X):
  35. data = [t.fit_transform(X) for t in self.inputs]
  36. return self.output(data) if isinstance(self.output, Callable) else self.output.fit_transform(data)
  37. @dataclass(frozen=True)
  38. class Collect(TransformerMixin):
  39. '''
  40. turns a filtered (by filter) collection into one value
  41. '''
  42. fold: Callable[[Union[Iterable, Generator]], Any]
  43. filter: Callable[[Any], bool] = field(default_factory=lambda: lambda x: True) #just does nothing by default
  44. def fit(self, X, y=None):
  45. return self
  46. def transform(self, data: Iterable) -> Any:
  47. return self.fold([d for d in data if self.filter(d)])
  48. @dataclass(frozen=True)
  49. class Repeat(TransformerMixin):
  50. transformer: Union[TransformerMixin, Pipeline]
  51. repeats: Union[Union[Iterable, Generator], int]
  52. map: Callable[[Any, Any], Any] = field(default_factory=lambda: lambda x, i: x) #transforms data before passing it to the transformer
  53. @cached_property
  54. def iterable(self) -> Iterable:
  55. return self.repeats if (isinstance(self.repeats, Iterable) or isinstance(self.repeats, Generator)) else range(0, self.repeats)
  56. def fit(self, X, y=None):
  57. return self
  58. def transform(self, data: Any):
  59. return [self.transformer.fit_transform(self.map(data, i)) for i in self.iterable]
  60. @dataclass(frozen=True)
  61. class TupleWith(TransformerMixin):
  62. """
  63. Concatenates (in tuple) the results of Transformers or parameters plus transformers
  64. """
  65. parameters: Union[Union[TransformerMixin, Pipeline], Any]
  66. map_left: Callable[[Any], Any] = field(default_factory=lambda: lambda x: x)
  67. map_right: Callable[[Any], Any] = field(default_factory=lambda: lambda x: x)
  68. def fit(self, X, y = None):
  69. return self
  70. def transform(self, data: Any) -> Tuple:
  71. if isinstance(self.parameters, TransformerMixin) or isinstance(self.parameters, Pipeline):
  72. return (self.map_left(data), self.parameters.fit_transform(self.map_right(data)))
  73. else:
  74. return (self.map_left(data),) + (self.map_right(self.parameters) if isinstance(self.parameters, Tuple) else (self.map_right(self.parameters), ))
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...