From c68d779b60e5ad8fb3301733eb7028eaf986d458 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 14 Oct 2025 11:42:01 -0400 Subject: [PATCH 1/9] Move bin ranges to InMemoryTableOperations --- .../accumulo/inmemory/InMemoryTableOperations.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 7c879b2305d..a23d9635c75 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,12 +49,14 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -560,11 +562,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab @Override public Locations locate(String tableName, Collection ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Map>> binnedRanges = new HashMap<>(); - InMemoryTabletLocator locator = new InMemoryTabletLocator(); - List ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges); + List ignore = binRanges(new ArrayList<>(ranges), binnedRanges); return new LocationsImpl(binnedRanges); } + private List binRanges(ArrayList ranges, Map>> binnedRanges) { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); + } + private static class LocationsImpl implements Locations { private Map> groupedByRanges; From f69071c6ff090f68637b8d2b173257c660f0e4a2 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 20 Oct 2025 12:30:48 -0400 Subject: [PATCH 2/9] Remove unused imports --- .../datawave/accumulo/inmemory/InMemoryTableOperations.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index a23d9635c75..26fab5f2dcc 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,7 +49,6 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; @@ -79,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; - public class InMemoryTableOperations extends TableOperationsHelper { private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class); private static final byte[] ZERO = {0}; From 82423b319497a774ba84d177173ff0befff12a0c Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 21 Oct 2025 11:31:49 -0400 Subject: [PATCH 3/9] wip? --- .../datawave/mr/bulk/BulkInputFormat.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..26e49264b34 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -9,6 +9,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -1047,29 +1048,36 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); +// /** +// * Initializes an Accumulo {@link TabletLocator} based on the configuration. +// * +// * @param conf +// * the Hadoop configuration object +// * @return an accumulo tablet locator +// * @throws TableNotFoundException +// * if the table name set on the configuration doesn't exist +// * @throws IOException +// * if the input format is unable to read the password file from the FileSystem +// */ +// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { +// if (conf.getBoolean(MOCK, false)) +// return new InMemoryTabletLocator(); +// String tableName = getTablename(conf); +// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) +// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); +// ClientInfo info = ClientInfo.from(props); +// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), +// Threads.UEH); +// return TabletLocator.getLocator(context, context.getTableId(tableName)); +// } + + public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); } + /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1090,7 +1098,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; +// TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1102,13 +1110,13 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); +// tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); +// tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + while (!binRanges(context, ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) tableId = context.getTableId(tableName); @@ -1120,7 +1128,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); +// tl.invalidateCache(); } clipRanges(binnedRanges); From 63b1c5fdd13a33140eb8ef488135e388eba095ad Mon Sep 17 00:00:00 2001 From: Seth Date: Thu, 23 Oct 2025 16:06:17 -0400 Subject: [PATCH 4/9] update --- .../datawave/mr/bulk/BulkInputFormat.java | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 26e49264b34..a4fec671af5 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -41,7 +40,6 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -84,7 +82,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; @@ -1048,36 +1045,35 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } -// /** -// * Initializes an Accumulo {@link TabletLocator} based on the configuration. -// * -// * @param conf -// * the Hadoop configuration object -// * @return an accumulo tablet locator -// * @throws TableNotFoundException -// * if the table name set on the configuration doesn't exist -// * @throws IOException -// * if the input format is unable to read the password file from the FileSystem -// */ -// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { -// if (conf.getBoolean(MOCK, false)) -// return new InMemoryTabletLocator(); -// String tableName = getTablename(conf); -// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) -// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); -// ClientInfo info = ClientInfo.from(props); -// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), -// Threads.UEH); -// return TabletLocator.getLocator(context, context.getTableId(tableName)); -// } + // /** + // * Initializes an Accumulo {@link TabletLocator} based on the configuration. + // * + // * @param conf + // * the Hadoop configuration object + // * @return an accumulo tablet locator + // * @throws TableNotFoundException + // * if the table name set on the configuration doesn't exist + // * @throws IOException + // * if the input format is unable to read the password file from the FileSystem + // */ + // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { + // if (conf.getBoolean(MOCK, false)) + // return new InMemoryTabletLocator(); + // String tableName = getTablename(conf); + // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) + // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); + // ClientInfo info = ClientInfo.from(props); + // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), + // Threads.UEH); + // return TabletLocator.getLocator(context, context.getTableId(tableName)); + // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); } - /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1098,7 +1094,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); -// TabletLocator tl; + // TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1110,9 +1106,9 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; -// tl = getTabletLocator(job.getConfiguration()); + // tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it -// tl.invalidateCache(); + // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1128,7 +1124,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); -// tl.invalidateCache(); + // tl.invalidateCache(); } clipRanges(binnedRanges); From 3f84d0c480290d238a1141505339b9fcccfbda5d Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 24 Oct 2025 14:02:06 -0400 Subject: [PATCH 5/9] wip --- .../datawave/mr/bulk/BulkInputFormat.java | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index a4fec671af5..4219c66c409 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,28 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - // /** - // * Initializes an Accumulo {@link TabletLocator} based on the configuration. - // * - // * @param conf - // * the Hadoop configuration object - // * @return an accumulo tablet locator - // * @throws TableNotFoundException - // * if the table name set on the configuration doesn't exist - // * @throws IOException - // * if the input format is unable to read the password file from the FileSystem - // */ - // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - // if (conf.getBoolean(MOCK, false)) - // return new InMemoryTabletLocator(); - // String tableName = getTablename(conf); - // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - // ClientInfo info = ClientInfo.from(props); - // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - // Threads.UEH); - // return TabletLocator.getLocator(context, context.getTableId(tableName)); - // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { @@ -1094,7 +1072,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - // TabletLocator tl; + try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1106,9 +1084,6 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - // tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1124,7 +1099,6 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - // tl.invalidateCache(); } clipRanges(binnedRanges); @@ -1170,13 +1144,9 @@ public List getSplits(JobContext job) throws IOException { log.info("There are approximately {} values ", map.size()); for (RangeSplit split : map.keySet()) { - // Iterable> rangeIter = splitter.partition(map.get(split)); - // for (List rangeList : rangeIter) { - // RangeSplit newSplit = (RangeSplit) split.clone(); - // newSplit.addRanges(rangeList); + split.addRanges(map.get(split)); splits.add(split); - // } } From fd8e4d17c0c49119d44e2258d56f6393201395ce Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 27 Oct 2025 11:47:12 -0400 Subject: [PATCH 6/9] formatting?: --- .../core/src/main/java/datawave/mr/bulk/BulkInputFormat.java | 1 - 1 file changed, 1 deletion(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 4219c66c409..7b70c238a7a 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,7 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); From 9052b280b73c9b7d746383c873710440836d1878 Mon Sep 17 00:00:00 2001 From: Seth Date: Wed, 29 Oct 2025 10:15:39 -0400 Subject: [PATCH 7/9] attempt .getMajorCompactionCount replacement --- .../connection/AccumuloConnectionFactory.java | 17 ------- .../datawave/mr/bulk/BulkInputFormat.java | 26 ++++------- .../job/BulkIngestMapFileLoader.java | 44 +++++-------------- 3 files changed, 21 insertions(+), 66 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java index eb6f9ef2f99..44911be9bdc 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java @@ -110,21 +110,4 @@ AccumuloClient getClient(String userDN, Collection proxyServers, String * @return A map representation */ Map getTrackingMap(StackTraceElement[] stackTrace); - - /** - * Utility method to unwrap the ClientContext instance within {@link WrappedAccumuloClient} as needed - * - * @param accumuloClient - * {@link AccumuloClient} instance - * @return {@link WrappedAccumuloClient#getReal()}, if applicable; accumuloClient itself, if it implements {@link ClientContext}; otherwise returns null - */ - static ClientContext getClientContext(AccumuloClient accumuloClient) { - ClientContext cc = null; - if (accumuloClient instanceof WrappedAccumuloClient) { - cc = (ClientContext) ((WrappedAccumuloClient) accumuloClient).getReal(); - } else if (accumuloClient instanceof ClientContext) { - cc = (ClientContext) accumuloClient; - } - return cc; - } } diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 7b70c238a7a..988520da169 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -37,9 +37,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientConfConverter; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.ClientInfo; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -49,15 +47,12 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.iterators.user.VersioningIterator; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.util.format.DateFormatSupplier; -import org.apache.accumulo.core.util.threads.Threads; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; @@ -1045,7 +1040,7 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + public List binRanges(List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); @@ -1082,18 +1077,15 @@ public List getSplits(JobContext job) throws IOException { } } else { try (AccumuloClient client = getClient(job.getConfiguration())) { - TableId tableId = null; - ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, - ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!binRanges(context, ranges, binnedRanges).isEmpty()) { + String tableId = null; + while (!binRanges(ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) - tableId = context.getTableId(tableName); - if (!context.tableNodeExists(tableId)) - throw new TableDeletedException(tableId.canonical()); - if (context.getTableState(tableId) == TableState.OFFLINE) - throw new TableOfflineException("Table (" + tableId.canonical() + ") is offline"); + tableId = client.tableOperations().tableIdMap().get(tableName); + if (!client.tableOperations().exists(tableId)) + throw new TableDeletedException(tableId); + if (!client.tableOperations().isOnline(tableId)) + throw new TableOfflineException("Table (" + tableId + ") is offline"); } binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index a1ac89f1f08..2b09746602d 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -694,7 +694,7 @@ private Path distCpDirectory(Path jobDirectory) throws Exception { } /** - * Determines whether or not it is safe to bring map files online. This asks Accumulo for its stats for major compaction (running and queued), and will + * Determines whether it is safe to bring map files online. This asks Accumulo for its stats for major compaction (running and queued), and will * return false if either "too many" compactions are running/queued. * * @param lastOnlineTime @@ -703,42 +703,22 @@ private Path distCpDirectory(Path jobDirectory) throws Exception { * log info * @return boolean flag */ - public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) { + public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) throws AccumuloException, AccumuloSecurityException { Level level = (logInfo ? Level.INFO : Level.DEBUG); - int majC = getMajorCompactionCount(); - log.log(level, "There are " + majC + " compactions currently running or queued."); - long delta = System.currentTimeMillis() - lastOnlineTime; - log.log(level, "Time since map files last brought online: " + (delta / 1000) + "s"); - - return (delta > MAJC_WAIT_TIMEOUT) && (majC < MAJC_THRESHOLD); - } - - private int getMajorCompactionCount() { - int majC = 0; + // SETH NOTE: This is the only place that .getMajorCompactionCount() is used. + // It may be reasonable to remove the logic entirely and use another metric + // for deciding if we can bring the map files online. + // I'm deciding to replace it with a comparison against all active compactions, since that's + // at least as strict as all majC. + int majminC = accumuloClient.instanceOperations().getActiveCompactions().size(); - ManagerClientService.Client client = null; - ClientContext context = (ClientContext) accumuloClient; - try { - client = ThriftClientTypes.MANAGER.getConnection(context); - ManagerMonitorInfo mmi = client.getManagerStats(null, context.rpcCreds()); - Map tableStats = mmi.getTableMap(); + log.log(level, "There are " + majminC + " compactions currently running or queued."); - for (java.util.Map.Entry e : tableStats.entrySet()) { - majC += e.getValue().getMajors().getQueued(); - majC += e.getValue().getMajors().getRunning(); - } - } catch (Exception e) { - // Accumulo API changed, catch exception for now until we redeploy - // accumulo on lightning. - log.error("Unable to retrieve major compaction stats: " + e.getMessage()); - } finally { - if (client != null) { - ThriftUtil.close(client, context); - } - } + long delta = System.currentTimeMillis() - lastOnlineTime; + log.log(level, "Time since map files last brought online: " + (delta / 1000) + "s"); - return majC; + return (delta > MAJC_WAIT_TIMEOUT) && (majminC < MAJC_THRESHOLD); } /** From 31159024d06a1f53b54397664a8eb034b6ac1884 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 24 Nov 2025 13:06:21 -0500 Subject: [PATCH 8/9] Add notes --- .../ingest/mapreduce/job/BulkIngestMapFileLoader.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index 2b09746602d..105187d1f98 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -33,13 +33,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.LoadPlan; -import org.apache.accumulo.core.manager.thrift.ManagerClientService; -import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -711,6 +705,8 @@ public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) thro // for deciding if we can bring the map files online. // I'm deciding to replace it with a comparison against all active compactions, since that's // at least as strict as all majC. + + // You're just waiting on the updates on the Accumulo side, they said they were going to make a PR for this. int majminC = accumuloClient.instanceOperations().getActiveCompactions().size(); log.log(level, "There are " + majminC + " compactions currently running or queued."); From e7490a0d764b04e912746968bfffd2b0a721f12a Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 2 Dec 2025 11:09:23 -0500 Subject: [PATCH 9/9] wip --- .../job/BulkIngestMapFileLoader.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index 105187d1f98..94774086fd2 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -33,7 +33,13 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.manager.thrift.ManagerClientService; +import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -717,6 +723,33 @@ public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) thro return (delta > MAJC_WAIT_TIMEOUT) && (majminC < MAJC_THRESHOLD); } + private int getMajorCompactionCount() { + int majC = 0; + + ManagerClientService.Client client = null; + ClientContext context = (ClientContext) accumuloClient; + try { + client = ThriftClientTypes.MANAGER.getConnection(context); + ManagerMonitorInfo mmi = client.getManagerStats(null, context.rpcCreds()); + Map tableStats = mmi.getTableMap(); + + for (java.util.Map.Entry e : tableStats.entrySet()) { + majC += e.getValue().getMajors().getQueued(); + majC += e.getValue().getMajors().getRunning(); + } + } catch (Exception e) { + // Accumulo API changed, catch exception for now until we redeploy + // accumulo on lightning. + log.error("Unable to retrieve major compaction stats: " + e.getMessage()); + } finally { + if (client != null) { + ThriftUtil.close(client, context); + } + } + + return majC; + } + /** * Gets a list of job directories that are marked with pathPattern. *