Commit 9642f52

feat: proper parallelization with queue fetching
1 parent: f340db6

2 files changed: +118, -130 lines
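This commit replaces per-chunk Ractors with a single queue (pool) Ractor that hands files to a fixed set of worker Ractors on demand, so an idle worker can immediately fetch the next file instead of waiting for its pre-assigned chunk to finish. A minimal standalone sketch of that pattern follows; it is not the project's code, and the file list, the worker count of 4, and the File.size call standing in for real processing are illustrative only.

    # Sketch of the queue/worker Ractor pattern (illustrative, not HRMA code)
    files = Dir.glob("*.xsd")

    # Queue Ractor: receives items from the main Ractor and yields each one
    # to whichever worker calls #take next.
    queue = Ractor.new do
      loop { Ractor.yield(Ractor.receive) }
    end

    # Worker Ractors: repeatedly take an item, process it, and yield a result;
    # they exit once the queue's outgoing port is closed.
    workers = 4.times.map do |i|
      Ractor.new(queue, i) do |q, id|
        loop do
          begin
            file = q.take
            Ractor.yield([id, file, File.size(file)]) # stand-in for real work
          rescue Ractor::ClosedError
            break
          end
        end
      end
    end

    # Feed the queue, then collect exactly one result per input file.
    files.each { |f| queue.send(f) }
    files.size.times do
      _worker, (id, file, size) = Ractor.select(*workers)
      puts "worker #{id} finished #{file} (#{size} bytes)"
    end

    # Closing the queue unblocks workers waiting in q.take so they can exit.
    queue.close_outgoing

Ractor.select returns a [ractor, value] pair, which is why the result is destructured from the second element, and close_outgoing is what turns the workers' blocked take calls into Ractor::ClosedError.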

lib/hrma/build/document_generator.rb

Lines changed: 59 additions & 64 deletions
@@ -38,6 +38,7 @@ def generate
   xsd_files = load_xsd_files
   return if xsd_files.empty?
 
+  puts "Found #{xsd_files.size} XSD files to process"
   @progressbar = create_progressbar(xsd_files.size)
 
   if options[:parallel] && ractor_supported? && !ENV["HRMA_DISABLE_RACTORS"]
@@ -85,6 +86,7 @@ def generate_sequential(xsd_files)
   puts "Generating documentation sequentially..."
 
   xsd_files.each do |xsd_file|
+    puts "Processing: #{xsd_file}"
     process_xsd_file(xsd_file)
     progressbar.increment
   end
@@ -143,88 +145,81 @@ def generate_parallel(xsd_files)
   failed_files = []
   mutex = Mutex.new
 
-  # Split files into chunks for each Ractor
-  file_chunks = xsd_files.each_slice((xsd_files.size.to_f / ractor_count).ceil).to_a
-  puts "Split #{xsd_files.size} files into #{file_chunks.size} chunks..."
-
   # Pass necessary tool paths to Ractors
   tools_constants = {
     xsdvi_path: Tools::XSDVI_PATH,
     xsdmerge_path: Tools::XSDMERGE_PATH,
     xs3p_path: Tools::XS3P_PATH
   }
 
-  # Create and process Ractors
-  process_with_ractors(file_chunks, tools_constants, mutex, failed_files)
+  # Create a pool Ractor that will distribute work
+  pool = Ractor.new do
+    # This Ractor acts as a work distributor
+    loop do
+      # Receive a file to process and yield it to any worker that asks
+      file = Ractor.receive
+      puts "Queue: Received file #{file} for processing"
+      Ractor.yield(file)
+    end
+  end
 
-  if !failed_files.empty?
-    puts "\nFailed to process #{failed_files.size} files. See logs for details."
+  # Create worker Ractors
+  workers = ractor_count.times.map do |i|
+    Ractor.new(pool, @log_dir, Dir.pwd, tools_constants, i) do |pool, log_dir, pwd, tool_paths, id|
+      # Worker Ractor
+      loop do
+        begin
+          # Take a file from the pool
+          file = pool.take
+          puts "Worker #{id}: Processing file #{file}"
+
+          # Process the file
+          result = Hrma::Build::RactorDocumentProcessor.process_single_file(file, log_dir, pwd, tool_paths)
+
+          # Yield the result
+          Ractor.yield([file, *result])
+        rescue Ractor::ClosedError
+          # Pool is closed, exit the loop
+          puts "Worker #{id}: Pool closed, exiting"
+          break
+        end
+      end
+    end
   end
-end
 
-# Process files using Ractors
-#
-# @param file_chunks [Array<Array<String>>] Chunks of files to process
-# @param tools_constants [Hash] Paths to required tools
-# @param mutex [Mutex] Mutex for ractor safety
-# @param failed_files [Array] List to store failed files
-# @return [void]
-def process_with_ractors(file_chunks, tools_constants, mutex, failed_files)
-  # Create a Ractor for each chunk
-  ractors = create_ractors(file_chunks, tools_constants)
+  # Send all files to the pool
+  xsd_files.each do |file|
+    puts "Main: Sending file #{file} to queue"
+    pool.send(file)
+  end
 
   # Process results as they come in
-  process_ractor_results(ractors, mutex, failed_files)
-end
+  xsd_files.size.times do
+    # Wait for any worker to produce a result
+    worker, result = Ractor.select(*workers)
 
-# Create Ractors for processing file chunks
-#
-# @param file_chunks [Array<Array<String>>] Chunks of files to process
-# @param tools_constants [Hash] Paths to required tools
-# @return [Array<Ractor>] List of created Ractors
-def create_ractors(file_chunks, tools_constants)
-  ractors = []
-  file_chunks.each_with_index do |chunk, index|
-    puts "Creating Ractor #{index} for #{chunk.size} files..."
-
-    # Create Ractor with minimal context
-    ractor = Ractor.new(chunk, @log_dir, Dir.pwd, tools_constants) do |files, log_dir, pwd, tool_paths|
-      # Use our dedicated processor class inside the Ractor
-      Hrma::Build::RactorDocumentProcessor.process(files, log_dir, pwd, tool_paths)
-    end
+    # Process the result
+    file, success, error_message = result
 
-    ractors << ractor
-  end
-  ractors
-end
+    mutex.synchronize do
+      progressbar.increment
 
-# Process results from Ractors
-#
-# @param ractors [Array<Ractor>] List of Ractors
-# @param mutex [Mutex] Mutex for ractor safety
-# @param failed_files [Array] List to store failed files
-# @return [void]
-def process_ractor_results(ractors, mutex, failed_files)
-  ractors.each_with_index do |ractor, index|
-    begin
-      puts "Waiting for results from Ractor #{index}..."
-      results = ractor.take
-
-      # Process the results
-      results.each do |file, success, error_message|
-        mutex.synchronize do
-          progressbar.increment
-
-          if !success && error_message
-            failed_files << "#{file}: #{error_message}"
-            puts "\nError processing #{file}: #{error_message}"
-          end
-        end
+      if success
+        puts "Main: Successfully processed #{file}"
+      else
+        failed_files << "#{file}: #{error_message}"
+        puts "\nError processing #{file}: #{error_message}"
       end
-    rescue => e
-      puts "\nError getting results from Ractor #{index}: #{e.message}"
     end
   end
+
+  # Close the pool to signal workers to exit
+  puts "Main: All files processed, closing pool"
+  pool.close_outgoing
+
+  if !failed_files.empty?
+    puts "\nFailed to process #{failed_files.size} files. See logs for details."
+  end
 end
 
 # Process a single XSD file
lib/hrma/build/documentation.rb

Lines changed: 59 additions & 66 deletions
@@ -29,6 +29,7 @@ def generate(options = {})
   xsd_files = load_xsd_files(options[:manifest_path])
   return if xsd_files.empty?
 
+  puts "Found #{xsd_files.size} XSD files to process"
   progressbar = create_progressbar(xsd_files.size)
 
   if options[:parallel] && ractor_supported? && !ENV["HRMA_DISABLE_RACTORS"]
@@ -77,6 +78,7 @@ def generate_sequential(xsd_files, progressbar)
 
   # Process each XSD file
   xsd_files.each do |xsd_file|
+    puts "Processing: #{xsd_file}"
    process_xsd_file(xsd_file)
    progressbar.increment
  end
@@ -189,90 +191,81 @@ def generate_parallel(xsd_files, options, progressbar)
   failed_files = []
   mutex = Mutex.new
 
-  # Split files into chunks for each Ractor
-  file_chunks = xsd_files.each_slice((xsd_files.size.to_f / ractor_count).ceil).to_a
-  puts "Split #{xsd_files.size} files into #{file_chunks.size} chunks..."
-
   # Pass necessary tool paths to Ractors
   tools_constants = {
     xsdvi_path: Tools::XSDVI_PATH,
     xsdmerge_path: Tools::XSDMERGE_PATH,
     xs3p_path: Tools::XS3P_PATH
   }
 
-  # Create and process Ractors
-  process_with_ractors(file_chunks, tools_constants, mutex, progressbar, failed_files)
+  # Create a pool Ractor that will distribute work
+  pool = Ractor.new do
+    # This Ractor acts as a work distributor
+    loop do
+      # Receive a file to process and yield it to any worker that asks
+      file = Ractor.receive
+      puts "Queue: Received file #{file} for processing"
+      Ractor.yield(file)
+    end
+  end
 
-  if !failed_files.empty?
-    puts "\nFailed to process #{failed_files.size} files. See logs for details."
+  # Create worker Ractors
+  workers = ractor_count.times.map do |i|
+    Ractor.new(pool, Hrma::Config.log_dir, Dir.pwd, tools_constants, i) do |pool, log_dir, pwd, tool_paths, id|
+      # Worker Ractor
+      loop do
+        begin
+          # Take a file from the pool
+          file = pool.take
+          puts "Worker #{id}: Processing file #{file}"
+
+          # Process the file
+          result = Hrma::Build::RactorDocumentProcessor.process_single_file(file, log_dir, pwd, tool_paths)
+
+          # Yield the result
+          Ractor.yield([file, *result])
+        rescue Ractor::ClosedError
+          # Pool is closed, exit the loop
+          puts "Worker #{id}: Pool closed, exiting"
+          break
+        end
+      end
+    end
   end
-end
 
-# Process files using Ractors
-#
-# @param file_chunks [Array<Array<String>>] Chunks of files to process
-# @param tools_constants [Hash] Paths to required tools
-# @param mutex [Mutex] Mutex for ractor safety
-# @param progressbar [ProgressBar] Progress bar for tracking document generation
-# @param failed_files [Array] List to store failed files
-# @return [void]
-def process_with_ractors(file_chunks, tools_constants, mutex, progressbar, failed_files)
-  # Create a Ractor for each chunk
-  ractors = create_ractors(file_chunks, tools_constants)
+  # Send all files to the pool
+  xsd_files.each do |file|
+    puts "Main: Sending file #{file} to queue"
+    pool.send(file)
+  end
 
   # Process results as they come in
-  process_ractor_results(ractors, mutex, progressbar, failed_files)
-end
+  xsd_files.size.times do
+    # Wait for any worker to produce a result
+    worker, result = Ractor.select(*workers)
 
-# Create Ractors for processing file chunks
-#
-# @param file_chunks [Array<Array<String>>] Chunks of files to process
-# @param tools_constants [Hash] Paths to required tools
-# @return [Array<Ractor>] List of created Ractors
-def create_ractors(file_chunks, tools_constants)
-  ractors = []
-  file_chunks.each_with_index do |chunk, index|
-    puts "Creating Ractor #{index} for #{chunk.size} files..."
-
-    # Create Ractor with minimal context
-    ractor = Ractor.new(chunk, Hrma::Config.log_dir, Dir.pwd, tools_constants) do |files, log_dir, pwd, tool_paths|
-      # Use our dedicated processor class inside the Ractor
-      Hrma::Build::RactorDocumentProcessor.process(files, log_dir, pwd, tool_paths)
-    end
+    # Process the result
+    file, success, error_message = result
 
-    ractors << ractor
-  end
-  ractors
-end
+    mutex.synchronize do
+      progressbar.increment
 
-# Process results from Ractors
-#
-# @param ractors [Array<Ractor>] List of Ractors
-# @param mutex [Mutex] Mutex for ractor safety
-# @param progressbar [ProgressBar] Progress bar for tracking document generation
-# @param failed_files [Array] List to store failed files
-# @return [void]
-def process_ractor_results(ractors, mutex, progressbar, failed_files)
-  ractors.each_with_index do |ractor, index|
-    begin
-      puts "Waiting for results from Ractor #{index}..."
-      results = ractor.take
-
-      # Process the results
-      results.each do |file, success, error_message|
-        mutex.synchronize do
-          progressbar.increment
-
-          if !success && error_message
-            failed_files << "#{file}: #{error_message}"
-            puts "\nError processing #{file}: #{error_message}"
-          end
-        end
+      if success
+        puts "Main: Successfully processed #{file}"
+      else
+        failed_files << "#{file}: #{error_message}"
+        puts "\nError processing #{file}: #{error_message}"
       end
-    rescue => e
-      puts "\nError getting results from Ractor #{index}: #{e.message}"
     end
   end
+
+  # Close the pool to signal workers to exit
+  puts "Main: All files processed, closing pool"
+  pool.close_outgoing
+
+  if !failed_files.empty?
+    puts "\nFailed to process #{failed_files.size} files. See logs for details."
+  end
 end
 
 # Create a progress bar
