-
Notifications
You must be signed in to change notification settings - Fork 0
/
transactions_processing_functions.py
387 lines (293 loc) · 19.5 KB
/
transactions_processing_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def savings_file_cleanup(csv_file):
    """
    Clean and prepare a Chase savings-account CSV for loading into the
    transaction_facts table.

    Args:
        csv_file (str): Path to the CSV file containing the savings data.

    Returns:
        tuple: A tuple containing two DataFrames:
            - savings_df: The cleaned and transformed savings account transactions DataFrame.
            - review_df: Transactions that still need manual assignment of
              transaction_type_id and category_id.
    """
    import numpy as np
    import pandas as pd
    # Read the CSV file into a DataFrame
    savings_df = pd.read_csv(csv_file, encoding='unicode_escape', index_col=False)
    # Drop columns that are not loaded into the database
    savings_df = savings_df.drop(['Details', 'Type', 'Balance', 'Check or Slip #'], axis=1)
    # Lowercase all string cell values and all column titles.
    # DataFrame.applymap is deprecated since pandas 2.1 in favor of DataFrame.map;
    # use whichever this pandas version provides.
    _lower = lambda value: value.lower() if isinstance(value, str) else value
    if hasattr(savings_df, 'map'):
        savings_df = savings_df.map(_lower)
    else:
        savings_df = savings_df.applymap(_lower)
    savings_df.columns = [column.lower() for column in savings_df.columns]
    # Insert needed columns.
    # account_id 3 references the savings account; transaction_type_id and category_id
    # start at 0 (sentinel meaning "not yet assigned") and are updated below.
    savings_df.insert(0, 'account_id', 3)
    savings_df.insert(1, 'transaction_type_id', 0)
    savings_df.insert(2, 'category_id', 0)
    # Rename columns in the DataFrame to match the database schema
    savings_df = savings_df.rename(columns={'posting date': 'short_date', 'description': 'transaction_description',
                                            'amount': 'transaction_amount'})
    # Convert short_date to datetime format
    savings_df['short_date'] = pd.to_datetime(savings_df['short_date'])
    # Next, set transaction_type_id and category_id (or null category) based on the
    # transaction description and the sign of the amount.
    # Only PURCHASES are given a category; every other transaction's category is NaN.
    # Identify interest revenue (no category assigned since this is not a purchase)
    savings_df.loc[savings_df['transaction_description'].str.contains('interest'),
                   ['transaction_type_id', 'category_id']] = [9, np.nan]
    # Identify transfers FROM savings (no category)
    savings_df.loc[(savings_df['transaction_description'].str.contains('transfer')) & (savings_df['transaction_amount'] < 0),
                   ['transaction_type_id', 'category_id']] = [5, np.nan]
    # Identify transfers TO savings (no category)
    savings_df.loc[(savings_df['transaction_description'].str.contains('transfer')) & (savings_df['transaction_amount'] > 0),
                   ['transaction_type_id', 'category_id']] = [6, np.nan]
    # Identify deposits (no category)
    savings_df.loc[(savings_df['transaction_description'].str.contains('deposit')) & (savings_df['transaction_amount'] > 0),
                   ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify withdrawals (no category)
    savings_df.loc[(savings_df['transaction_description'].str.contains('withdraw')) & (savings_df['transaction_amount'] < 0),
                   ['transaction_type_id', 'category_id']] = [4, np.nan]
    # Any remaining unassigned POSITIVE amounts default to deposit (no category)
    savings_df.loc[(savings_df['transaction_type_id'] == 0) & (savings_df['transaction_amount'] > 0),
                   ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Any remaining unassigned NEGATIVE amounts default to withdraw (no category)
    savings_df.loc[(savings_df['transaction_type_id'] == 0) & (savings_df['transaction_amount'] < 0),
                   ['transaction_type_id', 'category_id']] = [4, np.nan]
    # Truncate transaction_description to the database's 100-character limit
    savings_df['transaction_description'] = savings_df['transaction_description'].str.slice(0, 100)
    # Move transactions that never received a transaction_type_id and category_id
    # into review_df for manual processing, and drop them from savings_df.
    mask = ((savings_df['transaction_type_id'] == 0) & (savings_df['category_id'] == 0))
    review_df = savings_df[mask].reset_index(drop=True)
    savings_df = savings_df[~mask].reset_index(drop=True)
    # Report transactions that need to be manually processed
    # (trailing spaces added so the concatenated message reads correctly)
    print("Transactions successfully transformed. "
          "The following transactions need to be reviewed. "
          "Once values have been assigned to transaction_type_id and category_id (or a null category_id), "
          "use to_spend_save() to update database.")
    print(review_df)
    # Return cleaned data and data to be manually processed as two DataFrames
    return savings_df, review_df
def checking_file_cleanup(csv_file):
    """
    Clean and transform a Chase checking-account CSV for loading into the
    transaction_facts table.

    Args:
        csv_file (str): Path to the CSV file containing the checking account data.

    Returns:
        tuple: A tuple containing two DataFrames:
            - checking_df: The cleaned and transformed checking account transactions DataFrame.
            - review_df: Transactions that need manual review for assignment of
              transaction_type_id and category_id.
    """
    import numpy as np
    import pandas as pd
    # Read the csv file into a DataFrame
    checking_df = pd.read_csv(csv_file, encoding='unicode_escape', index_col=False)
    # Lowercase all string cell values.
    # DataFrame.applymap is deprecated since pandas 2.1 in favor of DataFrame.map;
    # use whichever this pandas version provides.
    _lower = lambda value: value.lower() if isinstance(value, str) else value
    if hasattr(checking_df, 'map'):
        checking_df = checking_df.map(_lower)
    else:
        checking_df = checking_df.applymap(_lower)
    # Convert all column titles to lowercase and drop unnecessary columns
    checking_df.columns = [column.lower() for column in checking_df.columns]
    checking_df = checking_df.drop(['details', 'type', 'balance', 'check or slip #'], axis=1)
    # Update column titles to match the MySQL database.
    checking_df = checking_df.rename(columns={'posting date': 'short_date', 'description': 'transaction_description',
                                              'amount': 'transaction_amount'})
    # Convert short_date to a datetime format
    checking_df['short_date'] = pd.to_datetime(checking_df['short_date'])
    # Insert columns needed to be loaded into the MySQL database.
    # account_id 2 references the checking account; transaction_type_id and category_id
    # start at 0 (sentinel meaning "not yet assigned") and are updated below.
    checking_df.insert(1, 'account_id', 2)
    checking_df.insert(2, 'transaction_type_id', 0)
    checking_df.insert(3, 'category_id', 0)
    # Use nullable Int64 for category_id since this column will hold null (NA) values
    checking_df['category_id'] = checking_df['category_id'].fillna(0).astype('Int64')
    # Next, set transaction_type_id and category_id (or null category) based on the
    # transaction description and the sign of the amount.
    # Only PURCHASES are given a category; every other transaction's category is NaN.
    # NOTE: rules are applied in order, so a later rule overwrites an earlier match.
    # Identify deposits and set transaction_type_id (no category assigned since this is not a purchase)
    checking_df.loc[(checking_df['transaction_description'].str.contains('deposit')) & (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify interest received and set transaction_type_id (no category)
    checking_df.loc[checking_df['transaction_description'].str.contains('interest'),
                    ['transaction_type_id', 'category_id']] = [9, np.nan]
    # Identify paychecks from employers and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('gusto|exel')) & (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify credit card bill payments and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('pay')) &
                    (checking_df['transaction_description'].str.contains('chase')) &
                    (checking_df['transaction_amount'] < 0),
                    ['transaction_type_id', 'category_id']] = [8, np.nan]
    # Identify transfers from checking to savings and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('transfer')) & (checking_df['transaction_amount'] < 0),
                    ['transaction_type_id', 'category_id']] = [5, np.nan]
    # Identify transfers from savings to checking and set transaction_type_id (no category).
    # (A duplicate of this rule previously appeared before the deposit rule as well;
    # this later occurrence is the one that determines the final value, so it is kept.)
    checking_df.loc[(checking_df['transaction_description'].str.contains('transfer')) & (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [6, np.nan]
    # Identify miscellaneous withdrawals from checking account and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('atm')) &
                    (checking_df['transaction_description'].str.contains('chase')) &
                    (checking_df['transaction_amount'] < 0), ['transaction_type_id', 'category_id']] = [4, np.nan]
    # Identify miscellaneous deposits to checking account and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('atm')) &
                    (checking_df['transaction_description'].str.contains('chase')) &
                    (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify reimbursements through employer certify reimbursement system and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('certify')) & (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify insurance payments and set transaction_type_id/category_id
    checking_df.loc[(checking_df['transaction_description'].str.contains('allstate')) & (checking_df['transaction_amount'] < 0),
                    ['transaction_type_id', 'category_id']] = [2, 1]
    # Identify Venmo "cashout"/deposit and set transaction_type_id (no category)
    checking_df.loc[(checking_df['transaction_description'].str.contains('venmo')) & (checking_df['transaction_amount'] > 0),
                    ['transaction_type_id', 'category_id']] = [3, np.nan]
    # Identify Venmo payments. These need manual review to determine the transaction category,
    # so move them to venmo_review_df and drop them from checking_df.
    mask = ((checking_df['transaction_description'].str.contains('venmo')) & (checking_df['transaction_amount'] < 0))
    venmo_review_df = checking_df[mask].reset_index(drop=True)
    checking_df = checking_df[~mask].reset_index(drop=True)
    # Move transactions that never received a transaction_type_id and category_id
    # into review_df for manual processing, and drop them from checking_df.
    mask = ((checking_df['transaction_type_id'] == 0) & (checking_df['category_id'] == 0))
    review_df = checking_df[mask].reset_index(drop=True)
    checking_df = checking_df[~mask].reset_index(drop=True)
    # Append venmo_review_df to review_df
    review_df = pd.concat([review_df, venmo_review_df])
    review_df = review_df.reset_index(drop=True)
    # Truncate transaction_description to the 100-character limit of the spend_save database
    checking_df["transaction_description"] = checking_df["transaction_description"].str.slice(0, 100)
    review_df["transaction_description"] = review_df["transaction_description"].str.slice(0, 100)
    # Report transactions that need to be manually processed
    # (trailing spaces added so the concatenated message reads correctly)
    print("Transactions successfully transformed. "
          "The following transactions need to be reviewed. "
          "Once values have been assigned to transaction_type_id and category_id (or a null category_id), "
          "use to_spend_save() to update database.")
    print(review_df)
    # Return cleaned data and data to be manually processed as two DataFrames
    return checking_df, review_df
def cc_file_cleanup(spend_save_password, csv_file):
    """
    Clean credit card transactions, perform data transformations, and return the
    cleaned DataFrames.

    Args:
        spend_save_password (str): Password to the spend_save MySQL server. Needed to
            retrieve categories and category_ids from the database.
        csv_file (str): Input credit card transactions as CSV.

    Returns:
        tuple: A tuple containing two pandas DataFrames:
            - cc_df: The cleaned and transformed credit card transactions DataFrame.
            - cc_review_df: Transactions that need manual review for assignment of
              transaction_type_id and category_id.
    """
    import numpy as np
    import pandas as pd
    from sqlalchemy import create_engine, text
    # The bank automatically tags credit card purchases with a category name. Those
    # names must be mapped to the corresponding category_id to match the database
    # schema, so the category table is queried from the spend_save database and
    # joined onto the DataFrame generated from the credit card transactions CSV.
    # Connect to database engine
    engine = create_engine('mysql+pymysql://root:' + spend_save_password + '@localhost/spend_save')
    # SQL query to extract category_id and category_description from the spend_save database.
    # Engine.execute() was removed in SQLAlchemy 2.0; run the query on an explicit
    # connection, wrapping the raw SQL string with text().
    query = text("""
    SELECT *
    FROM category
    ORDER BY category_id
    """)
    with engine.connect() as connection:
        result = connection.execute(query)
        # Fetch all rows and the column names from the query result
        rows = result.fetchall()
        column_names = result.keys()
    # Release the connection pool now that the lookup data is in memory
    engine.dispose()
    # Store the query result (category_id and category_description) as category_df
    category_df = pd.DataFrame(rows, columns=column_names)
    # Load credit card transactions from csv as cc_df and drop empty rows that may be
    # present at the bottom of the csv
    cc_df = pd.read_csv(csv_file, encoding='unicode_escape', index_col=False)
    cc_df.dropna(how='all', inplace=True)
    # Lowercase all string cell values and column titles.
    # DataFrame.applymap is deprecated since pandas 2.1 in favor of DataFrame.map;
    # use whichever this pandas version provides.
    _lower = lambda value: value.lower() if isinstance(value, str) else value
    if hasattr(cc_df, 'map'):
        cc_df = cc_df.map(_lower)
    else:
        cc_df = cc_df.applymap(_lower)
    cc_df.columns = [column.lower() for column in cc_df.columns]
    # Drop columns that are not needed
    cc_df = cc_df.drop(['post date', 'memo'], axis=1)
    # Merge category_df and cc_df on category/category_description to get the
    # category_id for each transaction in cc_df
    cc_df = pd.merge(cc_df, category_df[['category_id', 'category_description']],
                     left_on='category', right_on='category_description',
                     how='left').drop('category_description', axis=1)
    # Fill null values with 0 (to be processed again below) and use nullable Int64
    # since category_id will hold null (NA) values
    cc_df['category_id'] = cc_df['category_id'].fillna(0).astype('Int64')
    # Convert "transaction date" to datetime data type
    cc_df['transaction date'] = pd.to_datetime(cc_df['transaction date'])
    # Reorder columns (move category_id to the front) and drop the category column
    # since we now have category_id
    column_names = cc_df.columns.tolist()
    column_names = [column_names[-1]] + column_names[:-1]
    cc_df = cc_df[column_names]
    cc_df = cc_df.drop('category', axis=1)
    # Insert account_id and transaction_type_id columns. Assign "1" to all account_ids
    # since they are all credit card. Assign "0" (not yet assigned) to all
    # transaction_type_ids, to be updated below.
    cc_df.insert(0, 'account_id', 1)
    cc_df.insert(1, 'transaction_type_id', 0)
    # Identify credit card purchases and set transaction_type_id (category_id was
    # assigned when merging DataFrames above)
    cc_df.loc[cc_df['type'] == 'sale', 'transaction_type_id'] = 1
    # Identify credit card bill payments (no category)
    cc_df.loc[cc_df['type'] == 'payment', ['transaction_type_id', 'category_id']] = [7, np.nan]
    # Identify fees and adjustments and set transaction_type_id (no category)
    cc_df.loc[cc_df['type'].str.contains('fee|adjustment'),
              ['transaction_type_id', 'category_id']] = [11, np.nan]
    # Drop "type" column since it is no longer needed
    cc_df = cc_df.drop('type', axis=1)
    # Rename columns to match the spend_save database
    column_titles = ['account_id', 'transaction_type_id', 'category_id',
                     'short_date', 'transaction_description', 'transaction_amount']
    cc_df.columns = column_titles
    # Truncate transaction_description at 100 characters
    cc_df["transaction_description"] = cc_df["transaction_description"].str.slice(0, 100)
    # Create cc_review_df to manually review any transactions that were not assigned a
    # category_id or a transaction_type_id
    mask = (cc_df['category_id'].isin([0])) | (cc_df['transaction_type_id'].isin([0]))
    cc_review_df = cc_df[mask].reset_index(drop=True)
    cc_df = cc_df[~mask].reset_index(drop=True)
    # Report transactions that need to be manually processed
    # (trailing spaces added so the concatenated message reads correctly)
    print("Transactions successfully transformed. "
          "The following transactions need to be reviewed. "
          "Once values have been assigned to transaction_type_id and category_id, use to_spend_save() to update database.")
    print(cc_review_df)
    # Return cleaned data and data to be manually processed as two DataFrames
    return cc_df, cc_review_df
def to_spend_save(df, data_source, password):
    """
    Load a DataFrame into the spend_save MySQL database and save it as a CSV file.

    Args:
        df (pandas.DataFrame): The DataFrame containing the data to be loaded.
        data_source (str): The source of the data; used to build the CSV file name.
        password (str): The password to the MySQL database server.

    Returns:
        None
    """
    from sqlalchemy import create_engine
    from datetime import date
    # Create the connection string for the MySQL database and build the engine
    database = 'mysql+pymysql://root:' + password + '@localhost/spend_save'
    engine = create_engine(database)
    # engine.begin() opens a connection, commits on success, rolls back on error,
    # and always closes the connection — this replaces the manual
    # connect()/commit()/close() sequence and also works on SQLAlchemy versions
    # whose Connection has no commit() method.
    with engine.begin() as conn:
        # Import data from the Pandas DataFrame into the MySQL database
        df.to_sql('transaction_facts', con=conn, if_exists='append', index=False)
    engine.dispose()
    # Generate the file name to save as CSV using data_source and today's date
    current_date = date.today()
    file_name = f"{data_source}_{current_date}.csv"
    # Save the DataFrame as a CSV file
    df.to_csv(file_name, index=False)
    print(f"{data_source} data successfully loaded into spend_save MySQL database")
    print(f"CSV file saved as {file_name}")