@@ -4,7 +4,6 @@
 from datetime import datetime
 import io
 from glob import glob
-from uuid import uuid1
 from tabulate import tabulate
 from collections import OrderedDict
 from datetime import datetime
@@ -37,7 +36,7 @@ def to_markdown_table(data: OrderedDict) -> str:
 def y_to_table(y):
     return tabulate(list(y.items()), tablefmt="pipe")
 
-class log:
+class Log:
     @property
     def timestamp(self):
         """str: Current date."""
@@ -49,15 +48,15 @@ def prefix(self):
 
     def __init__(self,subdir,path="./log",name=None):
         """
-        Logs data into csvs with timestamps.
+        Logs data into jsonls with timestamps.
 
         Example:
             log_obj = log(['reactor_0','reactor_1'],path='./log',name='experiment_0')
 
             log/YEAR/MONTH/
             ├─ experiment_0/
-            │  ├─ reactor_0.csv
-            │  ├─ reactor_1.csv
+            │  ├─ reactor_0.jsonl
+            │  ├─ reactor_1.jsonl
 
         Args:
             subdir (:obj:`list` of :obj:`str`): List of the names for the subdirectories of `path`.
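
Review note: the class is renamed from `log` to `Log` earlier in this diff, but the docstring example above still calls the lowercase name. After the rename, the example would presumably read:

```python
log_obj = Log(['reactor_0','reactor_1'], path='./log', name='experiment_0')
```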
@@ -75,7 +74,7 @@ def __init__(self,subdir,path="./log",name=None):
             self.subdir = subdir
         else:
             raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
-        self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
+        self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
         self.first_timestamp = None
         self.data_frames = {}
 
@@ -88,7 +87,7 @@ def __init__(self,subdir,path="./log",name=None):
             self.subdir = subdir
         else:
             raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
-        self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
+        self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
         self.first_timestamp = None
         self.data_frames = {}
 
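
The `.csv` to `.jsonl` default-extension rule changed here appears twice in this file (the hunks at -75 and -88). Unrolled from the lambda, the logic is the following; `with_default_ext` is a hypothetical name used only for illustration:

```python
import os

def with_default_ext(name, ext=".jsonl"):
    # Append the default extension only when `name` has no extension yet.
    return str(name) + ext if len(os.path.splitext(str(name))[1]) == 0 else str(name)
```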
@@ -100,19 +99,18 @@ def backup_config_file(self):
         with open(config_file) as cfile, open(filename,'w') as wfile:
             wfile.write(cfile.read())
 
-    def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
+    def log_rows(self,rows,subdir,add_timestamp=True,tags=None):
         """
-        Logs rows into csv format.
+        Logs rows into jsonl format.
 
         Args:
             rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows or pandas dataframe.
             subdir (str): Subdirectory name. Intended to be an element of `self.subdir`.
             add_timestamp (bool,optional): Whether or not to include a timestamp column.
             tags (:obj:`dict` of :obj:`str`): Dictionary of strings to be inserted as constant columns.
-            **kwargs: Additional arguments passed to `pandas.to_csv`.
         """
         t = self.timestamp
-        path = os.path.join(self.path,self.start_timestamp,f"{subdir}.csv")
+        path = os.path.join(self.path,self.start_timestamp,f"{subdir}.jsonl")
 
         df = pd.DataFrame()
         if isinstance(rows,list):
@@ -125,7 +123,7 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
         if os.path.exists(path):
             if self.first_timestamp is None:
                 with open(path) as file:
-                    head = pd.read_csv(io.StringIO(file.readline()+file.readline()),index_col=False,**kwargs)
+                    head = pd.read_json(io.StringIO(file.readline()+file.readline()), orient="records", lines=True)
                 self.first_timestamp = datetime_from_str(head.log_timestamp[0])
         else:
             self.first_timestamp = t
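
The peek at the head of an existing log now parses JSONL instead of a CSV header plus first data row. Since each JSONL line is a complete record, the first line alone already carries `log_timestamp`; a minimal sketch of the same recovery, with an illustrative path:

```python
import io
import pandas as pd

# Read only the first record of an existing JSONL log to recover
# the timestamp of the earliest entry.
with open("log/2023/01/experiment_0/reactor_0.jsonl") as fh:
    head = pd.read_json(io.StringIO(fh.readline()), orient="records", lines=True)
first_timestamp = head.log_timestamp[0]
```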
@@ -136,25 +134,21 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
             for key,value in tags.items():
                 df.loc[:,key] = value
 
-        df.to_csv(
-            path,
-            mode="a",
-            header=not os.path.exists(path),
-            index=False,
-            **kwargs
-        )
+        with open(path, mode="a") as log_file:
+            log_file.write(df.to_json(orient="records", lines=True))
+
         return df
     def log_many_rows(self,data,**kwargs):
         """
-        Logs rows into csv format.
+        Logs rows into jsonl format.
 
         Args:
             data (:obj:`dict` of :obj:`dict`): Dictionary encoded data frame.
             **kwargs: Additional arguments passed to `self.log_rows`.
         """
         self.data_frames = {}
         for _id,row in data.items():
-            df = self.log_rows(rows=[row],subdir=_id,sep='\t',**kwargs)
+            df = self.log_rows(rows=[row],subdir=_id,**kwargs)
             self.data_frames[_id] = df
         self.data_frames = pd.concat(list(self.data_frames.values()))
 
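
One caveat on the append path above: depending on the pandas version, `DataFrame.to_json(orient="records", lines=True)` may not terminate its output with a newline, in which case the last record of one append and the first record of the next can land on the same physical line. A defensive sketch (not what the patch does; `append_jsonl` is a hypothetical helper):

```python
import pandas as pd

def append_jsonl(df: pd.DataFrame, path: str) -> None:
    payload = df.to_json(orient="records", lines=True)
    # Guard against pandas versions that omit the trailing newline,
    # which would otherwise fuse consecutive appends.
    if not payload.endswith("\n"):
        payload += "\n"
    with open(path, mode="a") as log_file:
        log_file.write(payload)
```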
@@ -164,7 +158,7 @@ def log_optimal(self,column,maximum=True,**kwargs):
         """
         i = self.data_frames.loc[:,column].astype(float).argmax() if maximum else self.data_frames.loc[:,column].astype(float).argmin()
         self.df_opt = self.data_frames.iloc[i,:]
-        self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',sep='\t',**kwargs)
+        self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',**kwargs)
 
     def log_average(self, cols: list, **kwargs):
         """
@@ -178,38 +172,38 @@ def log_average(self, cols: list, **kwargs):
         df.loc[:, cols] = df.loc[:, cols].astype(float)
         df.elapsed_time_hours = df.elapsed_time_hours.round(decimals=2)
         self.df_avg = df.loc[:, cols + ['elapsed_time_hours']].groupby("elapsed_time_hours").mean().reset_index()
-        self.log_rows(rows=self.df_avg, subdir='avg', sep='\t', **kwargs)
+        self.log_rows(rows=self.df_avg, subdir='avg', **kwargs)
 
-    def cache_data(self,rows,path="./cache.csv",**kwargs):
+    def cache_data(self,rows,path="./cache.jsonl",**kwargs):
         """
-        Dumps rows into a single csv.
+        Dumps rows into a single jsonl.
 
         Args:
             rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows.
-            path (str): Path to the csv file.
+            path (str): Path to the jsonl file.
         """
-        pd.DataFrame(rows).T.to_csv(path,**kwargs)
+        pd.DataFrame(rows).T.to_json(path, orient="records", lines=True, **kwargs)
 
-    def transpose(self,columns,destination,sep='\t',skip=1,**kwargs):
+    def transpose(self,columns,destination,skip=1,**kwargs):
         """
-        Maps reactor csv to column csvs with columns given by columns.
+        Maps reactor jsonl to column jsonls with columns given by columns.
 
         Args:
             columns (:obj:list of :obj:str): List of columns to extract.
             destination (str): Destination path. Creates directories as needed and overwrites any existing files.
-            sep (str, optional): Column separator. Defaults to '\t'.
+
             skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
         """
         dfs = []
         for file in self.paths:
-            df = pd.read_csv(file,index_col=False, sep=sep, **kwargs)
+            df = pd.read_json(file, orient="records", lines=True, **kwargs)
             df['FILE'] = file
             dfs.append(df.iloc[::skip,:])
         df = pd.concat(dfs)
 
         for column in columns:
             Path(destination).mkdir(parents=True,exist_ok=True)
-            df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_csv(os.path.join(destination,f"{column}.csv"),sep=sep)
+            df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_json(os.path.join(destination,f"{column}.jsonl"), orient="records", lines=True)
 
 
 class LogAggregator:
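
A note on the pandas options used throughout this migration: `lines=True` is only valid together with `orient="records"` (pandas raises a `ValueError` otherwise), which is why the two options always travel together here, on both the `to_json` and `read_json` sides. A minimal round trip, with an illustrative file name:

```python
import pandas as pd

df = pd.DataFrame([{"temperature": 30.0, "elapsed_time_hours": 0.5}])
df.to_json("sample.jsonl", orient="records", lines=True)   # one JSON object per line
back = pd.read_json("sample.jsonl", orient="records", lines=True)  # mirrors the writer
```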
@@ -225,20 +219,19 @@ def __init__(self,log_paths,timestamp_col="log_timestamp",elapsed_time_col="elap
         self.glob_list = log_paths
         self.timestamp_col = timestamp_col
         self.elapsed_time_col = elapsed_time_col
-    def agg(self,destination,skip=1,sep='\t',**kwargs):
+    def agg(self,destination,skip=1,**kwargs):
         """
         Aggregator
 
         Args:
             destination (str): Destination path. Creates directories as needed and overwrites any existing files.
             skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
-            sep (str, optional): Column separator. Defaults to '\t'.
         """
         dfs = {}
         for path in self.glob_list:
             for file in glob(path):
                 basename = os.path.basename(file)
-                df = pd.read_csv(file,index_col=False, sep=sep, dtype={self.elapsed_time_col:float},**kwargs)
+                df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col:float},**kwargs)
                 df = df.iloc[::skip,:]
                 df['FILE'] = file
                 if dfs.get(basename,None) is not None:
@@ -256,5 +249,5 @@ def agg(self,destination,skip=1,sep='\t',**kwargs):
         for filename, df in dfs.items():
             Path(destination).mkdir(parents=True,exist_ok=True)
             path = os.path.join(destination,filename)
-            df.to_csv(path,sep=sep, index=False)
+            df.to_json(path, orient="records", lines=True)
 
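
For orientation, an end-to-end sketch of the post-migration flow, assuming `Log` and `LogAggregator` are importable from this module; reactor names, columns, and the glob pattern are illustrative and follow the docstring tree above:

```python
log_obj = Log(['reactor_0','reactor_1'], path='./log', name='experiment_0')
log_obj.log_many_rows({'reactor_0': {'temperature': 30.0},
                       'reactor_1': {'temperature': 31.5}})

# Merge every reactor's JSONL across runs into one file per basename.
agg = LogAggregator(['./log/*/*/experiment_0/*.jsonl'])
agg.agg(destination='./aggregated')
```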