Skip to content

Commit 7b2bcd1

Browse files
Support multithreads for TriOS raw triplets
1 parent 6a1a5f8 commit 7b2bcd1

File tree

2 files changed

+43
-25
lines changed

2 files changed

+43
-25
lines changed

Source/Controller.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,15 @@
88
from Source.HDFRoot import HDFRoot
99
from Source.MainConfig import MainConfig
1010
from Source.ConfigFile import ConfigFile
11-
# from Source.ProcessL1a import ProcessL1a
1211
from Source.ProcessL1aSeaBird import ProcessL1aSeaBird
1312
from Source.ProcessL1aDALEC import ProcessL1aDALEC
14-
# from Source.TriosL1A import TriosL1A
1513
from Source.ProcessL1aTriOS import ProcessL1aTriOS
1614
from Source.ProcessL1aSoRad import ProcessL1aSoRad
1715
from Source.AncillaryReader import AncillaryReader
1816
from Source.ProcessL1aqc import ProcessL1aqc
1917
from Source.CalibrationFileReader import CalibrationFileReader
2018
from Source.CalibrationFile import CalibrationFile
2119
from Source.ProcessL1b import ProcessL1b
22-
# from Source.TriosL1B import TriosL1B
2320
from Source.ProcessL1bTriOS import ProcessL1bTriOS
2421
from Source.ProcessL1bDALEC import ProcessL1bDALEC
2522
from Source.ProcessL1bqc import ProcessL1bqc
@@ -563,7 +560,7 @@ def processSingleLevel(pathOut, inFilePath, calibrationMap, level):
563560
if flag_Trios and level == "L1A":
564561
# inFilePath is a list of filepath strings at L1A
565562
# Grab input name and extension of first file
566-
inFileName = os.path.split(inFilePath[0])[1]
563+
inFileName = os.path.split(inFilePath[0])[1]
567564
else:
568565
# inFilePath is a singleton filepath string
569566
inFilePath = os.path.abspath(inFilePath)

run_Sample_Data.py

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,24 @@
2121
# have been provided in the HyperCP repository. The configuration file (./Config/[sample].cfg) can
2222
# also be edited by hand.
2323
# NOTE: Multithreading is available to run multiple files simultaneously.
24-
# BUG: Multithreading not yet available for manually acquired TriOS (.mlb) raw files (e.g., multi-level)
24+
# Multithreading for manually acquired TriOS (.mlb) raw files (e.g., multi-level) is now supported
2525
# NOTE: This script cannot be run on the same repository simultaneously with alternate configurations.
2626
# NOTE: By default this processes all files in the PROC_LEVEL -1 level directory to PROC_LEVEL directory.
2727
#
2828
# D. Aurin NASA/GSFC Aug 2024
2929

3030
################################################### CUSTOM SET UP ###################################################
3131
# Batch options
32-
MULTI_TASK = False # Multiple threads for HyperSAS (any level) or TriOS (only L1A and up)
32+
MULTI_TASK = True # Multiple threads for HyperSAS (any level) or TriOS (only L1A and up)
3333
MULTI_LEVEL = False # Process raw (L0) to Level-2 (L2)
3434
CLOBBER = True # True overwrites existing files
3535
PROC_LEVEL = "L1A" # Process to this level: L1A, L1AQC, L1B, L1BQC, L2 (ignored for MULTI_LEVEL)
3636

3737
# Dataset options
38-
PLATFORM = "pySAS"
39-
# PLATFORM = "Manual_TriOS"
40-
INST_TYPE = "SEABIRD" # SEABIRD or TRIOS; defines raw file naming
41-
# INST_TYPE = "TRIOS"
38+
# PLATFORM = "pySAS"
39+
PLATFORM = "Manual_TriOS"
40+
# INST_TYPE = "SEABIRD" # SEABIRD or TRIOS; defines raw file naming
41+
INST_TYPE = "TRIOS"
4242
CRUISE = "FICE22"
4343
# L1B_REGIME: Optional. [Default, Class, Full]
4444
# Denote FRM processing regime and use appropriately named subdirectories.
@@ -56,7 +56,7 @@
5656
PATH_DATA = os.path.join(PATH_HCP,'Data','Sample_Data',PLATFORM)
5757
##################################
5858

59-
if PLATFORM.lower == "Manual_TriOS":
59+
if PLATFORM.lower() == "manual_trios":
6060
PATH_ANC = os.path.join(
6161
PATH_DATA, f"{CRUISE}_TriOS_Ancillary.sb",
6262
)
@@ -67,7 +67,7 @@
6767

6868
if MULTI_LEVEL or PROC_LEVEL == "L1A":
6969
PATH_INPUT = PATH_DATA
70-
else:
70+
else:
7171
PATH_INPUT = os.path.join(PATH_DATA, L1B_REGIME)
7272

7373
# PATH_OUTPUT does not require folder names of data levels. HyperCP will automate that.
@@ -125,22 +125,21 @@ def run_Command(fp_input_files):
125125
# One or more files. (fp_input_files is a list of one or more files)
126126
from_level = FROM_LEVELS[0]
127127
to_level = "L1A"
128-
inputFileBase = fp_input_files # Full-path file
128+
# inputFileBase = fp_input_files # Full-path file list of all in L1A
129129
test = [
130-
os.path.exists(inputFileBase[i])
130+
os.path.exists(fp_input_files[i])
131131
for i, x in enumerate(fp_input_files)
132132
if os.path.exists(x)
133133
]
134134
if not test:
135135
print("***********************************")
136-
print(f"*** [{inputFileBase}] STOPPED PROCESSING ***")
136+
print(f"*** [{fp_input_files}] STOPPED PROCESSING ***")
137137
print(f"Bad input path: {fp_input_files}")
138138
print("***********************************")
139139
return
140-
inputFileBase = os.path.splitext(os.path.basename(fp_input_files[0]))[0] # 'FRM4SOC2_FICE22_NASA_20220715_120000_L1BQC'
141-
if (INST_TYPE.lower() == "seabird"
142-
and inputFileBase in to_skip[to_level]
143-
and not CLOBBER):
140+
inputFileBase = os.path.splitext(os.path.basename(fp_input_files[0]))[0] # single file no path
141+
test = [v for v in to_skip[to_level] if v in inputFileBase]
142+
if (test and not CLOBBER):
144143
print("************************************************")
145144
print(f"*** [{inputFileBase}] ALREADY PROCESSED TO {to_level} ***")
146145
print("************************************************")
@@ -162,15 +161,24 @@ def run_Command(fp_input_files):
162161
else:
163162
# One file at a time with or without multithread. (fp_input_files is a string of one file)
164163
for from_level, to_level, ext in zip(FROM_LEVELS, TO_LEVELS, FILE_EXT):
165-
inputFileBase = os.path.splitext(os.path.basename(fp_input_files))[0]
166-
test = os.path.exists(fp_input_files)
164+
if from_level == 'RAW' and INST_TYPE.lower() == 'trios':
165+
inputFileBase = os.path.splitext(os.path.basename(fp_input_files[0]))[0] # single file no path
166+
test = [
167+
os.path.exists(fp_input_files[i])
168+
for i, x in enumerate(fp_input_files)
169+
if os.path.exists(x)
170+
]
171+
else:
172+
inputFileBase = os.path.splitext(os.path.basename(fp_input_files))[0]
173+
test = os.path.exists(fp_input_files)
167174
if not test:
168175
print("***********************************")
169176
print(f"*** [{inputFileBase}] STOPPED PROCESSING ***")
170177
print(f"Bad input path: {fp_input_files}")
171178
print("***********************************")
172179
break
173-
if inputFileBase in to_skip[to_level] and not CLOBBER:
180+
test = [v for v in to_skip[to_level] if v in inputFileBase]
181+
if test and not CLOBBER:
174182
print("************************************************")
175183
print(f"*** [{inputFileBase}] ALREADY PROCESSED TO {to_level} ***")
176184
print("************************************************")
@@ -193,7 +201,7 @@ def run_Command(fp_input_files):
193201
def worker(fp_input_files):
194202
# fp_input_files is a list unless multitasking, in which case it's a string, unless it's TriOS RAW
195203
if isinstance(fp_input_files, list):
196-
if INST_TYPE.lower() == "trios" and MULTI_LEVEL:
204+
if INST_TYPE.lower() == "trios" and (MULTI_LEVEL or 'RAW' in FROM_LEVELS):
197205
print(f"### Processing {fp_input_files} ...")
198206
run_Command(fp_input_files)
199207
else:
@@ -225,8 +233,21 @@ def worker(fp_input_files):
225233
# memory is used (~3GB) for each process so you may not be able to
226234
# use all cores of the system without problems.
227235
with multiprocessing.Pool(4) as pool:
228-
# One file (string) at a time to worker
229-
pool.map(worker, fpf_input)
236+
if INST_TYPE.lower() == 'trios' and FROM_LEVELS[0] == 'RAW':
237+
# Group the raw files that share a timestamp into one list per raw collection (a list of lists), so each worker gets a complete set
238+
fpf_input_triplets = []
239+
for item in fpf_input:
240+
inputFileBase = os.path.splitext(os.path.basename(item))[0]
241+
timeStamp = inputFileBase[len(inputFileBase)-15:-1] # Subject to string error for non-compliant filenames
242+
index = [i for i,x in enumerate(fpf_input) if timeStamp in x]
243+
fpf_input_triplet = [fpf_input[x] for x in index]
244+
fpf_input_triplets.append(fpf_input_triplet)
245+
246+
unique_fpf_input_triplets = [list(x) for x in set(tuple(x) for x in fpf_input_triplets)]
247+
pool.map(worker, unique_fpf_input_triplets)
248+
else:
249+
# One file (string) at a time to worker
250+
pool.map(worker, fpf_input)
230251
else:
231252
# List of one or more files
232253
worker(fpf_input)

0 commit comments

Comments
 (0)