+ import json
+ import logging
import os
- import subprocess
+ import sys
+ import tempfile
import threading
import timeit
from collections import deque
-
- from logging import critical, info, debug, exception
+ from logging.handlers import RotatingFileHandler
from math import ceil
from queue import Queue
from time import time

- from .service import Service, ServiceConfig
+ from .command_runner import command_runner
from .queuemanager import (
    QueueManager,
    TimedQueueManager,
    PollerQueueManager,
    DiscoveryQueueManager,
)
+ from .service import Service, ServiceConfig
+
+ # Hard limit on script execution time so we don't hang
+ DEFAULT_SCRIPT_TIMEOUT = 3600
+ MAX_LOGFILE_SIZE = (1024 ** 2) * 10  # 10 MB max per log file
+
+ logger = logging.getLogger(__name__)
+
+ # Logging functions ########################################################
+ # Original logger functions from ofunctions.logger_utils package
+
+ FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
+
+
+ def logger_get_console_handler():
+     try:
+         console_handler = logging.StreamHandler(sys.stdout)
+     except OSError as exc:
+         print("Cannot log to stdout, trying stderr. Message %s" % exc)
+         try:
+             console_handler = logging.StreamHandler(sys.stderr)
+             console_handler.setFormatter(FORMATTER)
+             return console_handler
+         except OSError as exc:
+             print("Cannot log to stderr either. Message %s" % exc)
+             return False
+     else:
+         console_handler.setFormatter(FORMATTER)
+         return console_handler
+
+
+ def logger_get_file_handler(log_file):
+     err_output = None
+     try:
+         file_handler = RotatingFileHandler(
+             log_file,
+             mode="a",
+             encoding="utf-8",
+             maxBytes=MAX_LOGFILE_SIZE,
+             backupCount=3,
+         )
+     except OSError as exc:
+         try:
+             print(
+                 "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
+                 % exc
+             )
+             err_output = str(exc)
+             temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
+             print("Trying temporary log file in " + temp_log_file)
+             file_handler = RotatingFileHandler(
+                 temp_log_file,
+                 mode="a",
+                 encoding="utf-8",
+                 maxBytes=MAX_LOGFILE_SIZE,
+                 backupCount=1,
+             )
+             file_handler.setFormatter(FORMATTER)
+             err_output += "\nUsing [%s]" % temp_log_file
+             return file_handler, err_output
+         except OSError as exc:
+             print(
+                 "Cannot create temporary log file either. Will not log to file. Message: %s"
+                 % exc
+             )
+             return False
+     else:
+         file_handler.setFormatter(FORMATTER)
+         return file_handler, err_output
+
+
+ def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
+     # If a name is given to getLogger, then modules can't log to the root logger
+     _logger = logging.getLogger()
+     if debug is True:
+         _logger.setLevel(logging.DEBUG)
+     else:
+         _logger.setLevel(logging.INFO)
+     console_handler = logger_get_console_handler()
+     if console_handler:
+         _logger.addHandler(console_handler)
+     if log_file is not None:
+         file_handler, err_output = logger_get_file_handler(log_file)
+         if file_handler:
+             _logger.addHandler(file_handler)
+             _logger.propagate = False
+             if err_output is not None:
+                 print(err_output)
+                 _logger.warning(
+                     "Failed to use log file [%s], %s.", log_file, err_output
+                 )
+     if temp_log_file is not None:
+         if os.path.isfile(temp_log_file):
+             try:
+                 os.remove(temp_log_file)
+             except OSError:
+                 logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
+         file_handler, err_output = logger_get_file_handler(temp_log_file)
+         if file_handler:
+             _logger.addHandler(file_handler)
+             _logger.propagate = False
+             if err_output is not None:
+                 print(err_output)
+                 _logger.warning(
+                     "Failed to use log file [%s], %s.", log_file, err_output
+                 )
+     return _logger
+
+
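The three helpers above follow the ofunctions.logger_utils pattern: a console handler that falls back from stdout to stderr, a rotating file handler that falls back to a temp file, and a root-logger factory. A minimal usage sketch, assuming this module is importable as the LibreNMS package; the log path and debug flag below are illustrative, not values from this change:

```python
# Illustrative sketch only: import path, log path and debug flag are assumptions.
from LibreNMS import logger_get_logger

service_logger = logger_get_logger(log_file="/var/log/librenms/librenms-service.log", debug=True)
service_logger.info("Console handler and rotating file handler attached to the root logger")
```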
+ # Generic functions ########################################################
+
+
+ def check_for_file(file):
+     try:
+         with open(file) as file:
+             pass
+     except IOError as exc:
+         logger.error("File '%s' is not readable" % file)
+         logger.debug("Traceback:", exc_info=True)
+         sys.exit(2)
+
+
+ # Config functions #########################################################
+
+
+ def get_config_data(base_dir):
+     check_for_file(os.path.join(base_dir, ".env"))
+
+     try:
+         import dotenv
+
+         env_path = "{}/.env".format(base_dir)
+         logger.info("Attempting to load .env from '%s'", env_path)
+         dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
+
+         if not os.getenv("NODE_ID"):
+             logger.critical(".env does not contain a valid NODE_ID setting.")
+
+     except ImportError as exc:
+         logger.critical(
+             'Could not import "%s" - Please check that the poller user can read the file, and that composer install has been run recently\nAdditional info: %s'
+             % (env_path, exc)
+         )
+         logger.debug("Traceback:", exc_info=True)
+
+     config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % base_dir]
+     try:
+         exit_code, output = command_runner(config_cmd, timeout=300)
+         if exit_code == 0:
+             return json.loads(output)
+         raise EnvironmentError
+     except Exception as exc:
+         logger.critical("ERROR: Could not execute command [%s]: %s" % (config_cmd, exc))
+         logger.debug("Traceback:", exc_info=True)
+         return None
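A sketch of how a caller might consume get_config_data: the base_dir computation mirrors the one used by call_script below, while the exit-on-failure handling is an assumption rather than part of this change:

```python
# Illustrative sketch only: exit-on-failure handling is an assumption.
import os
import sys

from LibreNMS import get_config_data, logger

base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
config = get_config_data(base_dir)  # dict parsed from config_to_json.php output, or None on failure
if config is None:
    logger.critical("Could not load configuration, exiting")
    sys.exit(1)
```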


def normalize_wait(seconds):
@@ -28,8 +185,9 @@ def normalize_wait(seconds):

def call_script(script, args=()):
    """
-     Run a LibreNMS script. Captures all output and throws an exception if a non-zero
-     status is returned. Blocks parent signals (like SIGINT and SIGTERM).
+     Run a LibreNMS script. Captures all output and returns the exit code.
+     Blocks parent signals (like SIGINT and SIGTERM).
+     Kills the script if it takes too long.
    :param script: the name of the executable relative to the base directory
    :param args: a tuple of arguments to send to the command
    :returns the output of the command
@@ -42,14 +200,10 @@ def call_script(script, args=()):

    base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
    cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
-     debug("Running {}".format(cmd))
+     logger.debug("Running {}".format(cmd))
    # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
-     return subprocess.check_call(
-         cmd,
-         stdout=subprocess.DEVNULL,
-         stderr=subprocess.STDOUT,
-         preexec_fn=os.setsid,
-         close_fds=True,
+     return command_runner(
+         cmd, preexec_fn=os.setsid, close_fds=True, timeout=DEFAULT_SCRIPT_TIMEOUT
    )

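Assuming command_runner keeps the (exit_code, output) return shape used in get_config_data above, call_script now hands back a tuple instead of raising CalledProcessError, so callers check the exit code themselves. A hedged sketch; the script name and arguments are placeholders:

```python
# Illustrative sketch only: "discovery.php" and its arguments are placeholders.
from LibreNMS import call_script, logger

exit_code, output = call_script("discovery.php", ("-h", "all"))
if exit_code != 0:
    logger.error("Script failed with exit code %s: %s", exit_code, output)
```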
@@ -70,15 +224,15 @@ def connect(self):
            import pymysql

            pymysql.install_as_MySQLdb()
-             info("Using pure python SQL client")
+             logger.info("Using pure python SQL client")
        except ImportError:
-             info("Using other SQL client")
+             logger.info("Using other SQL client")

        try:
            import MySQLdb
        except ImportError:
-             critical("ERROR: missing a mysql python module")
-             critical(
+             logger.critical("ERROR: missing a mysql python module")
+             logger.critical(
                "Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI"
            )
            raise
@@ -99,7 +253,7 @@ def connect(self):
            conn.ping(True)
            self._db[threading.get_ident()] = conn
        except Exception as e:
-             critical("ERROR: Could not connect to MySQL database! {}".format(e))
+             logger.critical("ERROR: Could not connect to MySQL database! {}".format(e))
            raise

    def db_conn(self):
@@ -128,7 +282,7 @@ def query(self, query, args=None):
            cursor.close()
            return cursor
        except Exception as e:
-             critical("DB Connection exception {}".format(e))
+             logger.critical("DB Connection exception {}".format(e))
            self.close()
            raise

@@ -167,7 +321,7 @@ def stop(self):


class Lock:
-     """ Base lock class this is not thread safe"""
+     """Base lock class this is not thread safe"""

    def __init__(self):
        self._locks = {}  # store a tuple (owner, expiration)
@@ -210,7 +364,7 @@ def check_lock(self, name):
        return False

    def print_locks(self):
-         debug(self._locks)
+         logger.debug(self._locks)


class ThreadingLock(Lock):
@@ -269,7 +423,7 @@ def __init__(self, namespace="lock", **redis_kwargs):
        self._redis = redis.Redis(**kwargs)
        self._redis.ping()
        self._namespace = namespace
-         info(
+         logger.info(
            "Created redis lock manager with socket_timeout of {}s".format(
                redis_kwargs["socket_timeout"]
            )
@@ -296,7 +450,7 @@ def lock(self, name, owner, expiration=1, allow_owner_relock=False):
            non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
            return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
        except redis.exceptions.ResponseError as e:
-             exception(
+             logger.critical(
                "Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
                name,
                owner,
@@ -351,7 +505,7 @@ def __init__(self, name, namespace="queue", **redis_kwargs):
        self._redis = redis.Redis(**kwargs)
        self._redis.ping()
        self.key = "{}:{}".format(namespace, name)
-         info(
+         logger.info(
            "Created redis queue with socket_timeout of {}s".format(
                redis_kwargs["socket_timeout"]
            )
@@ -371,10 +525,20 @@ def put(self, item):
        self._redis.zadd(self.key, {item: time()}, nx=True)

    def get(self, block=True, timeout=None):
-         if block:
-             item = self._redis.bzpopmin(self.key, timeout=timeout)
-         else:
-             item = self._redis.zpopmin(self.key)
+         try:
+             if block:
+                 item = self._redis.bzpopmin(self.key, timeout=timeout)
+             else:
+                 item = self._redis.zpopmin(self.key)
+         # Unfortunately we cannot catch redis.exceptions.ResponseError here,
+         # since it would trigger another exception in queuemanager
+         except Exception as e:
+             logger.critical(
+                 "BZPOPMIN/ZPOPMIN command failed: {}\nNote that redis >= 5.0 is required.".format(
+                     e
+                 )
+             )
+             raise

        if item:
            item = item[1]
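BZPOPMIN/ZPOPMIN only exist on Redis 5.0 and newer, which is what the error message above hints at. A sketch of an optional up-front server check using the redis-py INFO command; the helper name and connection defaults are illustrative, not part of this change:

```python
# Illustrative sketch only: helper name and connection parameters are assumptions.
import redis


def redis_supports_zpopmin(conn):
    """Return True when the connected Redis server is 5.0+ (ZPOPMIN/BZPOPMIN available)."""
    major = int(conn.info("server")["redis_version"].split(".")[0])
    return major >= 5


if __name__ == "__main__":
    if not redis_supports_zpopmin(redis.Redis()):
        raise RuntimeError("redis >= 5.0 is required for BZPOPMIN/ZPOPMIN")
```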
0 commit comments