From f39670e6b10791a24b7e208f3acfbe703d910f0b Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Tue, 28 Jan 2025 10:49:50 -0500 Subject: [PATCH 1/4] added Pause option into minor compactions --- .../tserver/tablet/MinorCompactionTask.java | 25 +++++++++++++++++++ .../accumulo/tserver/tablet/Tablet.java | 11 ++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java index 3b3a838f56b..edc58c06ace 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java @@ -18,6 +18,10 @@ */ package org.apache.accumulo.tserver.tablet; +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -41,6 +45,8 @@ class MinorCompactionTask implements Runnable { private final long flushId; private final MinorCompactionReason mincReason; + private final long pauseLimit; + MinorCompactionTask(Tablet tablet, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { this.tablet = tablet; @@ -49,6 +55,17 @@ class MinorCompactionTask implements Runnable { this.commitSession = commitSession; this.flushId = flushId; this.mincReason = mincReason; + this.pauseLimit = tablet.getContext().getTableConfiguration(tablet.extent.tableId()) + .getCount(Property.TABLE_FILE_PAUSE); + } + + private static void checkMinorCompactionFiles(String tableId, long pauseLimit, + long currentFileCount) throws AcceptableThriftTableOperationException { + if (pauseLimit > 0 && currentFileCount > pauseLimit) { + throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.COMPACT, + TableOperationExceptionType.OTHER, "Attempted to perform minor compaction with " + + currentFileCount + " files, exceeding the configured limit of " + pauseLimit); + } } @Override @@ -56,6 +73,14 @@ public void run() { tablet.minorCompactionStarted(); try { Span span = TraceUtil.startSpan(this.getClass(), "minorCompaction"); + try { + long currentFileCount = tablet.getTabletMemory().getNumEntries(); + checkMinorCompactionFiles(tablet.extent.tableId().canonical(), pauseLimit, + currentFileCount); // Add pause check here + } catch (AcceptableThriftTableOperationException e) { + log.warn("Minor compaction paused due to file count for tablet {}", tablet.extent); + return; + } try (Scope scope = span.makeCurrent()) { Span span2 = TraceUtil.startSpan(this.getClass(), "waitForCommits"); try (Scope scope2 = span2.makeCurrent()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 549331b247a..ae1673d0507 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -382,7 +382,8 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil boolean failed = false; long start = System.currentTimeMillis(); timer.incrementStatusMinor(); - + long pauseLimit = + getContext().getTableConfiguration(extent.tableId()).getCount(Property.TABLE_FILE_PAUSE); long count = 0; String oldName = Thread.currentThread().getName(); @@ -392,7 +393,13 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil Span span = TraceUtil.startSpan(this.getClass(), "minorCompact::write"); try (Scope scope = span.makeCurrent()) { count = memTable.getNumEntries(); - + if (count > pauseLimit && pauseLimit > 0) { + log.debug( + "tablet {} has {} entries, which exceeds the pause limit of {}, pausing minor compaction.", + this.extent, count, pauseLimit); + failed = true; // Set the failed flag + return null; + } MinorCompactor compactor = new MinorCompactor(tabletServer, this, memTable, tmpDatafile, mincReason, tableConfiguration); stats = compactor.call(); From 6a3fe920c7bee754dd4006dc7406450917aa742b Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Tue, 18 Mar 2025 11:22:02 -0400 Subject: [PATCH 2/4] Moved pause check to createMinorCompactionTask() --- .../tserver/tablet/MinorCompactionTask.java | 20 ------------------ .../accumulo/tserver/tablet/Tablet.java | 21 +++++++------------ 2 files changed, 7 insertions(+), 34 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java index edc58c06ace..2326f37c965 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java @@ -18,9 +18,6 @@ */ package org.apache.accumulo.tserver.tablet; -import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; -import org.apache.accumulo.core.clientImpl.thrift.TableOperation; -import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.metadata.ReferencedTabletFile; @@ -59,28 +56,11 @@ class MinorCompactionTask implements Runnable { .getCount(Property.TABLE_FILE_PAUSE); } - private static void checkMinorCompactionFiles(String tableId, long pauseLimit, - long currentFileCount) throws AcceptableThriftTableOperationException { - if (pauseLimit > 0 && currentFileCount > pauseLimit) { - throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.COMPACT, - TableOperationExceptionType.OTHER, "Attempted to perform minor compaction with " - + currentFileCount + " files, exceeding the configured limit of " + pauseLimit); - } - } - @Override public void run() { tablet.minorCompactionStarted(); try { Span span = TraceUtil.startSpan(this.getClass(), "minorCompaction"); - try { - long currentFileCount = tablet.getTabletMemory().getNumEntries(); - checkMinorCompactionFiles(tablet.extent.tableId().canonical(), pauseLimit, - currentFileCount); // Add pause check here - } catch (AcceptableThriftTableOperationException e) { - log.warn("Minor compaction paused due to file count for tablet {}", tablet.extent); - return; - } try (Scope scope = span.makeCurrent()) { Span span2 = TraceUtil.startSpan(this.getClass(), "waitForCommits"); try (Scope scope2 = span2.makeCurrent()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 76ae61b13e0..0225e6d43bc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -384,8 +384,6 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil boolean failed = false; long start = System.currentTimeMillis(); timer.incrementStatusMinor(); - long pauseLimit = - getContext().getTableConfiguration(extent.tableId()).getCount(Property.TABLE_FILE_PAUSE); long count = 0; String oldName = Thread.currentThread().getName(); @@ -395,13 +393,6 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil Span span = TraceUtil.startSpan(this.getClass(), "minorCompact::write"); try (Scope scope = span.makeCurrent()) { count = memTable.getNumEntries(); - if (count > pauseLimit && pauseLimit > 0) { - log.debug( - "tablet {} has {} entries, which exceeds the pause limit of {}, pausing minor compaction.", - this.extent, count, pauseLimit); - failed = true; // Set the failed flag - return null; - } MinorCompactor compactor = new MinorCompactor(tabletServer, this, memTable, tmpDatafile, mincReason, tableConfiguration); stats = compactor.call(); @@ -592,7 +583,8 @@ private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) { MinorCompactionTask mct; long t1, t2; - + long pauseLimit = + getContext().getTableConfiguration(extent.tableId()).getCount(Property.TABLE_FILE_PAUSE); StringBuilder logMessage = null; try { @@ -600,7 +592,8 @@ private MinorCompactionTask createMinorCompactionTask(long flushId, t1 = System.currentTimeMillis(); if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC() - || getTabletMemory().getMemTable().getNumEntries() == 0 || updatingFlushID) { + || updatingFlushID || (mincReason == MinorCompactionReason.SYSTEM + && getDatafiles().size() >= pauseLimit)) { logMessage = new StringBuilder(); @@ -610,9 +603,9 @@ private MinorCompactionTask createMinorCompactionTask(long flushId, logMessage.append(" tabletMemory.memoryReservedForMinC() " + getTabletMemory().memoryReservedForMinC()); } - if (getTabletMemory() != null && getTabletMemory().getMemTable() != null) { - logMessage.append(" tabletMemory.getMemTable().getNumEntries() " - + getTabletMemory().getMemTable().getNumEntries()); + if (mincReason == MinorCompactionReason.SYSTEM) { + logMessage.append(" fileCount " + getDatafiles().size()); + logMessage.append(" fileCountPauseLimit " + pauseLimit); } logMessage.append(" updatingFlushID " + updatingFlushID); From 2de2a8323b4cf78c1a47a7b63f77bae65c53d92f Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Mon, 7 Apr 2025 18:49:57 -0400 Subject: [PATCH 3/4] added changes --- .../apache/accumulo/tserver/tablet/MinorCompactionTask.java | 5 ----- .../java/org/apache/accumulo/tserver/tablet/Tablet.java | 6 +++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java index 2326f37c965..3b3a838f56b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactionTask.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.tserver.tablet; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -42,8 +41,6 @@ class MinorCompactionTask implements Runnable { private final long flushId; private final MinorCompactionReason mincReason; - private final long pauseLimit; - MinorCompactionTask(Tablet tablet, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { this.tablet = tablet; @@ -52,8 +49,6 @@ class MinorCompactionTask implements Runnable { this.commitSession = commitSession; this.flushId = flushId; this.mincReason = mincReason; - this.pauseLimit = tablet.getContext().getTableConfiguration(tablet.extent.tableId()) - .getCount(Property.TABLE_FILE_PAUSE); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 0225e6d43bc..7c988f6928d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -603,7 +603,11 @@ && getDatafiles().size() >= pauseLimit)) { logMessage.append(" tabletMemory.memoryReservedForMinC() " + getTabletMemory().memoryReservedForMinC()); } - if (mincReason == MinorCompactionReason.SYSTEM) { + if (getTabletMemory() != null && getTabletMemory().getMemTable() != null) { + logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + + getTabletMemory().getMemTable().getNumEntries()); + } + if (mincReason == MinorCompactionReason.SYSTEM && getDatafiles().size() >= pauseLimit) { logMessage.append(" fileCount " + getDatafiles().size()); logMessage.append(" fileCountPauseLimit " + pauseLimit); } From 05f2c72194d6b5e21780848d4c9efb31db6785ca Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 1 May 2025 18:14:12 -0400 Subject: [PATCH 4/4] fixed conditional --- .../main/java/org/apache/accumulo/tserver/tablet/Tablet.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 7c988f6928d..e0f47f0ac34 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -592,7 +592,8 @@ private MinorCompactionTask createMinorCompactionTask(long flushId, t1 = System.currentTimeMillis(); if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC() - || updatingFlushID || (mincReason == MinorCompactionReason.SYSTEM + || getTabletMemory().getMemTable().getNumEntries() == 0 || updatingFlushID + || (mincReason == MinorCompactionReason.SYSTEM && getDatafiles().size() >= pauseLimit)) { logMessage = new StringBuilder();