Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

#647 Feature/sg 573 Integrate new EMA decay schedules

Merged
Ghost merged 1 commit into Deci-AI:master from deci-ai:feature/SG-573-Integrate-EMA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  1. import dataclasses
  2. import time
  3. from functools import partial
  4. from typing import Callable, List, Iterator, Union, Any, Optional
  5. from super_gradients.common.environment.monitoring.gpu import init_nvidia_management_lib, count_gpus
  6. @dataclasses.dataclass
  7. class StatAggregator:
  8. """Accumulate statistics samples and aggregates them.
  9. :param name: Name of the statistic
  10. :param sampling_fn: How the statistic is sampled
  11. :param aggregate_fn: How the statistic samples are aggregated, has to take "samples: List[Any]" and "time: float" as parameters
  12. :param reset_callback_fn: Optional, can be used to reset any system metric
  13. """
  14. name: str
  15. sampling_fn: Callable
  16. aggregate_fn: Callable[[List[Any], float], float]
  17. reset_callback_fn: Optional[Callable] = None
  18. _samples: List = dataclasses.field(default_factory=list)
  19. _reset_time: float = None
  20. def sample(self):
  21. try:
  22. self._samples.append(self.sampling_fn())
  23. except Exception:
  24. pass
  25. def aggregate(self) -> Union[float, None]:
  26. if len(self._samples) > 0:
  27. time_diff = time.time() - self._reset_time
  28. return self.aggregate_fn(self._samples, time_diff)
  29. def reset(self):
  30. self._samples = []
  31. self._reset_time = time.time()
  32. if self.reset_callback_fn:
  33. self.reset_callback_fn()
  34. @dataclasses.dataclass
  35. class GPUStatAggregatorIterator:
  36. """Iterator of multiple StatAggregator, that accumulate samples and aggregates them for each NVIDIA device.
  37. :param name: Name of the statistic
  38. :param sampling_fn: How the statistic is sampled
  39. :param aggregate_fn: How the statistic samples are aggregated
  40. """
  41. name: str
  42. device_sampling_fn: Callable
  43. device_aggregate_fn: Callable
  44. _per_device_stat_aggregator: List[StatAggregator] = dataclasses.field(init=False)
  45. def __post_init__(self):
  46. """Initialize nvidia_management_lib and create a list of StatAggregator, one for each NVIDIA device."""
  47. init_nvidia_management_lib()
  48. self._per_device_stat_aggregator = [
  49. StatAggregator(name=f"{self.name}/device_{i}", sampling_fn=partial(self.device_sampling_fn, i), aggregate_fn=self.device_aggregate_fn)
  50. for i in range(count_gpus())
  51. ]
  52. def __iter__(self) -> Iterator[StatAggregator]:
  53. """Iterate over the StatAggregator of each node"""
  54. return iter(self._per_device_stat_aggregator)
Discard
Tip!

Press p or ← to see the previous file, or n or → to see the next file