From c68d779b60e5ad8fb3301733eb7028eaf986d458 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 14 Oct 2025 11:42:01 -0400 Subject: [PATCH 1/7] Move bin ranges to InMemoryTableOperations --- .../accumulo/inmemory/InMemoryTableOperations.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 7c879b2305d..a23d9635c75 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,12 +49,14 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -560,11 +562,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab @Override public Locations locate(String tableName, Collection ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Map>> binnedRanges = new HashMap<>(); - InMemoryTabletLocator locator = new InMemoryTabletLocator(); - List ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges); + List ignore = binRanges(new ArrayList<>(ranges), binnedRanges); return new LocationsImpl(binnedRanges); } + private List binRanges(ArrayList ranges, Map>> binnedRanges) { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); + } + private static class LocationsImpl implements Locations { private Map> groupedByRanges; From f69071c6ff090f68637b8d2b173257c660f0e4a2 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 20 Oct 2025 12:30:48 -0400 Subject: [PATCH 2/7] Remove unused imports --- .../datawave/accumulo/inmemory/InMemoryTableOperations.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index a23d9635c75..26fab5f2dcc 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,7 +49,6 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; @@ -79,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; - public class InMemoryTableOperations extends TableOperationsHelper { private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class); private static final byte[] ZERO = {0}; From 82423b319497a774ba84d177173ff0befff12a0c Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 21 Oct 2025 11:31:49 -0400 Subject: [PATCH 3/7] wip? --- .../datawave/mr/bulk/BulkInputFormat.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..26e49264b34 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -9,6 +9,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -1047,29 +1048,36 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); +// /** +// * Initializes an Accumulo {@link TabletLocator} based on the configuration. +// * +// * @param conf +// * the Hadoop configuration object +// * @return an accumulo tablet locator +// * @throws TableNotFoundException +// * if the table name set on the configuration doesn't exist +// * @throws IOException +// * if the input format is unable to read the password file from the FileSystem +// */ +// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { +// if (conf.getBoolean(MOCK, false)) +// return new InMemoryTabletLocator(); +// String tableName = getTablename(conf); +// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) +// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); +// ClientInfo info = ClientInfo.from(props); +// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), +// Threads.UEH); +// return TabletLocator.getLocator(context, context.getTableId(tableName)); +// } + + public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); } + /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1090,7 +1098,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; +// TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1102,13 +1110,13 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); +// tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); +// tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + while (!binRanges(context, ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) tableId = context.getTableId(tableName); @@ -1120,7 +1128,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); +// tl.invalidateCache(); } clipRanges(binnedRanges); From 63b1c5fdd13a33140eb8ef488135e388eba095ad Mon Sep 17 00:00:00 2001 From: Seth Date: Thu, 23 Oct 2025 16:06:17 -0400 Subject: [PATCH 4/7] update --- .../datawave/mr/bulk/BulkInputFormat.java | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 26e49264b34..a4fec671af5 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -41,7 +40,6 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -84,7 +82,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; @@ -1048,36 +1045,35 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } -// /** -// * Initializes an Accumulo {@link TabletLocator} based on the configuration. -// * -// * @param conf -// * the Hadoop configuration object -// * @return an accumulo tablet locator -// * @throws TableNotFoundException -// * if the table name set on the configuration doesn't exist -// * @throws IOException -// * if the input format is unable to read the password file from the FileSystem -// */ -// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { -// if (conf.getBoolean(MOCK, false)) -// return new InMemoryTabletLocator(); -// String tableName = getTablename(conf); -// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) -// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); -// ClientInfo info = ClientInfo.from(props); -// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), -// Threads.UEH); -// return TabletLocator.getLocator(context, context.getTableId(tableName)); -// } + // /** + // * Initializes an Accumulo {@link TabletLocator} based on the configuration. + // * + // * @param conf + // * the Hadoop configuration object + // * @return an accumulo tablet locator + // * @throws TableNotFoundException + // * if the table name set on the configuration doesn't exist + // * @throws IOException + // * if the input format is unable to read the password file from the FileSystem + // */ + // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { + // if (conf.getBoolean(MOCK, false)) + // return new InMemoryTabletLocator(); + // String tableName = getTablename(conf); + // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) + // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); + // ClientInfo info = ClientInfo.from(props); + // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), + // Threads.UEH); + // return TabletLocator.getLocator(context, context.getTableId(tableName)); + // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); } - /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1098,7 +1094,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); -// TabletLocator tl; + // TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1110,9 +1106,9 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; -// tl = getTabletLocator(job.getConfiguration()); + // tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it -// tl.invalidateCache(); + // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1128,7 +1124,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); -// tl.invalidateCache(); + // tl.invalidateCache(); } clipRanges(binnedRanges); From 3f84d0c480290d238a1141505339b9fcccfbda5d Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 24 Oct 2025 14:02:06 -0400 Subject: [PATCH 5/7] wip --- .../datawave/mr/bulk/BulkInputFormat.java | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index a4fec671af5..4219c66c409 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,28 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - // /** - // * Initializes an Accumulo {@link TabletLocator} based on the configuration. - // * - // * @param conf - // * the Hadoop configuration object - // * @return an accumulo tablet locator - // * @throws TableNotFoundException - // * if the table name set on the configuration doesn't exist - // * @throws IOException - // * if the input format is unable to read the password file from the FileSystem - // */ - // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - // if (conf.getBoolean(MOCK, false)) - // return new InMemoryTabletLocator(); - // String tableName = getTablename(conf); - // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - // ClientInfo info = ClientInfo.from(props); - // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - // Threads.UEH); - // return TabletLocator.getLocator(context, context.getTableId(tableName)); - // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { @@ -1094,7 +1072,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - // TabletLocator tl; + try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1106,9 +1084,6 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - // tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1124,7 +1099,6 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - // tl.invalidateCache(); } clipRanges(binnedRanges); @@ -1170,13 +1144,9 @@ public List getSplits(JobContext job) throws IOException { log.info("There are approximately {} values ", map.size()); for (RangeSplit split : map.keySet()) { - // Iterable> rangeIter = splitter.partition(map.get(split)); - // for (List rangeList : rangeIter) { - // RangeSplit newSplit = (RangeSplit) split.clone(); - // newSplit.addRanges(rangeList); + split.addRanges(map.get(split)); splits.add(split); - // } } From fd8e4d17c0c49119d44e2258d56f6393201395ce Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 27 Oct 2025 11:47:12 -0400 Subject: [PATCH 6/7] formatting?: --- .../core/src/main/java/datawave/mr/bulk/BulkInputFormat.java | 1 - 1 file changed, 1 deletion(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 4219c66c409..7b70c238a7a 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,7 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); From 810154861035f8ebb96ba2afa5c3a7ba41e9cdba Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 27 Oct 2025 12:02:46 -0400 Subject: [PATCH 7/7] format --- .../datawave/accumulo/inmemory/InMemoryTableOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 26fab5f2dcc..628df0b460c 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -563,7 +563,7 @@ public Locations locate(String tableName, Collection ranges) throws Accum return new LocationsImpl(binnedRanges); } - private List binRanges(ArrayList ranges, Map>> binnedRanges) { + private List binRanges(ArrayList ranges, Map>> binnedRanges) { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); }