-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest code
87 lines (76 loc) · 2.26 KB
/
test code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col
from pyspark.sql.types import StringType
# Trim string columns and turn blank strings into nulls
def clean_data(df):
    """Return ``df`` with every string column trimmed and blank values nulled.

    For each column whose schema type is a Spark string type, the value is
    replaced by its trimmed form; values that are null, or that are empty
    after trimming, become null. Columns of any other type are passed
    through unchanged. Column names and column order are preserved.

    Args:
        df: The Spark DataFrame to clean.

    Returns:
        A new DataFrame with the same column names, in the same order.
    """
    projected = []
    for field in df.schema.fields:
        name = field.name
        # Backtick-quote the name so that a dot inside a column name is not
        # parsed as nested-field access when the column is resolved.
        escaped = name.replace("`", "``")
        source = col(f"`{escaped}`")

        if not isinstance(field.dataType, StringType):
            projected.append(source)
            continue

        trimmed = F.trim(source)
        is_blank = source.isNull() | (trimmed == "")
        projected.append(F.when(is_blank, None).otherwise(trimmed).alias(name))

    # Build the result with a single projection: calling withColumn once per
    # column in a loop stacks one projection per call and can produce very
    # large query plans on wide tables.
    return df.select(*projected)
# Source column name -> output column name, applied with withColumnRenamed
# in compute(). Insertion order is the order in which renames are applied.
RENAME_DICT = dict(
    EstimatedPrimary="Illnesses",
    DateFirstIll="FirstIll",
    MultiStateExposure="Multistate",
    DeathsNum="Deaths",
    HospitalNum="Hospitalizations",
    # The two entries below name columns that compute() does not select.
    CAFC="IFSACCategory",
    WaterExposureID="WaterExposure",
)
@transform_df(
    Output("ri.foundry.main.dataset.8828225b-9479-4afa-bcc1-c67acdaaab68"),
    main=Input("ri.foundry.main.dataset.1db5224f-4552-4e15-a828-4bde2a27401a"),
)
def compute(main):
    """Build the output dataset from the ``main`` input.

    Steps, in order:
      1. Keep a fixed subset of the input columns.
      2. Run ``clean_data`` over that subset.
      3. Rewrite ``DateFirstIll`` as a ``yyyy-MM-dd`` string and derive
         ``Year`` and ``Month`` from it; copy ``CDCID`` to ``OutbreakMainID``.
      4. Rename columns according to ``RENAME_DICT``.
      5. Select the final columns in their output order.

    ``DeathsInfo`` and ``HospitalInfo`` are read and cleaned but are not part
    of the final selection.

    Args:
        main: The input DataFrame supplied by the transform decorator.

    Returns:
        The DataFrame written to the transform's output.
    """
    source_columns = [
        "CDCID",
        "StateID",
        "RecordStatus",
        "PrimaryMode",
        "DateFirstIll",
        "MultiStateExposure",
        "DeathsNum",
        "DeathsInfo",
        "HospitalNum",
        "HospitalInfo",
        "EstimatedPrimary",
        "CDCStatus",
    ]
    output_columns = [
        "OutbreakMainID",
        "CDCID",
        "StateID",
        "PrimaryMode",
        "Multistate",
        "Year",
        "Month",
        "FirstIll",
        "Illnesses",
        "Hospitalizations",
        "Deaths",
        "CDCStatus",
        "RecordStatus",
    ]

    # Clean the raw subset before deriving anything from it.
    cleaned = clean_data(main.select(*source_columns))

    # The value is divided by 1000 before from_unixtime, which takes seconds;
    # presumably DateFirstIll holds epoch milliseconds - TODO confirm.
    # NOTE(review): from_unixtime formats in the Spark session time zone, so
    # the resulting calendar date depends on that setting - confirm intended.
    first_ill_date = F.date_format(
        F.from_unixtime(F.col("DateFirstIll") / 1000), "yyyy-MM-dd"
    )

    # Year and Month are read from the already reformatted DateFirstIll, so
    # these steps must run after the column has been overwritten.
    enriched = cleaned.withColumn("DateFirstIll", first_ill_date)
    enriched = enriched.withColumn("Year", F.year("DateFirstIll"))
    enriched = enriched.withColumn("Month", F.month("DateFirstIll"))
    enriched = enriched.withColumn("OutbreakMainID", F.col("CDCID"))

    # Apply the renames in RENAME_DICT's own order.
    renamed = enriched
    for source_name in RENAME_DICT:
        renamed = renamed.withColumnRenamed(source_name, RENAME_DICT[source_name])

    return renamed.select(*output_columns)