Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

path_utils.py 9.3 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
  1. import os
  2. import time
  3. supported_pt_extensions = set(['.ckpt', '.pt', '.bin', '.pth', '.safetensors'])
  4. folder_names_and_paths = {}
  5. base_path = os.getcwd()
  6. models_dir = os.path.join(base_path, "models")
  7. folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions)
  8. folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"])
  9. folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions)
  10. folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions)
  11. folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions)
  12. folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions)
  13. folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions)
  14. folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions)
  15. folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions)
  16. folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"])
  17. folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions)
  18. folder_names_and_paths["controlnet"] = ([os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")], supported_pt_extensions)
  19. folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions)
  20. folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions)
  21. folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], [])
  22. folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions)
  23. folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions)
  24. folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
  25. output_directory = os.path.join(os.getcwd(), "output")
  26. temp_directory = os.path.join(os.getcwd(), "temp")
  27. input_directory = os.path.join(os.getcwd(), "input")
  28. user_directory = os.path.join(os.getcwd(), "user")
  29. filename_list_cache = {}
  30. if not os.path.exists(input_directory):
  31. try:
  32. pass # os.makedirs(input_directory)
  33. except:
  34. print("Failed to create input directory")
  35. def set_output_directory(output_dir):
  36. global output_directory
  37. output_directory = output_dir
  38. def set_temp_directory(temp_dir):
  39. global temp_directory
  40. temp_directory = temp_dir
  41. def set_input_directory(input_dir):
  42. global input_directory
  43. input_directory = input_dir
  44. def get_output_directory():
  45. global output_directory
  46. return output_directory
  47. def get_temp_directory():
  48. global temp_directory
  49. return temp_directory
  50. def get_input_directory():
  51. global input_directory
  52. return input_directory
  53. #NOTE: used in http server so don't put folders that should not be accessed remotely
  54. def get_directory_by_type(type_name):
  55. if type_name == "output":
  56. return get_output_directory()
  57. if type_name == "temp":
  58. return get_temp_directory()
  59. if type_name == "input":
  60. return get_input_directory()
  61. return None
  62. # determine base_dir rely on annotation if name is 'filename.ext [annotation]' format
  63. # otherwise use default_path as base_dir
  64. def annotated_filepath(name):
  65. if name.endswith("[output]"):
  66. base_dir = get_output_directory()
  67. name = name[:-9]
  68. elif name.endswith("[input]"):
  69. base_dir = get_input_directory()
  70. name = name[:-8]
  71. elif name.endswith("[temp]"):
  72. base_dir = get_temp_directory()
  73. name = name[:-7]
  74. else:
  75. return name, None
  76. return name, base_dir
  77. def get_annotated_filepath(name, default_dir=None):
  78. name, base_dir = annotated_filepath(name)
  79. if base_dir is None:
  80. if default_dir is not None:
  81. base_dir = default_dir
  82. else:
  83. base_dir = get_input_directory() # fallback path
  84. return os.path.join(base_dir, name)
  85. def exists_annotated_filepath(name):
  86. name, base_dir = annotated_filepath(name)
  87. if base_dir is None:
  88. base_dir = get_input_directory() # fallback path
  89. filepath = os.path.join(base_dir, name)
  90. return os.path.exists(filepath)
  91. def add_model_folder_path(folder_name, full_folder_path):
  92. global folder_names_and_paths
  93. if folder_name in folder_names_and_paths:
  94. folder_names_and_paths[folder_name][0].append(full_folder_path)
  95. else:
  96. folder_names_and_paths[folder_name] = ([full_folder_path], set())
  97. def get_folder_paths(folder_name):
  98. return folder_names_and_paths[folder_name][0][:]
  99. def recursive_search(directory, excluded_dir_names=None):
  100. if not os.path.isdir(directory):
  101. return [], {}
  102. if excluded_dir_names is None:
  103. excluded_dir_names = []
  104. result = []
  105. dirs = {}
  106. # Attempt to add the initial directory to dirs with error handling
  107. try:
  108. dirs[directory] = os.path.getmtime(directory)
  109. except FileNotFoundError:
  110. print(f"Warning: Unable to access {directory}. Skipping this path.")
  111. for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
  112. subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
  113. for file_name in filenames:
  114. relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory)
  115. result.append(relative_path)
  116. for d in subdirs:
  117. path = os.path.join(dirpath, d)
  118. try:
  119. dirs[path] = os.path.getmtime(path)
  120. except FileNotFoundError:
  121. print(f"Warning: Unable to access {path}. Skipping this path.")
  122. continue
  123. return result, dirs
  124. def filter_files_extensions(files, extensions):
  125. return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
  126. def get_full_path(folder_name, filename):
  127. global folder_names_and_paths
  128. if folder_name not in folder_names_and_paths:
  129. return None
  130. folders = folder_names_and_paths[folder_name]
  131. filename = os.path.relpath(os.path.join("/", filename), "/")
  132. for x in folders[0]:
  133. full_path = os.path.join(x, filename)
  134. if os.path.isfile(full_path):
  135. return full_path
  136. return None
  137. def get_filename_list_(folder_name):
  138. global folder_names_and_paths
  139. output_list = set()
  140. folders = folder_names_and_paths[folder_name]
  141. output_folders = {}
  142. for x in folders[0]:
  143. files, folders_all = recursive_search(x, excluded_dir_names=[".git"])
  144. output_list.update(filter_files_extensions(files, folders[1]))
  145. output_folders = {**output_folders, **folders_all}
  146. return (sorted(list(output_list)), output_folders, time.perf_counter())
  147. def cached_filename_list_(folder_name):
  148. global filename_list_cache
  149. global folder_names_and_paths
  150. if folder_name not in filename_list_cache:
  151. return None
  152. out = filename_list_cache[folder_name]
  153. for x in out[1]:
  154. time_modified = out[1][x]
  155. folder = x
  156. if os.path.getmtime(folder) != time_modified:
  157. return None
  158. folders = folder_names_and_paths[folder_name]
  159. for x in folders[0]:
  160. if os.path.isdir(x):
  161. if x not in out[1]:
  162. return None
  163. return out
  164. def get_filename_list(folder_name):
  165. out = cached_filename_list_(folder_name)
  166. if out is None:
  167. out = get_filename_list_(folder_name)
  168. global filename_list_cache
  169. filename_list_cache[folder_name] = out
  170. return list(out[0])
  171. def get_save_image_path(filename_prefix, output_dir, image_width=0, image_height=0):
  172. def map_filename(filename):
  173. prefix_len = len(os.path.basename(filename_prefix))
  174. prefix = filename[:prefix_len + 1]
  175. try:
  176. digits = int(filename[prefix_len + 1:].split('_')[0])
  177. except:
  178. digits = 0
  179. return (digits, prefix)
  180. def compute_vars(input, image_width, image_height):
  181. input = input.replace("%width%", str(image_width))
  182. input = input.replace("%height%", str(image_height))
  183. return input
  184. filename_prefix = compute_vars(filename_prefix, image_width, image_height)
  185. subfolder = os.path.dirname(os.path.normpath(filename_prefix))
  186. filename = os.path.basename(os.path.normpath(filename_prefix))
  187. full_output_folder = os.path.join(output_dir, subfolder)
  188. if os.path.commonpath((output_dir, os.path.abspath(full_output_folder))) != output_dir:
  189. err = "**** ERROR: Saving image outside the output folder is not allowed." + \
  190. "\n full_output_folder: " + os.path.abspath(full_output_folder) + \
  191. "\n output_dir: " + output_dir + \
  192. "\n commonpath: " + os.path.commonpath((output_dir, os.path.abspath(full_output_folder)))
  193. print(err)
  194. raise Exception(err)
  195. try:
  196. counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_", map(map_filename, os.listdir(full_output_folder))))[0] + 1
  197. except ValueError:
  198. counter = 1
  199. except FileNotFoundError:
  200. os.makedirs(full_output_folder, exist_ok=True)
  201. counter = 1
  202. return full_output_folder, filename, counter, subfolder, filename_prefix
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...