
#757 Feature/sg 635 fix dataloader multiprocessing

Merged
Ghost merged 1 commit into Deci-AI:master from deci-ai:feature/SG-635-fix_dataloader_multiprocessing
@@ -1,11 +1,13 @@
-import os
 import logging
 import logging.config
 from typing import Union
 
-from super_gradients.common.environment.env_variables import env_variables
+from super_gradients.common.abstractions.mute_processes import mute_subprocesses
 from super_gradients.common.auto_logging.auto_logger import AutoLoggerConfig
 
+# Mute on import to avoid the import prints/logs on sub processes
+mute_subprocesses()
+
 
 def get_logger(logger_name: str, log_level: Union[str, None] = None) -> logging.Logger:
     AutoLoggerConfig.get_instance()
@@ -13,8 +15,7 @@ def get_logger(logger_name: str, log_level: Union[str, None] = None) -> logging.
     if log_level is not None:
         logger.setLevel(log_level)
 
-    if int(env_variables.LOCAL_RANK) > 0:
-        mute_current_process()
+    mute_subprocesses()
     return logger
 
@@ -26,21 +27,3 @@ class ILogger:
     def __init__(self, logger_name: str = None):
         logger_name = logger_name if logger_name else str(self.__module__)
         self._logger: logging.Logger = get_logger(logger_name)
-
-
-def mute_current_process():
-    """Mute prints, warnings and all logs except ERRORS. This is meant when running multiple processes."""
-    # Ignore warnings
-    import warnings
-
-    warnings.filterwarnings("ignore")
-
-    # Ignore prints
-    import sys
-
-    sys.stdout = open(os.devnull, "w")
-
-    # Only show ERRORS
-    process_loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
-    for logger in process_loggers:
-        logger.setLevel(logging.ERROR)
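
For context, a minimal use of the patched helper could look like the following (import path and signature taken from the diff; the log message is illustrative):

    from super_gradients.common.abstractions.abstract_logger import get_logger

    logger = get_logger(__name__, log_level="INFO")
    logger.info("Visible on the master process; muted ranks only surface ERRORs.")

With this change, muting no longer depends on calling get_logger(): importing the module already runs mute_subprocesses(), so spawned workers are silenced before any module-level logging fires.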
New file (module super_gradients.common.abstractions.mute_processes):

import logging
import os
import platform

import psutil

from super_gradients.common.environment.env_variables import env_variables


def mute_subprocesses():
    """Mute (prints, warnings and all logs except ERRORS) of some subprocesses to avoid having duplicates in the logs."""
    # When running DDP, mute all nodes except for the master node
    if int(env_variables.LOCAL_RANK) > 0:
        mute_current_process()
    mute_non_linux_dataloader_worker_process()


def mute_current_process():
    """Mute prints, warnings and all logs except ERRORS. This is meant to be used when running multiple processes."""
    # Ignore warnings
    import warnings

    warnings.filterwarnings("ignore")

    # Ignore prints
    import sys

    sys.stdout = open(os.devnull, "w")

    # Only show ERRORS
    process_loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
    for logger in process_loggers:
        logger.setLevel(logging.ERROR)


def mute_non_linux_dataloader_worker_process() -> None:
    """Mute any worker process when running on mac/windows.

    This is required because the dataloader workers are "spawned" on mac/windows and "forked" on linux.
    The consequence is that on mac/windows every module is imported in each worker process, leading to a huge
    number of prints/logs being displayed on import.
    For more information: https://pytorch.org/docs/stable/data.html#platform-specific-behaviors

    To avoid this, we mute the dataloader workers when running on mac/windows.

    Note:
        We assume that the process tree looks like this:
            Without DDP:
                ... -> main_process -> worker_process
            With DDP:
                ... -> main_process -> node_process -> worker_process
        Depending on how the script is launched, main_process might itself be a child of other non-"python"
        processes, such as:
            ssh(non-python) -> pycharm(non-python) -> main_process(python) -> ...
    """
    if is_non_linux_dataloader_worker_process():
        mute_current_process()


def is_non_linux_dataloader_worker_process() -> bool:
    """Check if the current process is a dataloader worker process on a non-linux device."""
    if any(os_name in platform.platform() for os_name in ["macOS", "Windows"]):
        # When using DDP with the SG launcher, we expect the worker process to have 2 parent python processes,
        # and only 1 otherwise.
        # Note that "main_process" is actually the main process only if the current process is a worker process.
        if int(env_variables.LOCAL_RANK) == -1:
            # NO DDP
            main_process = psutil.Process().parent()
        elif os.environ.get("TORCHELASTIC_RUN_ID") == "sg_initiated":
            # DDP launched using SG logic
            main_process = psutil.Process().parent().parent()
        else:
            # DDP launched using torch.distributed.launch or torchrun
            main_process = psutil.Process().parent()

        is_worker_process = main_process and "python" in main_process.name()
        if is_worker_process:
            return True
    return False
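
The spawn-versus-fork behavior this new module guards against is easy to reproduce outside SuperGradients. A minimal standalone sketch (not part of the PR): under the "spawn" start method, which macOS and Windows default to, each worker re-imports the script module, so module-level prints run once per worker; under "fork" on Linux they run only once.

    import multiprocessing as mp

    print("module-level code executed")  # repeated in every worker under "spawn"

    def square(x):
        return x * x

    if __name__ == "__main__":
        mp.set_start_method("spawn", force=True)  # mimic macOS/Windows behavior
        with mp.Pool(processes=2) as pool:
            print(pool.map(square, [1, 2, 3]))

Forcing "spawn" makes the top-level print appear three times (parent plus two workers). That is exactly the duplicate import-time noise mute_non_linux_dataloader_worker_process() suppresses.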
@@ -36,6 +36,7 @@ def is_main_process():
         - If DDP launched using SuperGradients: main process is the launching process (rank=-1)
         - If DDP launched with torch: main process is rank 0
     """
+
     if not is_distributed():  # If no DDP, or DDP launching process
         return True
     elif (
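
The docstring above states the rank convention. A rough sketch of just that convention (a hypothetical helper, not SG's actual is_main_process(), which additionally checks is_distributed() and how DDP was launched):

    import os

    def looks_like_main_process() -> bool:
        # -1: no DDP, or the SG launching process; 0: the rank-0 process under torch launchers
        local_rank = int(os.environ.get("LOCAL_RANK", -1))
        return local_rank in (-1, 0)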
@@ -27,7 +27,7 @@ def check_os():
 
     if "linux" not in sys.platform.lower():
         error = "Deci officially supports only Linux kernels. Some features may not work as expected."
-        logger.error(msg=format_error_msg(test_name="operating system", error_msg=error))
+        logger.warning(msg=format_error_msg(test_name="operating system", error_msg=error))
 
 
 def get_requirements_path(requirements_file_name: str) -> Optional[Path]:
@@ -89,7 +89,7 @@ def check_packages():
 
         if package_name not in installed_packages.keys():
             error = f"{package_name} required but not found"
-            logger.error(msg=format_error_msg(test_name=test_name, error_msg=error))
+            logger.warning(msg=format_error_msg(test_name=test_name, error_msg=error))
             continue
 
         installed_version_str = installed_packages[package_name]
@@ -104,7 +104,7 @@ def check_packages():
 
                 requires_at_least = operator_str in ("==", "~=", ">=", ">")
                 if requires_at_least and installed_version < req_version:
-                    logger.error(msg=format_error_msg(test_name=test_name, error_msg=error))
+                    logger.warning(msg=format_error_msg(test_name=test_name, error_msg=error))
                 else:
                     logger.debug(msg=error)
@@ -18,7 +18,8 @@ from super_gradients.common.environment.argparse_utils import EXTRA_ARGS
 from super_gradients.common.environment.ddp_utils import find_free_port, is_distributed, is_launched_using_sg
 
-from super_gradients.common.abstractions.abstract_logger import get_logger, mute_current_process
+from super_gradients.common.abstractions.abstract_logger import get_logger
+from super_gradients.common.abstractions.mute_processes import mute_current_process
 from super_gradients.common.environment.device_utils import device_config
 
 from super_gradients.common.decorators.factory_decorator import resolve_param