diff --git a/core/in-memory-accumulo/pom.xml b/core/in-memory-accumulo/pom.xml index 39731e50bcb..3704ff093ca 100644 --- a/core/in-memory-accumulo/pom.xml +++ b/core/in-memory-accumulo/pom.xml @@ -105,6 +105,30 @@ org.mockito mockito-core + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java index bcb097e396e..4c64ee48087 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java @@ -17,6 +17,7 @@ package datawave.accumulo.inmemory; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -30,35 +31,31 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; import org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SystemPermission; -import org.apache.accumulo.core.singletons.SingletonReservation; -public class InMemoryAccumuloClient extends ClientContext implements AccumuloClient { +// Remove `extends ClientContext` since it's deprecated. +// Need a workaround for the features that are no longer supported. Not sure if we can just throw them away atm. + +public class InMemoryAccumuloClient implements AccumuloClient { String username; private final InMemoryAccumulo acu; + private ConditionalWriterConfig conditionalWriterConfig; + private volatile boolean closed = false; + public InMemoryAccumuloClient(String username, InMemoryInstance instance) throws AccumuloSecurityException { - this(new Credentials(username, new PasswordToken(new byte[0])), instance.acu); - } - public InMemoryAccumuloClient(Credentials credentials, InMemoryAccumulo acu) throws AccumuloSecurityException { - super(SingletonReservation.noop(), new InMemoryClientInfo(credentials), DefaultConfiguration.getInstance(), null); - if (credentials.getToken().isDestroyed()) - throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); - this.username = credentials.getPrincipal(); - this.acu = acu; + this.username = username; + this.acu = instance.acu; if (!acu.users.containsKey(username)) { InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY); user.permissions.add(SystemPermission.SYSTEM); @@ -66,6 +63,22 @@ public InMemoryAccumuloClient(Credentials credentials, InMemoryAccumulo acu) thr } } + @Override + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { + // TODO add implementation + throw new UnsupportedOperationException(); + } + + public ConditionalWriter createConditionalWriter(String tableName) throws TableNotFoundException { + return this.createConditionalWriter(tableName, (ConditionalWriterConfig)null); + } + + private void ensureOpen() { + if (this.closed) { + throw new IllegalStateException("This client was closed."); + } + } + @Override public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) @@ -156,12 +169,6 @@ public NamespaceOperations namespaceOperations() { return new InMemoryNamespaceOperations(acu, username); } - @Override - public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) { - // TODO add implementation - throw new UnsupportedOperationException(); - } - @Override public ReplicationOperations replicationOperations() { // TODO add implementation diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java deleted file mode 100644 index 4f172a55401..00000000000 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java +++ /dev/null @@ -1,23 +0,0 @@ -package datawave.accumulo.inmemory; - -import java.util.Properties; - -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientInfoImpl; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.conf.ClientProperty; - -public class InMemoryClientInfo extends ClientInfoImpl { - public InMemoryClientInfo(Credentials credentials) { - super(toProperties(credentials), credentials.getToken()); - } - - private static Properties toProperties(Credentials credentials) { - Properties props = new Properties(); - props.put(ClientProperty.INSTANCE_NAME.getKey(), new InMemoryInstance().instanceName); - props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), credentials.getPrincipal()); - props.put(ClientProperty.AUTH_TOKEN.getKey(), new String(((PasswordToken) (credentials.getToken())).getPassword())); - props.put(ClientProperty.AUTH_TYPE.getKey(), "password"); - return props; - } -} diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java index 5708d0cfa84..37164b070a6 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java @@ -25,47 +25,39 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; -import org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SystemPermission; -public class InMemoryConnector extends Connector { +public class InMemoryConnector { String username; private final InMemoryAccumulo acu; - private final Instance instance; - InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException { - this(new Credentials(username, new PasswordToken(new byte[0])), new InMemoryAccumulo(InMemoryInstance.getDefaultFileSystem()), instance); - } + InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException { - InMemoryConnector(Credentials credentials, InMemoryAccumulo acu, InMemoryInstance instance) throws AccumuloSecurityException { - if (credentials.getToken().isDestroyed()) - throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); - this.username = credentials.getPrincipal(); - this.acu = acu; - this.instance = instance; + this.username = username; + this.acu = instance.acu; + if (!acu.users.containsKey(username)) { + InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY); + user.permissions.add(SystemPermission.SYSTEM); + acu.users.put(user.name, user); + } } - @Override public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) throw new TableNotFoundException(tableName, tableName, "no such table"); return acu.createBatchScanner(tableName, authorizations); } - @Override public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) @@ -73,36 +65,30 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz return new InMemoryBatchDeleter(acu, tableName, authorizations); } - @Override public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException { return createBatchDeleter(tableName, authorizations, numQueryThreads, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) throw new TableNotFoundException(tableName, tableName, "no such table"); return new InMemoryBatchWriter(acu, tableName); } - @Override public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException { return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { return new InMemoryMultiTableBatchWriter(acu); } - @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { InMemoryTable table = acu.tables.get(tableName); if (table == null) @@ -110,46 +96,29 @@ public Scanner createScanner(String tableName, Authorizations authorizations) th return new InMemoryScanner(table, authorizations); } - @Override - public Instance getInstance() { - return instance; - } - - @Override public String whoami() { return username; } - @Override public TableOperations tableOperations() { return new InMemoryTableOperations(acu, username); } - @Override public SecurityOperations securityOperations() { return new InMemorySecurityOperations(acu); } - @Override public InstanceOperations instanceOperations() { return new InMemoryInstanceOperations(acu); } - @Override public NamespaceOperations namespaceOperations() { return new InMemoryNamespaceOperations(acu, username); } - @Override public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { // TODO add implementation throw new UnsupportedOperationException(); } - @Override - public ReplicationOperations replicationOperations() { - // TODO add implementation - throw new UnsupportedOperationException(); - } - } diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java index 62589f01c54..e6599165ccd 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java @@ -20,22 +20,17 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; + import java.util.List; import java.util.Map; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; /** * InMemory Accumulo provides an in memory implementation of the Accumulo client API. It is possible that the behavior of this implementation may differ subtly @@ -48,7 +43,7 @@ * Accumulo. * */ -public class InMemoryInstance implements Instance { +public class InMemoryInstance { static final String genericAddress = "localhost:1234"; static final Map instances = new HashMap<>(); @@ -85,61 +80,30 @@ public InMemoryInstance(String instanceName, FileSystem fs) { this.instanceName = instanceName; } - @Override public String getRootTabletLocation() { return genericAddress; } - @Override public List getMasterLocations() { return Collections.singletonList(genericAddress); } - @Override public String getInstanceID() { return "mock-instance-id"; } - @Override public String getInstanceName() { return instanceName; } - @Override public String getZooKeepers() { return "localhost"; } - @Override public int getZooKeepersSessionTimeOut() { return 30 * 1000; } - @Override - public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, new PasswordToken(pass)); - } - - @Override - public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, ByteBufferUtil.toBytes(pass)); - } - - @Override - public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); - } - - @Override - public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { - Connector conn = new InMemoryConnector(new Credentials(principal, token), acu, this); - if (!acu.users.containsKey(principal)) - conn.securityOperations().createLocalUser(principal, (PasswordToken) token); - else if (!acu.users.get(principal).token.equals(token)) - throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS); - return conn; - } - public static class CachedConfiguration { private static Configuration configuration = null; diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 7c879b2305d..628df0b460c 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -77,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; - public class InMemoryTableOperations extends TableOperationsHelper { private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class); private static final byte[] ZERO = {0}; @@ -560,11 +559,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab @Override public Locations locate(String tableName, Collection ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Map>> binnedRanges = new HashMap<>(); - InMemoryTabletLocator locator = new InMemoryTabletLocator(); - List ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges); + List ignore = binRanges(new ArrayList<>(ranges), binnedRanges); return new LocationsImpl(binnedRanges); } + private List binRanges(ArrayList ranges, Map>> binnedRanges) { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); + } + private static class LocationsImpl implements Locations { private Map> groupedByRanges; diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..1263c5df154 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -9,6 +9,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -16,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -37,10 +37,9 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; + import org.apache.accumulo.core.clientImpl.ClientConfConverter; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -83,7 +82,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; @@ -1047,27 +1045,10 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); + public List binRanges(List ranges, Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); } /** @@ -1090,7 +1071,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; + try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1102,25 +1083,19 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, - ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + + while (!binRanges(ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) - tableId = context.getTableId(tableName); - if (!context.tableNodeExists(tableId)) + tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));//TABLE ID todo:ensure this is correct + if (!client.tableOperations().tableIdMap().containsKey(tableName))// CHECK IF TABLE ID EXISTS //todo same throw new TableDeletedException(tableId.canonical()); - if (context.getTableState(tableId) == TableState.OFFLINE) + if (!client.tableOperations().isOnline(tableName)) //CHECK IF TABLE ID OFFLInE //todo same throw new TableOfflineException("Table (" + tableId.canonical() + ") is offline"); } binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); } clipRanges(binnedRanges); @@ -1166,13 +1141,9 @@ public List getSplits(JobContext job) throws IOException { log.info("There are approximately {} values ", map.size()); for (RangeSplit split : map.keySet()) { - // Iterable> rangeIter = splitter.partition(map.get(split)); - // for (List rangeList : rangeIter) { - // RangeSplit newSplit = (RangeSplit) split.clone(); - // newSplit.addRanges(rangeList); + split.addRanges(map.get(split)); splits.add(split); - // } }