# df_api.py

import os
import time
import copy
import json
import uuid
import boto3
import requests
from tqdm import tqdm
from typing import List, Tuple, Optional

# AWS
AWS_ACCESS_KEY_ID = 'AKIAI6PFXVX6CBIHIJIA'
AWS_SECRET_ACCESS_KEY = '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'

# Anastasia
CLIENT_UUID = 'fed091cb-754c-42fc-8d35-cd28601dfc75'
PROJECT_UUID = '79d14f32-02b9-45fb-9392-284fb7c6a251'
APPLICATION_UUID = 'f28bcce0-c521-41ae-9db7-f985de54272d'
DATASOURCE_TYPE = 'T001'  # transactional type

# APIs
INTERNAL_DEV_API = 'http://int-srv-dev.anastasia.ai/'
X_API_KEY = 'J48NWuVnLcFmavDH37wUvMUVKGd8Edda6VhVmDhMJj7xo'
DATASOURCES_API_KEY = 'MM9gvWjG3Z522UfNkEu9ZbLF3r6e9jqBF33PY4z3usHV7mtE7w'

# Datasets
OPENDATA_COLUMNS = [
    "651c8079-a9d7-4ec9-8a5a-8ecb2375fdc6", "4492e66b-b569-4bd9-885c-ff504a64021e",
    "cd1bf19b-9f5b-4244-bdb1-f4c892fa02e1", "c546adf2-ff80-4965-9ec1-3982c621763e",
    "b0ae454e-bf5b-4e18-b35f-b416d56ec6da", "5d63e743-e4f9-432b-8fed-dfe2d0e9811d",
    "238c38eb-dc6c-4690-b89a-2693e05cfa1d", "ab3c92f1-32fe-463d-bb3b-15f79f8ec466",
    "3152982c-7809-4801-b63d-aba822f49798", "41a7b328-b955-4c2a-81ad-78100074942e",
    "a4c5077e-2750-4f32-8e27-0f4be9d61470", "f62385f1-2604-4e5c-adcf-af374e0a727f",
    "3eb5f163-ff9a-4e86-876b-3c41d4f9298a", "8874a1eb-89cb-4489-b552-219ffef0d5c4",
    "a8d989c5-2607-4af3-a3bb-acd590c30f74", "328bc204-99ad-4600-9aa1-588326cc7778",
    "4165f7cd-0760-4f2e-a262-fa74aa4a4829", "24aa83be-b4d6-40d1-b429-69977c4e3282",
    "98a06172-0637-412e-beab-4ae1e7c1b05e", "2a85dbcd-b13f-4a58-b584-050248b16635",
    "5ff95e31-40c1-4579-8a7e-2b893c8eba89", "7aeba544-7b1c-4baa-a0dc-beaf1145b09c",
    "433067ce-8be6-43cb-a68c-925690cb00be"
]


class DemandForecasting:

    def __init__(self, verbose: bool = True) -> None:
        self.verbose = verbose

    def upload_datasource(self, name: str, columns: List[dict], files_path: str) -> str:
        """Create a datasource and upload files to it.

        Parameters
        ----------
        name : str
            Name the datasource will have
        columns : List[dict]
            Description of the column structure
        files_path : str
            Path where the files to upload are located

        Returns
        -------
        datasource_uuid : str
            UUID identifying the created datasource with the uploaded files
        """
        datasource_columns = self._translate_columns_types(columns)
        datasource_uuid = self._create_datasource(name, datasource_columns)['code']
        datasource_info = self._get_datasource_info(datasource_uuid)
        s3_path = datasource_info['storageURL']
        datasets_uuids = self._upload_multi_files(files_path, datasource_uuid, s3_path)
        return datasource_uuid

    def run_job(self, prediction_params: dict, datasource_params: dict, extras: Optional[dict] = None,
                wait_until_finished: bool = True, timeout: int = 2 * 60 * 60) -> str:
        """Call the jobs API to run the Demand Forecasting solution.

        Parameters
        ----------
        prediction_params : dict
            Description of the parameters related to the prediction
        datasource_params : dict
            Description of the column structure to associate with the predictor
        extras : dict
            Extra parameters that modify the predictor's behavior
        wait_until_finished : bool, default=True
            Block execution of subsequent code until the cloud job reaches a terminal status
        timeout : int, default=7200 (in seconds)
            Maximum time to wait for a completion status (only applies when wait_until_finished is set)

        Returns
        -------
        job_uuid : str
            UUID identifying the executed job
        """
        datasource_uuid = datasource_params["datasource_uuid"]
        use_opendata = datasource_params.get("use_opendata", None)
        datasource_info = self._get_datasource_info(datasource_uuid)
        columns_description = datasource_info["columns"]
        # Map the user-provided column names to the datasource's internal column names
        date_column = [{"column": column["mapName"], "dataSourceCode": datasource_uuid}
                       for column in columns_description
                       if column["originalName"] == datasource_params["date_column"]][0]
        id_columns = [{"column": column["mapName"], "dataSourceCode": datasource_uuid}
                      for column in columns_description
                      if column["originalName"] in datasource_params["id_columns"]]
        target_column = [{"column": column["mapName"], "dataSourceCode": datasource_uuid}
                         for column in columns_description
                         if column["originalName"] == datasource_params["target_column"]][0]
        timeseries_params = {
            "aggregator": "SUM",
            "clientUUID": CLIENT_UUID,
            "projectUUID": PROJECT_UUID,
            "applicationUUID": APPLICATION_UUID,
            "granularity": "MONTH",
            "negativeValues": False,
            "accumulatedValues": False,
            "relationships": [],
            "extraValues": [],
            "eventsDataSourceCode": datasource_uuid,
            "valueColumn": target_column,
            "groupableColumns": id_columns,
            "dateColumn": date_column,
        }
        if use_opendata: timeseries_params["openData"] = OPENDATA_COLUMNS
        job_config = {
            "project_uuid": PROJECT_UUID,
            "client_uuid": CLIENT_UUID,
            "application_uuid": APPLICATION_UUID,
            "params": {
                "core": {},
                "platform": {
                    "ray_params": {
                        "ray_image": "510629978475.dkr.ecr.us-east-1.amazonaws.com/core-ai-library-dev",
                        "head_instance_type": "m5.2xlarge",
                        "worker_instance_type": "m5.2xlarge",
                        "max_workers": 10,
                        "job_submission": {
                            "entrypoint": "python entrypoint/predictor.py"
                        }
                    },
                    "time_series_params": timeseries_params,
                    "prediction_params": prediction_params,
                    "extras": extras
                }
            }
        }
        job_type = 'DEMAND_FORECASTING'
        url = self._merge_paths(INTERNAL_DEV_API, f'jobs/api/v1/{job_type}')
        payload = json.dumps(job_config)
        headers = {
            'x-anastasia-api-key': X_API_KEY
        }
        try:
            response = requests.request("POST", url, headers=headers, data=payload)
            if response.status_code != 200:
                print(response.json())
                raise RuntimeError(f"Job configuration failed with status code {response.status_code}")
        except Exception as e:
            print("Error while configuring job")
            print(e)
            raise
        job_uuid = response.json()["uuid"]
        if wait_until_finished:
            url = self._merge_paths(INTERNAL_DEV_API, f'/jobs/api/v1/{job_uuid}')
            pbar = tqdm(total=1, desc="Running Demand Forecasting via API in cloud cluster, waiting until job is completed")
            mustend = time.time() + timeout
            while time.time() < mustend:
                try:
                    response = requests.request("GET", url, headers=headers).json()
                except Exception as e:
                    print("Error reading job info")
                    print(e)
                    raise
                if response["status"] == "FINISHED":
                    pbar.update(1)
                    pbar.close()
                    return job_uuid
                elif response["status"] == "FAILED":
                    print(f"Error:\njob_uuid = {job_uuid}\nstatus = {response['status']}")
                    pbar.close()
                    return job_uuid
            print("error: timeout waiting for job!")
            pbar.close()
            raise TimeoutError(f"Timed out after {timeout} seconds waiting for job {job_uuid}")
        else:
            return job_uuid

    def get_job_status(self, job_uuid: str) -> str:
        """Query the status of a job.

        Parameters
        ----------
        job_uuid : str
            UUID identifying the executed job

        Returns
        -------
        status : str
            Status of the job
        """
        headers = {
            'x-anastasia-api-key': X_API_KEY
        }
        url = self._merge_paths(INTERNAL_DEV_API, f'/jobs/api/v1/{job_uuid}')
        try:
            response = requests.request("GET", url, headers=headers).json()
        except Exception as e:
            print("Error reading job info")
            print(e)
            raise
        return response["status"]

    def download_results(self, job_uuid: str, save_path: str) -> None:
        """Download the result files of a Demand Forecasting job.

        Parameters
        ----------
        job_uuid : str
            UUID identifying the executed job
        save_path : str
            Directory into which the files ["results.csv", "metrics.csv", "item_classes.csv", "predictions_confidence.csv"] will be downloaded
        """
        headers = {
            'x-anastasia-api-key': X_API_KEY
        }
        url = self._merge_paths(INTERNAL_DEV_API, f'/jobs/api/v1/{job_uuid}')
        try:
            response = requests.request("GET", url, headers=headers).json()
        except Exception as e:
            print("Error reading job info")
            print(e)
            raise
        cloud_path = response["state"]["core"]["dataset"]["output"]["path"]
        file_names = ["results.csv", "metrics.csv", "item_classes.csv", "predictions_confidence.csv"]
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
        bucket, filekey = self._decompose_s3_route(cloud_path)
        try:
            for file in file_names:
                response = s3_client.download_file(
                    bucket,
                    self._merge_paths(filekey, file),
                    self._merge_paths(save_path, file),
                )
            if self.verbose:
                print(f"Download successful to {save_path}")
        except Exception as e:
            print("Error reading save_path")
            print(e)
            raise

    def _merge_paths(self, path_1: str, path_2: str):
        if path_1[-1] == '/' and path_2[0] == '/':
            return path_1 + path_2[1:]
        if path_1[-1] != '/' and path_2[0] != '/':
            return path_1 + '/' + path_2
        return path_1 + path_2

    def _translate_columns_types(self, columns: List[dict]) -> List[dict]:
        code_map = {
            'date': 'C001',
            'groupable': 'C002',
            'item': 'C003',
            'value': 'C004',
            'description': 'C005',
            'complementary': 'C006'
        }
        new_columns = []
        for column in columns:
            new_column = copy.deepcopy(column)
            new_column['columnType'] = code_map[column['columnType']]
            new_columns.append(new_column)
        return new_columns

    def _create_datasource(self, name, columns) -> dict:
        if self.verbose: print("Creating datasource...")
        url = self._merge_paths(INTERNAL_DEV_API, 'datasource/api/v2/datasources/')
        payload = json.dumps({
            "clientUUID": CLIENT_UUID,
            "projectUUID": PROJECT_UUID,
            "dataSourceTypeCode": DATASOURCE_TYPE,
            "name": name,
            "columns": columns
        })
        headers = {
            'data_source-anastasia-api-key': DATASOURCES_API_KEY,
            'Content-Type': 'application/json'
        }
        try:
            response = requests.request("POST", url, headers=headers, data=payload)
        except Exception as e:
            print("Error creating datasource")
            print(e)
            raise
        if self.verbose:
            print("Datasource created!")
        return response.json()

    def _get_datasource_info(self, datasource_uuid: str) -> dict:
        if self.verbose: print("Getting datasource info...")
        params = 'columns=true&storageLocation=internal&athenaTable=true'
        url = self._merge_paths(INTERNAL_DEV_API, f'datasource/api/v2/datasources/{datasource_uuid}?{params}')
        payload = {}
        headers = {
            'data_source-anastasia-api-key': DATASOURCES_API_KEY
        }
        try:
            response = requests.request("GET", url, headers=headers, data=payload)
        except Exception as e:
            print("Error reading datasource")
            print(e)
            raise
        return response.json()

    def _upload_multi_files(self, local_datasets_path, datasource_uuid, storageURL) -> dict:
        dataset_uuids = {}
        dataset_names = self._get_datasets_names(local_datasets_path)
        for name in dataset_names:
            dataset_uuid = str(uuid.uuid4())
            self._upload_file(
                output_location=storageURL,
                local_folder=local_datasets_path,
                input_filename=name,
                output_filename=dataset_uuid,
            )
            dataset_uuids[name] = f"{dataset_uuid}.csv"
            file_size = self._get_dataset_size(local_datasets_path, name)
            self._confirm_datasource_loaded(datasource_uuid, name, dataset_uuid, file_size)
        return dataset_uuids

    def _get_datasets_names(self, local_datasets_path: str) -> List[str]:
        datasets = []
        files = os.listdir(local_datasets_path)
        for file in files:
            if file[-4:] == '.csv':
                datasets.append(file)
        assert datasets, 'No data found!'
        if self.verbose:
            print(f'Total files: {len(datasets)}')
        return datasets

    def _get_dataset_size(self, local_datasets_path: str, name: str) -> int:
        size = os.path.getsize(self._merge_paths(local_datasets_path, name))
        assert size > 0, 'No data found!'
        if self.verbose:
            print(f'name: {name}, size: {size} bytes')
        return size

    def _decompose_s3_route(self, s3_route: str) -> Tuple[str, str]:
        s3_route = s3_route.replace("s3://", "").replace("https://", "")
        bucket = s3_route.split("/")[0]
        filekey = "/".join(s3_route.split("/")[1:])
        return bucket, filekey

    def _upload_file(self, output_location, local_folder, input_filename, output_filename) -> bool:
        if self.verbose:
            print("Uploading file to s3")
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
        bucket, filekey = self._decompose_s3_route(output_location)
        if output_filename.split(".")[-1] != "csv":
            output_filename = output_filename + ".csv"
        try:
            response = s3_client.upload_file(
                self._merge_paths(local_folder, input_filename),
                bucket,
                filekey + output_filename
            )
            if self.verbose:
                print(f"Upload successful to {output_location}")
                print("-" * 50)
        except Exception as e:
            print(e)
            return False
        return True

    def _confirm_datasource_loaded(self, datasource_uuid: str, name: str, dataset_uuid: str, file_size: int):
        if self.verbose: print("Confirming datasource...")
        url = self._merge_paths(INTERNAL_DEV_API, 'datasource/api/v2/files')
        payload = {
            "name": name,
            "size": file_size,
            "uuid": dataset_uuid,
            "dataSourceCode": datasource_uuid
        }
        headers = {
            'data_source-anastasia-api-key': DATASOURCES_API_KEY
        }
        try:
            response = requests.request("POST", url, headers=headers, data=payload)
        except Exception as e:
            print("Error confirming datasource")
            print(e)
            raise
        if self.verbose:
            print("Datasource confirmed!")
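

# --- Illustrative usage sketch ---------------------------------------------------
# A minimal sketch of how the class above chains together: upload a datasource,
# launch a Demand Forecasting job, then download its results. The column names
# ("date", "store", "sku", "sales"), the local paths, the datasource name and the
# (empty) prediction_params are hypothetical placeholders, not values defined in
# this module; replace them with your own dataset and predictor configuration.
if __name__ == "__main__":
    df = DemandForecasting(verbose=True)

    # Hypothetical column descriptions; each dict is passed through
    # _translate_columns_types, so 'columnType' must be one of:
    # 'date', 'groupable', 'item', 'value', 'description', 'complementary'.
    # The other key ('name' here) is an assumption, not confirmed by this module.
    columns = [
        {"name": "date", "columnType": "date"},
        {"name": "store", "columnType": "groupable"},
        {"name": "sku", "columnType": "item"},
        {"name": "sales", "columnType": "value"},
    ]
    datasource_uuid = df.upload_datasource(
        name="demo_sales",      # hypothetical datasource name
        columns=columns,
        files_path="./data/",   # local folder containing the .csv files to upload
    )

    # datasource_params keys mirror what run_job reads; prediction_params is passed
    # through to the predictor unchanged, so its contents depend on the core solution.
    job_uuid = df.run_job(
        prediction_params={},   # placeholder; fill with predictor-specific settings
        datasource_params={
            "datasource_uuid": datasource_uuid,
            "date_column": "date",
            "id_columns": ["store", "sku"],
            "target_column": "sales",
            "use_opendata": False,
        },
    )
    print(df.get_job_status(job_uuid))
    df.download_results(job_uuid, save_path="./outputs/")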