1
- """input output """
1
+ """the data that produced by scanner and send to workshop """
2
2
import os
3
- import sys
4
- import pickle
5
3
import hashlib
6
- from multiprocessing import Pool , Queue
4
+ from multiprocessing import Pool
7
5
8
6
from gevent .lock import RLock
9
7
10
8
from Ingram .utils import color
11
9
from Ingram .utils import logger
12
10
from Ingram .utils import singleton
11
+ from Ingram .utils import get_current_time
13
12
from Ingram .utils import get_ip_seg_len , get_all_ip
14
13
15
14
@@ -19,9 +18,9 @@ class Data:
19
18
def __init__ (self , _input , output ):
20
19
self .input = _input
21
20
self .output = output
22
- self .msg_queue = Queue ()
23
21
self .var_lock = RLock ()
24
22
self .file_lock = RLock ()
23
+ self .create_time = get_current_time ()
25
24
self .taskid = hashlib .md5 ((self .input + self .output ).encode ('utf-8' )).hexdigest ()
26
25
27
26
self .total = 0
@@ -63,39 +62,75 @@ def preprocess(self):
63
62
64
63
# the location to begin
65
64
if self .done != 0 :
66
- current = 0
67
- while self .lines :
68
- line = self .lines .pop (0 )
69
- current += get_ip_seg_len (line )
70
- if current == self .done :
71
- break
72
- elif current < self .done :
73
- continue
74
- else :
75
- ips = get_all_ip (line )
76
- self .lines = ips [- (current - self .done ):] + self .lines
77
- break
78
- logger .debug (f"current: { current } , done: { self .done } , total: { self .total } " )
65
+ for _ in range (self .done ):
66
+ next (self .ip_generator )
67
+ # current = 0
68
+ # while self.lines:
69
+ # line = self.lines.pop(0)
70
+ # current += get_ip_seg_len(line)
71
+ # if current == self.done:
72
+ # break
73
+ # elif current < self.done:
74
+ # continue
75
+ # else:
76
+ # ips = get_all_ip(line)
77
+ # self.lines = ips[-(current - self.done):] + self.lines
78
+ # break
79
+ # logger.debug(f"current: {current}, done: {self.done}, total: {self.total}")
79
80
80
81
# found
81
82
results_file = os .path .join (self .output , 'results.csv' )
82
83
if os .path .exists (results_file ):
83
84
with open (results_file , 'r' ) as f :
84
85
self .found = len ([l for l in f if l .strip ()])
85
86
86
- self .vuls = open (results_file , 'a' )
87
- self .not_vuls = open (os .path .join (self .output , 'not_vulnerable.csv' ), 'a' )
87
+ self .vul = open (results_file , 'a' )
88
+ self .not_vul = open (os .path .join (self .output , 'not_vulnerable.csv' ), 'a' )
88
89
89
90
def ip_generate (self ):
90
91
for line in self .lines :
91
- ips = get_all_ip (line )
92
- for ip in ips :
93
- yield ip
92
+ yield from get_all_ip (line )
93
+
94
+ def get_total (self ):
95
+ with self .var_lock :
96
+ return self .total
97
+
98
+ def get_done (self ):
99
+ with self .var_lock :
100
+ return self .done
101
+
102
+ def get_found (self ):
103
+ with self .var_lock :
104
+ return self .found
105
+
106
+ def found_add (self ):
107
+ with self .var_lock :
108
+ self .found += 1
109
+
110
+ def done_add (self ):
111
+ with self .var_lock :
112
+ self .done += 1
113
+
114
+ def vul_add (self , item ):
115
+ with self .file_lock :
116
+ self .vul .writelines (item )
117
+ self .vul .flush ()
118
+
119
+ def not_vul_add (self , item ):
120
+ with self .file_lock :
121
+ self .not_vul .writelines (item )
122
+ self .not_vul .flush ()
123
+
124
+ def record_running_state (self ):
125
+ # every 5 minutes
126
+ with self .var_lock :
127
+ time_interval = int (get_current_time () - self .create_time )
128
+ if time_interval % (5 * 60 ) == 0 :
129
+ logger .info (f"#@#{ self .taskid } #@#{ self .done } #@#running state" )
94
130
95
131
def __del__ (self ):
96
- try : # if dont add try, sys.exit() may cause error
97
- self .vuls .close ()
98
- self .not_vuls .close ()
99
- self .msg_queue .close ()
132
+ try : # if dont use try, sys.exit() may cause error
133
+ self .vul .close ()
134
+ self .not_vul .close ()
100
135
except Exception as e :
101
136
logger .error (e )
0 commit comments