Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions cmd/syncThrottler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ const (
goroutineReleaseThreshold = 85 // Release at 85% of goroutine limit

// Defaults
defaultPhysicalMemoryGB uint64 = 8 // Default physical memory in GB, used if sysinfo fails
defaultNumCores int32 = 4 // Default number of CPU cores, used if runtime.NumCPU() fails
defaultPhysicalMemoryGB uint64 = 8 // Default physical memory in GB, used if sysinfo fails
defaultPhysicalMemoryBytes uint64 = 8 * 1024 * 1024 * 1024 // Default physical memory in bytes, used if sysinfo fails
defaultNumCores int32 = 4 // Default number of CPU cores, used if runtime.NumCPU() fails

// Traversal concurrency settings
defaultTargetSlotRatio float64 = 0.25 // Target slots = 25% of source slots by default
Expand Down Expand Up @@ -144,7 +145,17 @@ func initializeLimits(orchestratorOptions *SyncOrchestratorOptions) {
syncOrchestratorLog(
common.LogWarning,
fmt.Sprintf("Failed to get total physical memory: %v. Using default - 8GB", err))
memory = int64(defaultPhysicalMemoryGB)
memory = int64(defaultPhysicalMemoryBytes)
} else {
syncOrchestratorLog(common.LogInfo, fmt.Sprintf("Total physical memory: %d bytes", memory))

// We noticed it can be 0 if dynamic memory is set on the virtual machine
if memory == 0 {
syncOrchestratorLog(
common.LogWarning,
"Total physical memory returned as 0 bytes. Using default - 8GB")
memory = int64(defaultPhysicalMemoryBytes)
}
}

memoryGB := memory / gbToBytesMultiplier // Convert to GB
Expand All @@ -158,7 +169,7 @@ func initializeLimits(orchestratorOptions *SyncOrchestratorOptions) {
crawlParallelism = orchestratorOptions.parallelTraversers
}

if maxDirectoryDirectChildCount > maxActiveFiles {
if maxDirectoryDirectChildCount > maxActiveFiles && maxActiveFiles > 0 {
syncOrchestratorLog(
common.LogWarning,
fmt.Sprintf("Max directory direct child count (%d) exceeds max active files (%d), adjusting to prevent OOM", maxDirectoryDirectChildCount, maxActiveFiles))
Expand Down Expand Up @@ -190,7 +201,15 @@ func initializeLimits(orchestratorOptions *SyncOrchestratorOptions) {
// --- END Throttling and Concurrency Configuration ---

func GetSafeParallelismLimit(maxActiveFiles, maxChildCount int64, fromTo common.FromTo) int32 {
limit := int32(max(maxActiveFiles/maxChildCount, 1))

limit := int32(100) // Force safe default
if maxActiveFiles <= 0 || maxChildCount <= 0 {
syncOrchestratorLog(
common.LogError,
"Max active files or max child count is non-positive, using safe default parallelism of 100")
} else {
limit = int32(max(maxActiveFiles/maxChildCount, 1))
}

switch fromTo {
case common.EFromTo.LocalBlob(), common.EFromTo.LocalBlobFS(), common.EFromTo.LocalFile():
Expand Down