3
3
import logging
4
4
import os
5
5
import re
6
- import sqlite3
6
+ import psycopg2
7
7
import time
8
8
import redis
9
9
import yaml
@@ -18,7 +18,13 @@ def load_config():
18
18
config .update ({
19
19
'REDIS_HOST' : os .getenv ('REDIS_HOST' , config ['redis' ]['host' ]),
20
20
'REDIS_PORT' : int (os .getenv ('REDIS_PORT' , config ['redis' ]['port' ])),
21
- 'SQLITE_DB_PATH' : os .getenv ('SQLITE_DB_PATH' , config ['sqlite' ]['db_path' ]),
21
+ 'POSTGRESQL' : {
22
+ 'host' : os .getenv ('POSTGRESQL_HOST' , config ['postgresql' ]['host' ]),
23
+ 'port' : int (os .getenv ('POSTGRESQL_PORT' , config ['postgresql' ]['port' ])),
24
+ 'dbname' : os .getenv ('POSTGRESQL_DBNAME' , config ['postgresql' ]['dbname' ]),
25
+ 'user' : os .getenv ('POSTGRESQL_USER' , config ['postgresql' ]['user' ]),
26
+ 'password' : os .getenv ('POSTGRESQL_PASSWORD' , config ['postgresql' ]['password' ])
27
+ },
22
28
'FIELDS' : os .getenv ('FIELDS' , ',' .join (config ['fields' ])).split (',' ),
23
29
'PAUSE' : int (os .getenv ('PAUSE' , config ['pause' ])),
24
30
'LOG_LEVEL' : os .getenv ('LOG_LEVEL' , config ['log_level' ])
@@ -30,89 +36,107 @@ def load_config():
30
36
31
37
def parse_arguments():
    """Parse command-line arguments for the Redis-to-PostgreSQL log collector.

    Returns:
        argparse.Namespace with redis, port, db, user, password,
        postgres_port, postgres_host, fields, time and log_level attributes
        (each None when the flag was not supplied).
    """
    parser = argparse.ArgumentParser(
        description="Script for collecting logs from Redis to PostgreSQL."
    )
    # (short flag, long flag, value type, help text) — registered in a loop
    # so every option is declared the same way.
    option_specs = [
        ("-r", "--redis", str, "Redis server host."),
        ("-p", "--port", int, "Redis server port."),
        ("-d", "--db", str, "PostgreSQL database name."),
        ("-u", "--user", str, "PostgreSQL user."),
        ("-w", "--password", str, "PostgreSQL password."),
        ("-P", "--postgres_port", int, "PostgreSQL port."),
        ("-H", "--postgres_host", str, "PostgreSQL host."),
        ("-f", "--fields", str, "Comma-separated list of fields."),
        ("-t", "--time", int, "Pause between iterations (in seconds)."),
        ("-l", "--log_level", str, "Logging level."),
    ]
    for short_flag, long_flag, value_type, help_text in option_specs:
        parser.add_argument(short_flag, long_flag, type=value_type, help=help_text)
    return parser.parse_args()
41
51
42
52
43
def initialize_database(config):
    """Initializes the PostgreSQL database and creates a table for logs.

    Args:
        config: dict with 'host', 'port', 'dbname', 'user', 'password'
            connection settings.

    Returns:
        An open psycopg2 connection; the caller owns it and must close it.

    Raises:
        psycopg2.Error: if connecting or creating the table fails.
    """
    conn = psycopg2.connect(
        host=config['host'],
        port=config['port'],
        dbname=config['dbname'],
        user=config['user'],
        password=config['password']
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS logs (
                    id SERIAL PRIMARY KEY,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
        finally:
            cursor.close()
    except psycopg2.Error:
        # Do not leak the freshly opened connection when table creation
        # fails — callers only receive conn on success.
        conn.close()
        raise
    logging.info("Database initialized and table created if not exists.")
    return conn
72
+
73
+
74
def create_dynamic_columns(conn, fields):
    """Ensure a TEXT column exists on the logs table for every field name.

    Args:
        conn: open psycopg2 connection.
        fields: iterable of column names taken from configuration / env.

    Column names cannot be bound as query parameters, so each name is
    validated as a plain SQL identifier before being interpolated into the
    ALTER TABLE statement; invalid names are logged and skipped rather than
    allowing SQL injection from config/env-supplied FIELDS.
    """
    cursor = conn.cursor()
    for field in fields:
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', field):
            logging.error(f"Skipping invalid column name: {field}")
            continue
        # information_schema lookup avoids relying on the error message of a
        # failed ALTER to detect pre-existing columns.
        cursor.execute("""
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'logs' AND column_name = %s
        """, (field,))
        if not cursor.fetchone():
            try:
                cursor.execute(f"ALTER TABLE logs ADD COLUMN {field} TEXT")
                conn.commit()
            except psycopg2.Error as e:
                # Roll back so the connection stays usable for later fields.
                conn.rollback()
                logging.error(f"Error adding column {field}: {e}")
    logging.info("Dynamic columns created/verified.")
72
89
73
90
74
def clean_old_logs(conn):
    """Removes logs older than 24 hours."""
    purge_sql = "DELETE FROM logs WHERE created_at < NOW() - INTERVAL '1 day'"
    cur = conn.cursor()
    cur.execute(purge_sql)
    conn.commit()
    logging.info("Old logs have been deleted.")
81
97
82
98
99
def check_redis_connection(redis_client):
    """Checks if the connection to Redis is working.

    Issues a PING; logs and re-raises redis ConnectionError on failure.
    """
    try:
        redis_client.ping()
    except redis.exceptions.ConnectionError as err:
        logging.error("Connection to Redis failed. Error: %s", err)
        raise
    else:
        logging.info("Connection to Redis established.")
83
108
def sanitize_json(log):
    """Fixes invalid JSON by replacing empty values (e.g., "key":,) with "key": null."""
    # A quoted word key whose value is missing (only whitespace before the
    # comma) gets an explicit null spliced in; surrounding whitespace and the
    # comma are preserved exactly.
    return re.sub(r'("\w+"):(\s*),', r'\1: null\2,', log)
88
113
89
114
90
def process_logs(conn, redis_client, fields, batch_size=100):
    """Processes logs from Redis and saves them to PostgreSQL.

    Pops up to batch_size entries at a time from the Redis "logs" list,
    sanitizes and JSON-parses each one, and inserts the configured fields
    into the logs table. Stops when the list is drained.

    Args:
        conn: open psycopg2 connection.
        redis_client: redis client exposing lpop().
        fields: column names to extract from each parsed log.
        batch_size: number of entries popped per iteration (default 100).
    """
    # Field names are interpolated into the INSERT statement, so they must be
    # plain identifiers — FIELDS may come from an environment variable and
    # must not be able to inject SQL.
    fields = [f for f in fields if re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', f)]
    if not fields:
        logging.error("No valid field names configured; nothing to insert.")
        return
    # The statement is loop-invariant; build it once instead of per row.
    insert_sql = f"""
        INSERT INTO logs ({', '.join(fields)})
        VALUES ({', '.join(['%s'] * len(fields))})
    """
    cursor = conn.cursor()
    while True:
        logs = [redis_client.lpop("logs") for _ in range(batch_size)]
        logs = [log for log in logs if log]  # lpop yields None once drained
        if not logs:
            logging.info("No logs to process.")
            break

        for log in logs:
            try:
                parsed_log = json.loads(sanitize_json(log))
            except json.JSONDecodeError as e:
                logging.error(f"Error parsing log: {log}; {e}")
                continue

            values = [parsed_log.get(field, None) for field in fields]
            cursor.execute(insert_sql, values)
        # Commit once per batch, matching the original transaction shape.
        conn.commit()
        logging.info("Logs saved.")
116
140
117
141
118
142
def main ():
@@ -123,7 +147,13 @@ def main():
123
147
config .update ({
124
148
'REDIS_HOST' : args .redis or config ['REDIS_HOST' ],
125
149
'REDIS_PORT' : args .port or config ['REDIS_PORT' ],
126
- 'SQLITE_DB_PATH' : args .db or config ['SQLITE_DB_PATH' ],
150
+ 'POSTGRESQL' : {
151
+ 'host' : config ['POSTGRESQL' ]['host' ],
152
+ 'port' : config ['POSTGRESQL' ]['port' ],
153
+ 'dbname' : args .db or config ['POSTGRESQL' ]['dbname' ],
154
+ 'user' : config ['POSTGRESQL' ]['user' ],
155
+ 'password' : config ['POSTGRESQL' ]['password' ]
156
+ },
127
157
'FIELDS' : args .fields .split (',' ) if args .fields else config ['FIELDS' ],
128
158
'PAUSE' : args .time or config ['PAUSE' ],
129
159
'LOG_LEVEL' : args .log_level or config ['LOG_LEVEL' ]
@@ -140,22 +170,27 @@ def main():
140
170
logging .info ("Connected to Redis." )
141
171
142
172
try :
143
- db_path = initialize_database (config ['SQLITE_DB_PATH' ])
144
- except sqlite3 .Error as e :
145
- logging .error (f"Error initializing database: { e } ; PATH: { config ['SQLITE_DB_PATH' ]} " )
146
- logging .error (f"Owner and permissions of the file: { os .stat (config ['SQLITE_DB_PATH' ])} " )
173
+ conn = initialize_database (config ['POSTGRESQL' ])
174
+ except psycopg2 .Error as e :
175
+ logging .error (f"Error initializing database: { e } " )
176
+ return
177
+
178
+ try :
179
+ check_redis_connection (redis_client )
180
+ except redis .exceptions .ConnectionError :
147
181
return
148
- create_dynamic_columns (db_path , config ['FIELDS' ])
182
+ create_dynamic_columns (conn , config ['FIELDS' ])
149
183
150
184
try :
151
185
while True :
152
- process_logs (db_path , redis_client , config ['FIELDS' ])
153
- clean_old_logs (db_path )
186
+ process_logs (conn , redis_client , config ['FIELDS' ])
187
+ clean_old_logs (conn )
154
188
time .sleep (config ['PAUSE' ])
155
189
except KeyboardInterrupt :
156
190
logging .info ("Script terminated by user." )
157
191
finally :
158
192
logging .info ("Script finished." )
193
+ conn .close ()
159
194
160
195
161
196
if __name__ == "__main__" :
0 commit comments