Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
Expand All @@ -77,8 +78,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import datawave.accumulo.inmemory.impl.InMemoryTabletLocator;

public class InMemoryTableOperations extends TableOperationsHelper {
private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class);
private static final byte[] ZERO = {0};
Expand Down Expand Up @@ -560,11 +559,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab
@Override
public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
InMemoryTabletLocator locator = new InMemoryTabletLocator();
List<Range> ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges);
List<Range> ignore = binRanges(new ArrayList<>(ranges), binnedRanges);
return new LocationsImpl(binnedRanges);
}

private List<Range> binRanges(ArrayList<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges));
return Collections.emptyList();
}

private static class LocationsImpl implements Locations {

private Map<Range,List<TabletId>> groupedByRanges;
Expand Down
43 changes: 8 additions & 35 deletions warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
Expand All @@ -40,7 +40,6 @@
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
Expand Down Expand Up @@ -83,7 +82,6 @@

import datawave.accumulo.inmemory.InMemoryAccumuloClient;
import datawave.accumulo.inmemory.InMemoryInstance;
import datawave.accumulo.inmemory.impl.InMemoryTabletLocator;
import datawave.common.util.ArgumentChecker;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.mr.bulk.split.DefaultLocationStrategy;
Expand Down Expand Up @@ -1047,27 +1045,10 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) {
return new DefaultLocationStrategy();
}

/**
* Initializes an Accumulo {@link TabletLocator} based on the configuration.
*
* @param conf
* the Hadoop configuration object
* @return an accumulo tablet locator
* @throws TableNotFoundException
* if the table name set on the configuration doesn't exist
* @throws IOException
* if the input format is unable to read the password file from the FileSystem
*/
protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
if (conf.getBoolean(MOCK, false))
return new InMemoryTabletLocator();
String tableName = getTablename(conf);
Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS))
.as(getUsername(conf), new PasswordToken(getPassword(conf))).build();
ClientInfo info = ClientInfo.from(props);
ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()),
Threads.UEH);
return TabletLocator.getLocator(context, context.getTableId(tableName));
public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges));
return Collections.emptyList();
}

/**
Expand All @@ -1090,7 +1071,7 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {

// get the metadata information for these ranges
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
TabletLocator tl;

try {
if (isOfflineScan(job.getConfiguration())) {
binnedRanges = binOfflineTable(job, tableName, ranges);
Expand All @@ -1102,13 +1083,10 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
} else {
try (AccumuloClient client = getClient(job.getConfiguration())) {
TableId tableId = null;
tl = getTabletLocator(job.getConfiguration());
// its possible that the cache could contain complete, but old information about a tables tablets... so clear it
tl.invalidateCache();
ClientInfo info = ClientInfo.from(cbHelper.newClientProperties());
ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info,
ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH);
while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
while (!binRanges(context, ranges, binnedRanges).isEmpty()) {
if (!(client instanceof InMemoryAccumuloClient)) {
if (tableId == null)
tableId = context.getTableId(tableName);
Expand All @@ -1120,7 +1098,6 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
binnedRanges.clear();
log.warn("Unable to locate bins for specified ranges. Retrying.");
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200));
tl.invalidateCache();
}

clipRanges(binnedRanges);
Expand Down Expand Up @@ -1166,13 +1143,9 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
log.info("There are approximately {} values ", map.size());

for (RangeSplit split : map.keySet()) {
// Iterable<List<Range>> rangeIter = splitter.partition(map.get(split));
// for (List<Range> rangeList : rangeIter) {
// RangeSplit newSplit = (RangeSplit) split.clone();
// newSplit.addRanges(rangeList);

split.addRanges(map.get(split));
splits.add(split);
// }

}

Expand Down
Loading