@@ -110,21 +110,4 @@ AccumuloClient getClient(String userDN, Collection<String> proxyServers, String
* @return A map representation
*/
Map<String,String> getTrackingMap(StackTraceElement[] stackTrace);

/**
* Utility method to unwrap the ClientContext instance within {@link WrappedAccumuloClient} as needed
*
* @param accumuloClient
* {@link AccumuloClient} instance
* @return {@link WrappedAccumuloClient#getReal()}, if applicable; accumuloClient itself, if it implements {@link ClientContext}; otherwise returns null
*/
static ClientContext getClientContext(AccumuloClient accumuloClient) {
ClientContext cc = null;
if (accumuloClient instanceof WrappedAccumuloClient) {
cc = (ClientContext) ((WrappedAccumuloClient) accumuloClient).getReal();
} else if (accumuloClient instanceof ClientContext) {
cc = (ClientContext) accumuloClient;
}
return cc;
}
}
@@ -55,6 +55,7 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -77,8 +78,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import datawave.accumulo.inmemory.impl.InMemoryTabletLocator;

public class InMemoryTableOperations extends TableOperationsHelper {
private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class);
private static final byte[] ZERO = {0};
@@ -560,11 +559,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab
@Override
public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
InMemoryTabletLocator locator = new InMemoryTabletLocator();
List<Range> ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges);
List<Range> ignore = binRanges(new ArrayList<>(ranges), binnedRanges);
return new LocationsImpl(binnedRanges);
}

private List<Range> binRanges(ArrayList<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
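// The in-memory client has no real tablet metadata, so every range is binned to a single
// default extent under an empty location; callers only need the grouping, not real servers.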
binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges));
return Collections.emptyList();
}

private static class LocationsImpl implements Locations {

private Map<Range,List<TabletId>> groupedByRanges;
65 changes: 15 additions & 50 deletions warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java
@@ -9,14 +9,14 @@
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
@@ -37,10 +37,7 @@
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientConfConverter;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.TabletLocator;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
@@ -50,15 +47,12 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.util.format.DateFormatSupplier;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
@@ -83,7 +77,6 @@

import datawave.accumulo.inmemory.InMemoryAccumuloClient;
import datawave.accumulo.inmemory.InMemoryInstance;
import datawave.accumulo.inmemory.impl.InMemoryTabletLocator;
import datawave.common.util.ArgumentChecker;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.mr.bulk.split.DefaultLocationStrategy;
@@ -1047,27 +1040,10 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) {
return new DefaultLocationStrategy();
}

/**
* Initializes an Accumulo {@link TabletLocator} based on the configuration.
*
* @param conf
* the Hadoop configuration object
* @return an accumulo tablet locator
* @throws TableNotFoundException
* if the table name set on the configuration doesn't exist
* @throws IOException
* if the input format is unable to read the password file from the FileSystem
*/
protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
if (conf.getBoolean(MOCK, false))
return new InMemoryTabletLocator();
String tableName = getTablename(conf);
Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS))
.as(getUsername(conf), new PasswordToken(getPassword(conf))).build();
ClientInfo info = ClientInfo.from(props);
ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()),
Threads.UEH);
return TabletLocator.getLocator(context, context.getTableId(tableName));
public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
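// Bin every range into a single default extent under an empty location; an empty return value
// tells the caller that all ranges were binned successfully.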
binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges));
return Collections.emptyList();
}

/**
@@ -1090,7 +1066,7 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {

// get the metadata information for these ranges
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
TabletLocator tl;

try {
if (isOfflineScan(job.getConfiguration())) {
binnedRanges = binOfflineTable(job, tableName, ranges);
@@ -1101,26 +1077,19 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
}
} else {
try (AccumuloClient client = getClient(job.getConfiguration())) {
TableId tableId = null;
tl = getTabletLocator(job.getConfiguration());
// its possible that the cache could contain complete, but old information about a tables tablets... so clear it
tl.invalidateCache();
ClientInfo info = ClientInfo.from(cbHelper.newClientProperties());
ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info,
ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH);
while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
String tableId = null;
while (!binRanges(ranges, binnedRanges).isEmpty()) {
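// Before retrying, confirm the table still exists and is online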
if (!(client instanceof InMemoryAccumuloClient)) {
if (tableId == null)
tableId = context.getTableId(tableName);
if (!context.tableNodeExists(tableId))
throw new TableDeletedException(tableId.canonical());
if (context.getTableState(tableId) == TableState.OFFLINE)
throw new TableOfflineException("Table (" + tableId.canonical() + ") is offline");
tableId = client.tableOperations().tableIdMap().get(tableName);
if (tableId == null || !client.tableOperations().exists(tableName))
throw new TableDeletedException(tableId);
if (!client.tableOperations().isOnline(tableName))
throw new TableOfflineException("Table (" + tableId + ") is offline");
}
binnedRanges.clear();
log.warn("Unable to locate bins for specified ranges. Retrying.");
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200));
tl.invalidateCache();
}

clipRanges(binnedRanges);
@@ -1166,13 +1135,9 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
log.info("There are approximately {} values ", map.size());

for (RangeSplit split : map.keySet()) {
// Iterable<List<Range>> rangeIter = splitter.partition(map.get(split));
// for (List<Range> rangeList : rangeIter) {
// RangeSplit newSplit = (RangeSplit) split.clone();
// newSplit.addRanges(rangeList);

split.addRanges(map.get(split));
splits.add(split);
// }

}

@@ -694,7 +694,7 @@ private Path distCpDirectory(Path jobDirectory) throws Exception {
}

/**
* Determines whether or not it is safe to bring map files online. This asks Accumulo for its stats for major compaction (running and queued), and will
Determines whether it is safe to bring map files online. This asks Accumulo for its active compactions, and will
return false if "too many" compactions are currently active.
*
* @param lastOnlineTime
@@ -703,15 +703,24 @@ private Path distCpDirectory(Path jobDirectory) throws Exception {
* log info
* @return boolean flag
*/
public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) {
public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) throws AccumuloException, AccumuloSecurityException {
Level level = (logInfo ? Level.INFO : Level.DEBUG);
int majC = getMajorCompactionCount();
log.log(level, "There are " + majC + " compactions currently running or queued.");

// SETH NOTE: This is the only place that .getMajorCompactionCount() is used.
// It may be reasonable to remove this logic entirely and use another metric to decide
// whether the map files can be brought online. For now it is replaced with a comparison
// against all active compactions, which is at least as strict as counting only major compactions.

// NOTE: This is waiting on updates on the Accumulo side; they said they were going to make a PR for it.
int majminC = accumuloClient.instanceOperations().getActiveCompactions().size();

log.log(level, "There are " + majminC + " compactions currently running or queued.");

long delta = System.currentTimeMillis() - lastOnlineTime;
log.log(level, "Time since map files last brought online: " + (delta / 1000) + "s");

return (delta > MAJC_WAIT_TIMEOUT) && (majC < MAJC_THRESHOLD);
return (delta > MAJC_WAIT_TIMEOUT) && (majminC < MAJC_THRESHOLD);
}

private int getMajorCompactionCount() {
Expand All @@ -722,7 +731,7 @@ private int getMajorCompactionCount() {
try {
client = ThriftClientTypes.MANAGER.getConnection(context);
ManagerMonitorInfo mmi = client.getManagerStats(null, context.rpcCreds());
Map<String,TableInfo> tableStats = mmi.getTableMap();
Map<String, TableInfo> tableStats = mmi.getTableMap();

for (java.util.Map.Entry<String,TableInfo> e : tableStats.entrySet()) {
majC += e.getValue().getMajors().getQueued();