diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 7c879b2305d..628df0b460c 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -77,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; - public class InMemoryTableOperations extends TableOperationsHelper { private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class); private static final byte[] ZERO = {0}; @@ -560,11 +559,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab @Override public Locations locate(String tableName, Collection ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Map>> binnedRanges = new HashMap<>(); - InMemoryTabletLocator locator = new InMemoryTabletLocator(); - List ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges); + List ignore = binRanges(new ArrayList<>(ranges), binnedRanges); return new LocationsImpl(binnedRanges); } + private List binRanges(ArrayList ranges, Map>> binnedRanges) { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); + } + private static class LocationsImpl implements Locations { private Map> groupedByRanges; diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..7b70c238a7a 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -9,6 +9,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -16,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -40,7 +40,6 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -83,7 +82,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; @@ -1047,27 +1045,10 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); + public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); } /** @@ -1090,7 +1071,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; + try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1102,13 +1083,10 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + while (!binRanges(context, ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) tableId = context.getTableId(tableName); @@ -1120,7 +1098,6 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); } clipRanges(binnedRanges); @@ -1166,13 +1143,9 @@ public List getSplits(JobContext job) throws IOException { log.info("There are approximately {} values ", map.size()); for (RangeSplit split : map.keySet()) { - // Iterable> rangeIter = splitter.partition(map.get(split)); - // for (List rangeList : rangeIter) { - // RangeSplit newSplit = (RangeSplit) split.clone(); - // newSplit.addRanges(rangeList); + split.addRanges(map.get(split)); splits.add(split); - // } }