
Conversation

a4894z (Collaborator) commented Oct 18, 2025

In parallel.py, I kept getting errors of this sort:

[rank2]:[I1017 19:34:58.923985894 ProcessGroupWrapper.cpp:587] [Rank 2] Running collective: CollectiveFingerPrint(SequenceNumber=38, OpType=ALLREDUCE, TensorShape=[77, 1], TensorDtypes=Float, TensorDeviceTypes=TensorOptions(dtype=float (default), device=cuda, layout=Strided (default), requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)))

[rank1]:[I1017 19:34:58.999468354 ProcessGroupWrapper.cpp:587] [Rank 1] Running collective: CollectiveFingerPrint(SequenceNumber=38, OpType=ALLREDUCE, TensorShape=[100, 1], TensorDtypes=Float, TensorDeviceTypes=TensorOptions(dtype=float (default), device=cuda, layout=Strided (default), requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)))

[rank3]:[I1017 19:34:58.003353312 ProcessGroupWrapper.cpp:587] [Rank 3] Running collective: CollectiveFingerPrint(SequenceNumber=38, OpType=ALLREDUCE, TensorShape=[100, 1], TensorDtypes=Float, TensorDeviceTypes=TensorOptions(dtype=float (default), device=cuda, layout=Strided (default), requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)))

[rank0]:[I1017 19:34:58.003669532 ProcessGroupWrapper.cpp:587] [Rank 0] Running collective: CollectiveFingerPrint(SequenceNumber=38, OpType=ALLREDUCE, TensorShape=[100, 1], TensorDtypes=Float, TensorDeviceTypes=TensorOptions(dtype=float (default), device=cuda, layout=Strided (default), requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)))

Notice that the TensorShape for rank 2 ([77, 1]) differs from the other ranks ([100, 1]).

I've tested the fix in this pull request on Califone and other Sector 9 GPU machines, and it appears to resolve the issue.
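For context, torch.distributed collectives such as all_reduce require every rank to pass a tensor with the same number of elements; when one rank submits a differently shaped buffer, the collective can error out or hang, and ProcessGroupWrapper (enabled with TORCH_DISTRIBUTED_DEBUG=DETAIL) reports the fingerprint mismatch seen above. A minimal sketch of the failure mode, assuming a single-node 4-rank launch via torchrun (the script name and shapes are illustrative):

# repro_shape_mismatch.py (illustrative): torchrun --nproc_per_node=4 repro_shape_mismatch.py
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend="nccl")
rank = dist.get_rank()
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

# Rank 2 builds a [77, 1] buffer while the others build [100, 1],
# mirroring the TensorShape mismatch in the logs above.
n = 77 if rank == 2 else 100
buf = torch.ones(n, 1, device="cuda")

# Element counts differ across ranks, so this collective is invalid:
# with TORCH_DISTRIBUTED_DEBUG=DETAIL it fails the fingerprint check,
# otherwise it may hang or produce corrupted results.
dist.all_reduce(buf, op=dist.ReduceOp.SUM)
dist.destroy_process_group()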

a4894z requested review from Copilot and mdw771 on October 18, 2025 at 01:00
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR fixes an issue in parallel.py where all_reduce operations failed due to inconsistent tensor buffer sizes across different ranks. The fix ensures that all ranks operate on buffers with the same shape during all_reduce operations.

  • Modified the sync_buffer function to handle all_reduce operations differently from broadcast operations
  • Added logic to use temporary buffers when operating on indexed subsets during all_reduce
  • Preserved existing broadcast behavior while fixing the tensor shape consistency issue (see the sketch below)
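A minimal sketch of the control flow those bullets describe, assuming a sync_buffer(buffer, indices, op, mode) helper; the signature and option names here are hypothetical, not the actual parallel.py API:

import torch
import torch.distributed as dist

def sync_buffer(buffer, indices=None, op=dist.ReduceOp.SUM, mode="all_reduce", src=0):
    # Hypothetical sketch of the fix: broadcast behavior is left unchanged,
    # while all_reduce always hands every rank a full-sized tensor.
    if mode == "broadcast":
        dist.broadcast(buffer, src=src)  # existing broadcast path, details omitted
        return buffer
    if indices is None:
        dist.all_reduce(buffer, op=op)
        return buffer
    # Indexed subset: stage the selected elements into a zero buffer with the
    # full shape so the tensor passed to all_reduce is identical on all ranks.
    temp_buffer = torch.zeros_like(buffer)
    temp_buffer[indices] = buffer[indices]
    dist.all_reduce(temp_buffer, op=op)
    buffer[indices] = temp_buffer[indices]
    return buffer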


Comment on lines +114 to +118
# Create a temporary buffer for the indexed elements
temp_buffer = torch.zeros_like(buffer)
temp_buffer[indices] = buffer[indices]
dist.all_reduce(temp_buffer, op=op)
buffer[indices] = temp_buffer[indices]
Copilot AI commented Oct 18, 2025

Creating a temporary buffer the size of the full buffer for partial updates is inefficient for memory usage. Consider implementing a gather-reduce-scatter pattern or using all_gather followed by local reduction to minimize memory overhead, especially for large buffers with small index subsets.

Suggested change
# Create a temporary buffer for the indexed elements
temp_buffer = torch.zeros_like(buffer)
temp_buffer[indices] = buffer[indices]
dist.all_reduce(temp_buffer, op=op)
buffer[indices] = temp_buffer[indices]
# Efficient gather-reduce-scatter for partial index updates
local_part = buffer[indices].clone()
gathered_parts = [torch.zeros_like(local_part) for _ in range(dist.get_world_size())]
dist.all_gather(gathered_parts, local_part)
# Perform reduction locally
stacked = torch.stack(gathered_parts, dim=0)
if op == dist.ReduceOp.SUM:
    reduced = stacked.sum(dim=0)
elif op == dist.ReduceOp.PRODUCT:
    reduced = stacked.prod(dim=0)
elif op == dist.ReduceOp.MIN:
    reduced, _ = stacked.min(dim=0)
elif op == dist.ReduceOp.MAX:
    reduced, _ = stacked.max(dim=0)
else:
    raise NotImplementedError(f"Unsupported reduction op: {op}")
buffer[indices] = reduced

mdw771 (Collaborator) commented Oct 18, 2025

Thanks for the patch! Do you have the full traceback where this error occurred? The use of sync_buffer is currently all for reconstruction parameters, which should always have the same shape across ranks, so I'd like to know how this could occur.

a4894z (Collaborator, Author) commented Oct 21, 2025

See the attached file for the full traceback. I'm using

options.reconstructor_options.random_seed = round( int(datetime.datetime.now().strftime('%H%M%S')))

for the code in the new branch where you modified the random-seed handling:

def build(self):
    self.build_default_device()
    self.build_random_seed()
    self.build_default_dtype()
    self.build_logger()
    self.build_data()
    self.build_object()
    self.build_probe()
    self.build_probe_positions()
    self.build_opr_mode_weights()
    self.build_reconstructor()

def build_random_seed(self):
    if self.reconstructor_options.random_seed is not None or self.n_ranks > 1:
        seed = self.reconstructor_options.random_seed or 42
        torch.manual_seed(seed)
        np.random.seed(seed)
        random.seed(seed)
    pmath.set_allow_nondeterministic_algorithms(self.reconstructor_options.allow_nondeterministic_algorithms)

full_traceback.txt
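For what it's worth, a clock-derived seed like the one above is evaluated independently on each rank, so ranks that reach that line during different seconds end up with different seeds and different random states. A hedged sketch of one way to keep the seed identical everywhere, by choosing it on rank 0 and broadcasting it (the function name is illustrative, not part of the project):

import datetime
import torch
import torch.distributed as dist

def clock_seed_synced_across_ranks(device="cuda"):
    # Illustrative only: rank 0 picks a wall-clock seed and broadcasts it so
    # every rank seeds its RNGs with the same value, even if the ranks reach
    # this point during different seconds.
    seed = torch.zeros(1, dtype=torch.int64, device=device)
    if dist.get_rank() == 0:
        seed[0] = int(datetime.datetime.now().strftime("%H%M%S"))
    dist.broadcast(seed, src=0)  # NCCL needs a CUDA tensor; pass device="cpu" for gloo
    return int(seed.item())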
