Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions core/in-memory-accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>2.1.4-5792fed3-bulkv2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>2.1.4-5792fed3-bulkv2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>2.1.4-5792fed3-bulkv2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>2.1.4-5792fed3-bulkv2</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datawave.accumulo.inmemory;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
Expand All @@ -30,42 +31,54 @@
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;

import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.core.singletons.SingletonReservation;

public class InMemoryAccumuloClient extends ClientContext implements AccumuloClient {
// Remove `extends ClientContext` since it's deprecated.
// Need a workaround for the features that are no longer supported. Not sure if we can just throw them away atm.

public class InMemoryAccumuloClient implements AccumuloClient {

String username;
private final InMemoryAccumulo acu;

private ConditionalWriterConfig conditionalWriterConfig;
private volatile boolean closed = false;

public InMemoryAccumuloClient(String username, InMemoryInstance instance) throws AccumuloSecurityException {
this(new Credentials(username, new PasswordToken(new byte[0])), instance.acu);
}

public InMemoryAccumuloClient(Credentials credentials, InMemoryAccumulo acu) throws AccumuloSecurityException {
super(SingletonReservation.noop(), new InMemoryClientInfo(credentials), DefaultConfiguration.getInstance(), null);
if (credentials.getToken().isDestroyed())
throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED);
this.username = credentials.getPrincipal();
this.acu = acu;
this.username = username;
this.acu = instance.acu;
if (!acu.users.containsKey(username)) {
InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY);
user.permissions.add(SystemPermission.SYSTEM);
acu.users.put(user.name, user);
}
}

@Override
public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
// TODO add implementation
throw new UnsupportedOperationException();
}

public ConditionalWriter createConditionalWriter(String tableName) throws TableNotFoundException {
return this.createConditionalWriter(tableName, (ConditionalWriterConfig)null);
}

private void ensureOpen() {
if (this.closed) {
throw new IllegalStateException("This client was closed.");
}
}

@Override
public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException {
if (acu.tables.get(tableName) == null)
Expand Down Expand Up @@ -156,12 +169,6 @@ public NamespaceOperations namespaceOperations() {
return new InMemoryNamespaceOperations(acu, username);
}

@Override
public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) {
// TODO add implementation
throw new UnsupportedOperationException();
}

@Override
public ReplicationOperations replicationOperations() {
// TODO add implementation
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,131 +25,100 @@
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.ReplicationOperations;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SystemPermission;

public class InMemoryConnector extends Connector {
public class InMemoryConnector {

String username;
private final InMemoryAccumulo acu;
private final Instance instance;

InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException {
this(new Credentials(username, new PasswordToken(new byte[0])), new InMemoryAccumulo(InMemoryInstance.getDefaultFileSystem()), instance);
}
InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException {

InMemoryConnector(Credentials credentials, InMemoryAccumulo acu, InMemoryInstance instance) throws AccumuloSecurityException {
if (credentials.getToken().isDestroyed())
throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED);
this.username = credentials.getPrincipal();
this.acu = acu;
this.instance = instance;
this.username = username;
this.acu = instance.acu;
if (!acu.users.containsKey(username)) {
InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY);
user.permissions.add(SystemPermission.SYSTEM);
acu.users.put(user.name, user);
}
}

@Override
public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException {
if (acu.tables.get(tableName) == null)
throw new TableNotFoundException(tableName, tableName, "no such table");
return acu.createBatchScanner(tableName, authorizations);
}

@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency,
int maxWriteThreads) throws TableNotFoundException {
if (acu.tables.get(tableName) == null)
throw new TableNotFoundException(tableName, tableName, "no such table");
return new InMemoryBatchDeleter(acu, tableName, authorizations);
}

@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
throws TableNotFoundException {
return createBatchDeleter(tableName, authorizations, numQueryThreads, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS),
config.getMaxWriteThreads());
}

@Override
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
if (acu.tables.get(tableName) == null)
throw new TableNotFoundException(tableName, tableName, "no such table");
return new InMemoryBatchWriter(acu, tableName);
}

@Override
public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
}

@Override
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
return new InMemoryMultiTableBatchWriter(acu);
}

@Override
public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
}

@Override
public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException {
InMemoryTable table = acu.tables.get(tableName);
if (table == null)
throw new TableNotFoundException(tableName, tableName, "no such table");
return new InMemoryScanner(table, authorizations);
}

@Override
public Instance getInstance() {
return instance;
}

@Override
public String whoami() {
return username;
}

@Override
public TableOperations tableOperations() {
return new InMemoryTableOperations(acu, username);
}

@Override
public SecurityOperations securityOperations() {
return new InMemorySecurityOperations(acu);
}

@Override
public InstanceOperations instanceOperations() {
return new InMemoryInstanceOperations(acu);
}

@Override
public NamespaceOperations namespaceOperations() {
return new InMemoryNamespaceOperations(acu, username);
}

@Override
public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
// TODO add implementation
throw new UnsupportedOperationException();
}

@Override
public ReplicationOperations replicationOperations() {
// TODO add implementation
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,17 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;

import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;

/**
* InMemory Accumulo provides an in memory implementation of the Accumulo client API. It is possible that the behavior of this implementation may differ subtly
Expand All @@ -48,7 +43,7 @@
* Accumulo.
*
*/
public class InMemoryInstance implements Instance {
public class InMemoryInstance {

static final String genericAddress = "localhost:1234";
static final Map<String,InMemoryAccumulo> instances = new HashMap<>();
Expand Down Expand Up @@ -85,61 +80,30 @@ public InMemoryInstance(String instanceName, FileSystem fs) {
this.instanceName = instanceName;
}

@Override
public String getRootTabletLocation() {
return genericAddress;
}

@Override
public List<String> getMasterLocations() {
return Collections.singletonList(genericAddress);
}

@Override
public String getInstanceID() {
return "mock-instance-id";
}

@Override
public String getInstanceName() {
return instanceName;
}

@Override
public String getZooKeepers() {
return "localhost";
}

@Override
public int getZooKeepersSessionTimeOut() {
return 30 * 1000;
}

@Override
public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
return getConnector(user, new PasswordToken(pass));
}

@Override
public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
return getConnector(user, ByteBufferUtil.toBytes(pass));
}

@Override
public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
}

@Override
public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
Connector conn = new InMemoryConnector(new Credentials(principal, token), acu, this);
if (!acu.users.containsKey(principal))
conn.securityOperations().createLocalUser(principal, (PasswordToken) token);
else if (!acu.users.get(principal).token.equals(token))
throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS);
return conn;
}

public static class CachedConfiguration {
private static Configuration configuration = null;

Expand Down
Loading
Loading