Skip to content

Commit 61812c3

Browse files
Improve async memory usage (apt-sim#344)
This PR passes the debug level properly to the async particle transport again, so that `adept/verbose` now yields good debug printouts in async mode. The handling of the GPU steps is also improved: instead of copying out the full vector and having each thread hold a shared pointer to it (this could cause issues if one worker was already done with its work and the buffer was not released), only the steps that need to be processed by a given worker are now copied to the queue of that worker. Note that this is not the final implementation: it is still too slow, and the GPU can easily run out of hit slots because the buffer is not yet ready to be swapped back in. For that reason, a helper function is left in the code and a few changes have not been made, to keep the flexibility to test out other approaches. Nonetheless, I am opening this PR already, since the OOM crash that occurs when one worker finishes earlier than the rest might be relevant right away.
1 parent a0ac9bb commit 61812c3

File tree

4 files changed

+92
-40
lines changed

4 files changed

+92
-40
lines changed

include/AdePT/core/AsyncAdePTTransport.cuh

+30-22
Original file line numberDiff line numberDiff line change
@@ -585,9 +585,18 @@ void HitProcessingLoop(HitProcessingContext *const context, GPUstate &gpuState,
585585
std::unique_lock lock(context->mutex);
586586
context->cv.wait(lock);
587587

588+
// Possible timing
589+
// auto start = std::chrono::high_resolution_clock::now();
588590
gpuState.fHitScoring->TransferHitsToHost(context->hitTransferStream);
589591
const bool haveNewHits = gpuState.fHitScoring->ProcessHits();
590592

593+
// auto end = std::chrono::high_resolution_clock::now();
594+
// std::chrono::duration<double> elapsed = end - start;
595+
596+
// if (haveNewHits) {
597+
// std::cout << "HIT Processing time: " << elapsed.count() << " seconds" << std::endl;
598+
// }
599+
591600
if (haveNewHits) {
592601
AdvanceEventStates(EventState::FlushingHits, EventState::HitsFlushed, eventStates);
593602
cvG4Workers.notify_all();
@@ -597,7 +606,7 @@ void HitProcessingLoop(HitProcessingContext *const context, GPUstate &gpuState,
597606

598607
void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer &trackBuffer, GPUstate &gpuState,
599608
std::vector<std::atomic<EventState>> &eventStates, std::condition_variable &cvG4Workers,
600-
std::vector<AdePTScoring> &scoring, int adeptSeed)
609+
std::vector<AdePTScoring> &scoring, int adeptSeed, int debugLevel)
601610
{
602611
// NVTXTracer tracer{"TransportLoop"};
603612

@@ -661,10 +670,9 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
661670
std::this_thread::sleep_for(10ms);
662671
}
663672

664-
// TODO: Pass debug level here
665-
// if (fDebugLevel > 2) {
666-
// G4cout << "GPU transport starting" << std::endl;
667-
// }
673+
if (debugLevel > 2) {
674+
G4cout << "GPU transport starting" << std::endl;
675+
}
668676

669677
COPCORE_CUDA_CHECK(cudaStreamSynchronize(gpuState.stream));
670678

@@ -727,8 +735,7 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
727735
const auto nInject = std::min(toDevice.nTrack.load(), toDevice.maxTracks);
728736
toDevice.nTrack = 0;
729737

730-
// TODO: Pass debug level here
731-
// if (fDebugLevel > 3) std::cout << "Injecting " << nInject << " to GPU\n";
738+
if (debugLevel > 3) std::cout << "Injecting " << nInject << " to GPU\n";
732739

733740
// copy buffer of tracks to device
734741
COPCORE_CUDA_CHECK(cudaMemcpyAsync(trackBuffer.toDevice_dev.get(), toDevice.tracks,
@@ -963,8 +970,10 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
963970
hitProcessing->cv.notify_one();
964971
} else {
965972
if (gpuState.stats->hitBufferOccupancy >= gpuState.fHitScoring->HitCapacity() / 2 ||
966-
std::any_of(eventStates.begin(), eventStates.end(),
967-
[](const auto &state) { return state == EventState::RequestHitFlush; })) {
973+
gpuState.stats->hitBufferOccupancy >= 10000 ||
974+
std::any_of(eventStates.begin(), eventStates.end(), [](const auto &state) {
975+
return state.load(std::memory_order_acquire) == EventState::RequestHitFlush;
976+
})) {
968977
AdvanceEventStates(EventState::RequestHitFlush, EventState::FlushingHits, eventStates);
969978
gpuState.fHitScoring->SwapDeviceBuffers(gpuState.stream);
970979
hitProcessing->cv.notify_one();
@@ -978,10 +987,7 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
978987
cvG4Workers.notify_all();
979988
}
980989

981-
// TODO: get fDebugLevel correctly and put prints back in.
982-
int fDebugLevel = 0;
983-
int fNThread = numThreads;
984-
if (fDebugLevel >= 3 && inFlight > 0 || (fDebugLevel >= 2 && iteration % 500 == 0)) {
990+
if (debugLevel >= 3 && inFlight > 0 || (debugLevel >= 2 && iteration % 500 == 0)) {
985991
std::cerr << inFlight << " in flight ";
986992
std::cerr << "(" << gpuState.stats->inFlight[ParticleType::Electron] << " "
987993
<< gpuState.stats->inFlight[ParticleType::Positron] << " "
@@ -992,10 +998,11 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
992998
std::cerr << "\t slots:" << gpuState.stats->slotFillLevel << ", " << numLeaked << " leaked."
993999
<< "\tInjectState: " << static_cast<unsigned int>(gpuState.injectState.load())
9941000
<< "\tExtractState: " << static_cast<unsigned int>(gpuState.extractState.load())
995-
<< "\tHitBuffer: " << gpuState.stats->hitBufferOccupancy;
996-
if (fDebugLevel >= 4) {
1001+
<< "\tHitBuffer: " << gpuState.stats->hitBufferOccupancy
1002+
<< "\tHitBufferReadyToSwap: " << gpuState.fHitScoring->ReadyToSwapBuffers();
1003+
if (debugLevel >= 4) {
9971004
std::cerr << "\n\tper event: ";
998-
for (unsigned int i = 0; i < fNThread; ++i) {
1005+
for (unsigned int i = 0; i < numThreads; ++i) {
9991006
std::cerr << i << ": " << gpuState.stats->perEventInFlight[i]
10001007
<< " (s=" << static_cast<unsigned short>(eventStates[i].load(std::memory_order_acquire)) << ")\t";
10011008
}
@@ -1041,8 +1048,7 @@ void TransportLoop(int trackCapacity, int scoringCapacity, int numThreads, Track
10411048
ClearAllQueues<<<1, 1, 0, gpuState.stream>>>(queues);
10421049
COPCORE_CUDA_CHECK(cudaStreamSynchronize(gpuState.stream));
10431050

1044-
// TODO
1045-
// if (fDebugLevel > 2) std::cout << "End transport loop.\n";
1051+
if (debugLevel > 2) std::cout << "End transport loop.\n";
10461052
}
10471053

10481054
hitProcessing->keepRunning = false;
@@ -1063,11 +1069,13 @@ std::shared_ptr<const std::vector<GPUHit>> GetGPUHits(unsigned int threadId, GPU
10631069
// separate init function that will compile here and be called from the .icc
10641070
std::thread LaunchGPUWorker(int trackCapacity, int scoringCapacity, int numThreads, TrackBuffer &trackBuffer,
10651071
GPUstate &gpuState, std::vector<std::atomic<EventState>> &eventStates,
1066-
std::condition_variable &cvG4Workers, std::vector<AdePTScoring> &scoring, int adeptSeed)
1072+
std::condition_variable &cvG4Workers, std::vector<AdePTScoring> &scoring, int adeptSeed,
1073+
int debugLevel)
10671074
{
1068-
return std::thread{&TransportLoop, trackCapacity, scoringCapacity, numThreads,
1069-
std::ref(trackBuffer), std::ref(gpuState), std::ref(eventStates), std::ref(cvG4Workers),
1070-
std::ref(scoring), adeptSeed};
1075+
return std::thread{
1076+
&TransportLoop, trackCapacity, scoringCapacity, numThreads, std::ref(trackBuffer),
1077+
std::ref(gpuState), std::ref(eventStates), std::ref(cvG4Workers), std::ref(scoring), adeptSeed,
1078+
debugLevel};
10711079
}
10721080

10731081
void FreeGPU(std::unique_ptr<AsyncAdePT::GPUstate, AsyncAdePT::GPUstateDeleter> &gpuState, G4HepEmState &g4hepem_state,

include/AdePT/core/AsyncAdePTTransport.hh

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private:
4444
unsigned short fNThread{0}; ///< Number of G4 workers
4545
unsigned int fTrackCapacity{0}; ///< Number of track slots to allocate on device
4646
unsigned int fScoringCapacity{0}; ///< Number of hit slots to allocate on device
47-
int fDebugLevel{1}; ///< Debug level
47+
int fDebugLevel{0}; ///< Debug level
4848
int fCUDAStackLimit{0}; ///< CUDA device stack limit
4949
std::vector<IntegrationLayer> fIntegrationLayerObjects;
5050
std::unique_ptr<GPUstate, GPUstateDeleter> fGPUstate{nullptr}; ///< CUDA state placeholder

include/AdePT/core/AsyncAdePTTransport.icc

+7-11
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ void FlushScoring(AdePTScoring &);
4646
std::shared_ptr<const std::vector<GPUHit>> GetGPUHits(unsigned int, AsyncAdePT::GPUstate &);
4747
std::thread LaunchGPUWorker(int, int, int, AsyncAdePT::TrackBuffer &, AsyncAdePT::GPUstate &,
4848
std::vector<std::atomic<AsyncAdePT::EventState>> &, std::condition_variable &,
49-
std::vector<AdePTScoring> &, int);
49+
std::vector<AdePTScoring> &, int, int);
5050
std::unique_ptr<AsyncAdePT::GPUstate, AsyncAdePT::GPUstateDeleter> InitializeGPU(int trackCapacity, int scoringCapacity,
5151
int numThreads,
5252
AsyncAdePT::TrackBuffer &trackBuffer,
@@ -79,10 +79,10 @@ template <typename IntegrationLayer>
7979
AsyncAdePTTransport<IntegrationLayer>::AsyncAdePTTransport(AdePTConfiguration &configuration)
8080
: fNThread{(ushort)configuration.GetNumThreads()},
8181
fTrackCapacity{(uint)(1024 * 1024 * configuration.GetMillionsOfTrackSlots())},
82-
fScoringCapacity{(uint)(1024 * 1024 * configuration.GetMillionsOfHitSlots())}, fDebugLevel{0},
83-
fIntegrationLayerObjects(fNThread), fEventStates(fNThread), fGPUNetEnergy(fNThread, 0.0),
84-
fTrackInAllRegions{configuration.GetTrackInAllRegions()}, fGPURegionNames{configuration.GetGPURegionNames()},
85-
fCUDAStackLimit{configuration.GetCUDAStackLimit()}
82+
fScoringCapacity{(uint)(1024 * 1024 * configuration.GetMillionsOfHitSlots())},
83+
fDebugLevel{configuration.GetVerbosity()}, fIntegrationLayerObjects(fNThread), fEventStates(fNThread),
84+
fGPUNetEnergy(fNThread, 0.0), fTrackInAllRegions{configuration.GetTrackInAllRegions()},
85+
fGPURegionNames{configuration.GetGPURegionNames()}, fCUDAStackLimit{configuration.GetCUDAStackLimit()}
8686
{
8787
if (fNThread > kMaxThreads)
8888
throw std::invalid_argument("AsyncAdePTTransport limited to " + std::to_string(kMaxThreads) + " threads");
@@ -233,7 +233,7 @@ void AsyncAdePTTransport<IntegrationLayer>::Initialize()
233233

234234
fGPUstate = async_adept_impl::InitializeGPU(fTrackCapacity, fScoringCapacity, fNThread, *fBuffer, fScoring);
235235
fGPUWorker = async_adept_impl::LaunchGPUWorker(fTrackCapacity, fScoringCapacity, fNThread, *fBuffer, *fGPUstate,
236-
fEventStates, fCV_G4Workers, fScoring, fAdePTSeed);
236+
fEventStates, fCV_G4Workers, fScoring, fAdePTSeed, fDebugLevel);
237237
}
238238

239239
template <typename IntegrationLayer>
@@ -263,11 +263,7 @@ void AsyncAdePTTransport<IntegrationLayer>::Flush(G4int threadId, G4int eventId)
263263

264264
std::shared_ptr<const std::vector<GPUHit>> gpuHits;
265265
while ((gpuHits = async_adept_impl::GetGPUHits(threadId, *fGPUstate)) != nullptr) {
266-
GPUHit dummy;
267-
dummy.fEventId = eventId;
268-
auto range = std::equal_range(gpuHits->begin(), gpuHits->end(), dummy,
269-
[](const GPUHit &lhs, const GPUHit &rhs) { return lhs.fEventId < rhs.fEventId; });
270-
for (auto it = range.first; it != range.second; ++it) {
266+
for (auto it = gpuHits->begin(); it != gpuHits->end(); ++it) {
271267
assert(it->threadId == threadId);
272268
integrationInstance.ProcessGPUHit(*it);
273269
}

include/AdePT/core/PerEventScoringImpl.cuh

+54-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
// Comparison for sorting tracks into events on device:
2626
struct CompareGPUHits {
27-
__device__ bool operator()(const GPUHit &lhs, const GPUHit &rhs) const { return lhs.fEventId < rhs.fEventId; }
27+
__device__ bool operator()(const GPUHit &lhs, const GPUHit &rhs) const { return lhs.threadId < rhs.threadId; }
2828
};
2929

3030
namespace AsyncAdePT {
@@ -71,18 +71,58 @@ class HitScoring {
7171
std::vector<std::deque<std::shared_ptr<const std::vector<GPUHit>>>> fHitQueues;
7272
mutable std::shared_mutex fProcessingHitsMutex;
7373

74+
using GPUHitVectorPtr = std::shared_ptr<const std::vector<GPUHit>>;
75+
using HitDeque = std::deque<GPUHitVectorPtr>;
76+
using HitQueueVector = std::vector<HitDeque>;
77+
78+
inline size_t calculateMemoryUsage(const HitQueueVector &fHitQueues)
79+
{
80+
size_t totalMemory = 0;
81+
82+
for (const auto &dq : fHitQueues) {
83+
for (const auto &ptr : dq) {
84+
if (ptr) {
85+
totalMemory += sizeof(*ptr);
86+
totalMemory += ptr->size() * sizeof(GPUHit); // Actual GPUHit data
87+
}
88+
}
89+
}
90+
return totalMemory;
91+
}
92+
7493
void ProcessBuffer(BufferHandle &handle)
7594
{
7695
// We are assuming that the caller holds a lock on fProcessingHitsMutex.
7796
if (handle.state == BufferHandle::State::NeedHostProcessing) {
78-
auto hitVector = std::make_shared<std::vector<GPUHit>>();
79-
hitVector->assign(handle.hostBuffer, handle.hostBuffer + handle.hitScoringInfo.fSlotCounter);
97+
98+
// std::cout << "Total Memory Used in fHitQueues: " << calculateMemoryUsage(fHitQueues) / 1024.0 / 1024.0 / 1024.0
99+
// << " GB" << std::endl;
100+
auto begin = handle.hostBuffer;
101+
auto end = handle.hostBuffer + handle.hitScoringInfo.fSlotCounter;
102+
103+
while (begin != end) {
104+
short threadId = begin->threadId; // Get threadId of first hit in the range
105+
106+
// linear search, slower, doesn't require a sorted array
107+
// auto threadEnd = std::find_if(begin, end,
108+
// [threadId](const GPUHit &hit) { return threadId != hit.threadId; });
109+
110+
// binary search, faster but requires a sorted array
111+
auto threadEnd =
112+
std::upper_bound(begin, end, threadId, [](short id, const GPUHit &hit) { return id < hit.threadId; });
113+
114+
// Copy hits into a unique pointer and push it to workers queue
115+
auto HitsPerThread = std::make_unique<std::vector<GPUHit>>(begin, threadEnd);
116+
fHitQueues[threadId].push_back(std::move(HitsPerThread));
117+
118+
begin = threadEnd; // set begin to start of the threadId
119+
}
120+
80121
handle.hitScoringInfo.fSlotCounter = 0;
81122
handle.state = BufferHandle::State::Free;
82123

83-
for (auto &hitQueue : fHitQueues) {
84-
hitQueue.push_back(hitVector);
85-
}
124+
// std::cout << "After pushing hitVector: Total Memory Used in fHitQueues: " << calculateMemoryUsage(fHitQueues)
125+
// / 1024.0 / 1024.0 / 1024.0 << " GB" << std::endl;
86126
}
87127
}
88128

@@ -163,7 +203,15 @@ public:
163203
if (handle.state == BufferHandle::State::NeedHostProcessing) {
164204
if (!lock) lock.lock();
165205
haveNewHits = true;
206+
207+
// Possible timing
208+
// auto start = std::chrono::high_resolution_clock::now();
166209
ProcessBuffer(handle);
210+
// auto end = std::chrono::high_resolution_clock::now();
211+
// std::chrono::duration<double> elapsed = end - start;
212+
// std::cout << "BUFFER Processing time: " << elapsed.count() << " seconds" << std::endl;
213+
214+
// lock.unlock();
167215
}
168216
}
169217
}

0 commit comments

Comments
 (0)