Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

process_data.py 2.7 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
  1. import warnings
  2. from datetime import date
  3. import pandas as pd
  4. from omegaconf import DictConfig
  5. from prefect import flow, task
  6. from sklearn.preprocessing import StandardScaler
  7. from helper import create_parent_directory, load_config
# Suppress every UserWarning for the rest of the process so library warnings
# of that category do not clutter the pipeline output.
warnings.simplefilter(action="ignore", category=UserWarning)
  9. @task
  10. def read_data(config: DictConfig) -> pd.DataFrame:
  11. return pd.read_csv(config.raw_data.path)
  12. @task
  13. def drop_na(df: pd.DataFrame) -> pd.DataFrame:
  14. return df.dropna()
  15. @task
  16. def get_age(df: pd.DataFrame) -> pd.DataFrame:
  17. return df.assign(
  18. age=df["Year_Birth"].apply(lambda row: date.today().year - row)
  19. )
  20. @task
  21. def get_total_children(df: pd.DataFrame) -> pd.DataFrame:
  22. return df.assign(total_children=df["Kidhome"] + df["Teenhome"])
  23. @task
  24. def get_total_purchases(df: pd.DataFrame) -> pd.DataFrame:
  25. purchases_columns = df.filter(like="Purchases", axis=1).columns
  26. return df.assign(total_purchases=df[purchases_columns].sum(axis=1))
  27. @task
  28. def get_enrollment_years(df: pd.DataFrame) -> pd.DataFrame:
  29. df["Dt_Customer"] = pd.to_datetime(
  30. df["Dt_Customer"], infer_datetime_format=True
  31. )
  32. return df.assign(enrollment_years=2022 - df["Dt_Customer"].dt.year)
  33. @task
  34. def get_family_size(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
  35. return df.assign(
  36. family_size=df["Marital_Status"].map(config.process.family_size)
  37. + df["total_children"]
  38. )
  39. @task
  40. def drop_features(df: pd.DataFrame, config: DictConfig):
  41. df = df[config.process.keep_columns]
  42. return df
  43. @task
  44. def drop_outliers(df: pd.DataFrame, config: DictConfig):
  45. column_threshold = dict(config.process.remove_outliers_threshold)
  46. for col, threshold in column_threshold.items():
  47. df = df[df[col] < threshold]
  48. return df.reset_index(drop=True)
  49. @task
  50. def get_scaler(df: pd.DataFrame):
  51. scaler = StandardScaler()
  52. scaler.fit(df)
  53. return scaler
  54. @task
  55. def scale_features(df: pd.DataFrame, scaler: StandardScaler):
  56. return pd.DataFrame(scaler.transform(df), columns=df.columns)
  57. @task
  58. def save_process_data(df: pd.DataFrame, config: DictConfig):
  59. create_parent_directory(config.intermediate.path)
  60. df.to_csv(config.intermediate.path, index=False)
  61. @flow(name="Process data")
  62. def process_data():
  63. config = load_config()
  64. df = read_data(config)
  65. df = (
  66. df.pipe(drop_na)
  67. .pipe(get_age)
  68. .pipe(get_total_children)
  69. .pipe(get_total_purchases)
  70. .pipe(get_enrollment_years)
  71. .pipe(get_family_size, config)
  72. .pipe(drop_features, config)
  73. .pipe(drop_outliers, config)
  74. )
  75. scaler = get_scaler(df)
  76. df = scale_features(df, scaler)
  77. save_process_data(df, config)
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    process_data()
Tip!

Press p to see the previous file, or n to see the next file.

Comments

Loading...