parse_ca.py
  1. """
  2. Filters down a directory of grb2 files to a specific geometry.
  3. FIXME:100 There is a memory leak somewhere in this file. No clue where, but at month-level granularities, you need at least 2GB ram/year-month.
  4. """
import os
import click
import pandas as pd
import geopandas as gpd
import xarray as xr
import tarfile
import tempfile
from datetime import datetime
from invoke import run
from pathlib import Path
from loguru import logger
from google.cloud import storage
from filelock import Timeout, FileLock

from src.conf import settings
from src.data.gfs.utils import GRB2_CRS, grb2gdf

OUTPUT_DIR = settings.DATA_DIR / "interim/gfs/ca/"

os.environ.setdefault("SHAPE_RESTORE_SHX", "yes")

# Canonical geom_types
POLY_TYPES = ("Polygon", "MultiPolygon")

# GFS Grib2 Options
GRB2_SURFACE_FILTER = {"filter_by_keys": {"typeOfLevel": "surface"}}
SURFACE_VARS = (
    "t",  # Air Temperature
    "gust",  # Wind speed
    "tp",  # Total Precipitation
    "dswrf",  # Downward Short-Wave Radiation
    "uswrf",  # Upward Short-Wave Radiation
    "SUNSD",  # Sunshine duration
    "al",  # Albedo
    "sp",  # Surface pressure
    "csnow",  # Categorical: Snow
    "cicep",  # Categorical: Ice pellets
    "cfrzr",  # Categorical: Freezing rain
    "crain",  # Categorical: Rain
    "sde",  # Snow Depth
)


def parse_gribfile(fp: str):
    """Read the surface-level variables of interest from a single GRB2 file.

    Returns:
        pd.DataFrame: Flattened dataframe of the surface variables of interest.
    """
    # We are mostly interested in surface variables
    try:
        ds = xr.open_dataset(fp, backend_kwargs=GRB2_SURFACE_FILTER, engine="cfgrib")
        ds = ds.drop_vars("step")
    except KeyError:
        logger.warning("GRB2 file {fp} does not contain 'surface' data.", fp=fp)
        raise
    # Only retain variables of interest
    data_vars = list(filter(lambda data_var: data_var in ds.data_vars, SURFACE_VARS))
    ds = ds[data_vars]
    return ds.to_dataframe().reset_index()


def parse_data(gfs_grb2_df: pd.DataFrame, envelope_gdf: gpd.GeoDataFrame):
    """Bound a dataframe of parsed GFS GRB2 points by a geodataframe of envelopes.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame of the GFS points that fall within the envelopes.
    """
    df = gfs_grb2_df
    # Every envelope geometry must be a (Multi)Polygon.
    assert (~envelope_gdf.geom_type.isin(POLY_TYPES)).sum() == 0
    gdf = grb2gdf(df)
    gdf = gpd.tools.sjoin(gdf, envelope_gdf.to_crs(GRB2_CRS), op="within", how="inner")
    return gdf


def parse_gfs_archive(gfs_archive, forecasts=()):
    """Unpack a GFS tar archive and yield parsed dataframes for the requested forecast hours."""
    logger.info("Parsing and unpacking {fp}", fp=gfs_archive)
    to_parse = []
    with tarfile.open(gfs_archive) as tf:
        tmpdir = Path(tempfile.mkdtemp())
        for tarinfo in tf:
            logger.debug("Parsing {fp}/{fn}", fp=gfs_archive, fn=tarinfo.name)
            # The forecast hour is encoded in the last 8 characters of the member name.
            forecast = tarinfo.name[-8:-5]
            if forecast not in forecasts:
                continue
            fp = (tmpdir / tarinfo.name).absolute()
            # Extract member from tar archive
            tf.extract(tarinfo, path=tmpdir)
            to_parse.append((fp, tarinfo.name))
    for fp, name in to_parse:
        # Parse single forecast
        df = parse_gribfile(fp)
        # Yield dataframe of weather data for this forecast member
        yield df, name
        # Clean up the extracted member (and any cfgrib index files)
        for fp_ in fp.parent.glob(f"{fp.name}*"):
            os.remove(fp_)
    tmpdir.rmdir()


def get_gfs_archives(project, prefix="grid3"):
    """Yield storage blobs of pre-downloaded GFS grid-3 archive data."""
    client = storage.Client(project)
    blobs = client.list_blobs("carecur-gfs-data", prefix=prefix)
    for blob in blobs:
        yield blob


def parse_archive(gfs_archive, envelope, forecasts=()):
    """Main parsing function for a unit of data (a single NOAA archive).

    Args:
        gfs_archive (google.cloud.storage.blob.Blob): Storage blob object.
        envelope (gpd.GeoDataFrame): Envelope geometries to clip the forecast points to.
        forecasts (sequence of str): Forecast-hour strings (e.g. "015") to keep.
    """
    fd, fp_ = tempfile.mkstemp()
    # Download file from storage
    gfs_archive.download_to_filename(fp_)
    # FIXME: Some 2019 files are throwing a KeyError. We should resolve that
    # error instead of silently failing.
    try:
        for df, fn in parse_gfs_archive(fp_, forecasts=forecasts):
            fn = Path(fn).stem
            gdf = parse_data(df, envelope)
            gdf.drop(columns="geometry").to_parquet(OUTPUT_DIR / f"{fn}.parquet")
    except Exception as e:
        logger.exception(e)
    os.close(fd)
    os.remove(fp_)


@click.command()
@click.argument(
    "project", type=str,
)
@click.option(
    "-y", "--year", "year", type=int, required=True, help="Forecast year to parse."
)
@click.option(
    "-m", "--month", "month", type=int, required=True, help="Forecast month to parse."
)
@click.option(
    "-d", "--day", "day", type=int, required=False, help="Forecast day to parse."
)
def main(project, year, month, day=None):
    """Parse GFS grid-3 archive data for a single year and month.

    Arguments:

        project: Google cloud project to bill data access to.
    """
    envelope = gpd.read_file(
        settings.DATA_DIR / "processed/geography/CA_Counties/CA_Counties_TIGER2016.shp"
    )
    # Parse out months as well!
    # Archive data only exist in this bucket for 2017 to 2019.
    assert year in range(2017, 2020)
    assert month in range(1, 13)
    prefix = f"grid3/gfs_3_{year}{month:02}"
    if day:
        prefix = f"{prefix}{day:02}"
    # Forecast hours for the point of forecast, the day ahead, and 7 days out.
    # Note that forecasts are expressed in UTC, so we must account for US/Pacific.
    forecasts = [
        # Day of (forecast hours 015-027)
        *(f"{i*3:03}" for i in range(5, 10)),
        # 24-hour (forecast hours 039-051)
        *(f"{i*3:03}" for i in range(13, 18)),
        # 7-day (forecast hours 183-195)
        *(f"{i*3:03}" for i in range(61, 66)),
    ]
    for gfs_archive in get_gfs_archives(project, prefix=prefix):
        lockfile = OUTPUT_DIR / "status" / f"{gfs_archive.name}.txt.lock"
        if not lockfile.parent.exists():
            lockfile.parent.mkdir(exist_ok=True, parents=True)
        lock = FileLock(lockfile, timeout=5)
        status_file = OUTPUT_DIR / "status" / f"{gfs_archive.name}.txt"
        # Hold the lock so concurrent workers do not process the same archive.
        with lock:
            if status_file.exists():
                logger.debug("Skipping... {} already processed.", gfs_archive.name)
                continue
            with status_file.open("w") as fh:
                fh.write(f"Started: {datetime.now()}")
            logger.info("Parsing archive {}", gfs_archive)
            parse_archive(gfs_archive, envelope, forecasts=forecasts)
            with status_file.open("w") as fh:
                fh.write(f"Completed: {datetime.now()}")


if __name__ == "__main__":
    OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
    main()
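
# A minimal usage sketch, kept as comments since this is a source file. The
# project name below is a placeholder, and running the script directly assumes
# the repository root is on PYTHONPATH so the `src.*` imports resolve:
#
#   python parse_ca.py my-gcp-project --year 2018 --month 7
#   python parse_ca.py my-gcp-project --year 2018 --month 7 --day 15
#
# Each matching archive under gs://carecur-gfs-data/<prefix> is downloaded to a
# temporary file, filtered to the selected forecast hours, clipped to the CA
# county envelopes, and written to OUTPUT_DIR as one parquet file per forecast
# member.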