Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Following dependencies are downloaded automatically:
* Aerospike Java client 7.2.2 or greater
* Apache commons cli 1.7.0
* Log4j 2.22.1
* Junit 4.13.1
* Junit 5.11.4
* system-lambda 1.2.1
* Json-simple 1.1.1

<a name="Usage"></a>
Expand Down
22 changes: 19 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,29 @@
<artifactId>log4j-core</artifactId>
<version>2.22.1</version>
</dependency>

<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.11.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.11.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
<version>1.2.1</version>
<scope>test</scope>
</dependency>

<!-- JSON -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
Expand Down
275 changes: 139 additions & 136 deletions src/main/java/com/aerospike/load/AerospikeLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -104,143 +105,145 @@ private static void printVersion()
}
}

public static void main(String[] args) throws IOException {
long processStart = System.currentTimeMillis();
public static void main(String[] args) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be formatting issue with this PR. Please double check.

int exitCode = -1;
try {
CommandLine cl = parseArgs(args);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the parseArgs to launch and let launch take full responsibility for the functionality. In that way, we don't need to include system-lambda for testing.

int exitCode = AerospikeLoad.launch(new String[]{"-h", host, "-p", port, "-n", ns, "-ec", error_count, "-wa", write_action, "-c", "/non/existing/config.json", dataFile});
assertEquals(-1, exitCode);

exitCode = launch(cl);
} catch (Exception e) {
if (log.isDebugEnabled()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's ensure we print the exception error as before.

e.printStackTrace();
}
}
System.exit(exitCode);
}

public static int launch(CommandLine cl) throws Exception {
long processStart = System.currentTimeMillis();

AerospikeClient client = null;
statPrinter = new Thread(new PrintStat(counters));
// Create Abstract derived params from provided commandline params.
params = Utils.parseParameters(cl);
if (params.verbose) {
Configurator.setAllLevels(LogManager.getRootLogger().getName(), Level.DEBUG);
}

initReadWriteThreadCnt(cl);

// Get and validate user roles for client.
client = getAerospikeClient(cl);
if (client == null) {
return -1;
}

List<String> dataFileNames = new ArrayList<String>();
initDataFileNameList(cl, dataFileNames);
if (dataFileNames.size() == 0) {
return -1;
}

// Remove column definition file from list. if directory containing config file is passed.
String columnDefinitionFileName = cl.getOptionValue("c", "");
dataFileNames.remove(columnDefinitionFileName);
log.info("Number of data files:" + dataFileNames.size());

initBytesToRead(dataFileNames);

log.info("Aerospike loader started");
// Perform main Read Write job.
runLoader(client, columnDefinitionFileName, dataFileNames);

// Stop statistic printer thread.
statPrinter.interrupt();
log.info("Aerospike loader completed");
if (client != null) {
client.close();
}

long processStop = System.currentTimeMillis();
log.info(String.format("Loader completed in %.3fsec", (float) (processStop - processStart) / 1000));
return 0;
}

public static CommandLine parseArgs(String[] args) throws ParseException {
counters = new Counter();
CommandLine cl;

Options options = new Options();
options.addOption("h", "hosts", true,
"List of seed hosts in format:\n" +
"hostname1[:tlsname][:port1],...\n" +
"The tlsname is only used when connecting with a secure TLS enabled server. " +
"If the port is not specified, the default port is used.\n" +
"IPv6 addresses must be enclosed in square brackets.\n" +
"Default: localhost\n" +
"Examples:\n" +
"host1\n" +
"host1:3000,host2:3000\n" +
"192.168.1.10:cert1:3000,[2001::1111]:cert2:3000\n"
);
options.addOption("V", "version", false, "Aerospike Loader Version");
options.addOption("p", "port", true, "Server port (default: 3000)");
options.addOption("U", "user", true, "User name");
options.addOption("P", "password", true, "Password");
options.addOption("n", "namespace", true, "Namespace (default: test)");
options.addOption("c", "config", true, "Column definition file name");
options.addOption("g", "max-throughput", true, "It limit numer of writes/sec in aerospike.");
options.addOption("T", "transaction-timeout", true, "write transaction timeout in milliseconds(default: No timeout)");
options.addOption("e", "expirationTime", true,
"Set expiration time of each record in seconds." +
" -1: Never expire, " +
" 0: Default to namespace," +
" >0: Actual given expiration time"
);
options.addOption("tz", "timezone", true, "Timezone of source where data dump is taken (default: local timezone)");
options.addOption("ec", "abort-error-count", true, "Error count to abort (default: 0)");
options.addOption("wa", "write-action", true, "Write action if key already exists (default: update)");
options.addOption("sa", "services_alternate", false, "Enable alternate services.");
options.addOption("tls", "tls-enable", false, "Use TLS/SSL sockets");
options.addOption("tp", "tls-protocols", true,
"Allow TLS protocols\n" +
"Values: TLSv1,TLSv1.1,TLSv1.2 separated by comma\n" +
"Default: TLSv1.2"
);
options.addOption("tlsCiphers", "tls-cipher-suite", true,
"Allow TLS cipher suites\n" +
"Values: cipher names defined by JVM separated by comma\n" +
"Default: null (default cipher list provided by JVM)"
);
options.addOption("tr", "tlsRevoke", true,
"Revoke certificates identified by their serial number\n" +
"Values: serial numbers separated by comma\n" +
"Default: null (Do not revoke certificates)"
);

options.addOption("tlsLoginOnly", false, "Use TLS/SSL sockets on node login only");
options.addOption("auth", true, "Authentication mode. Values: " + Arrays.toString(AuthMode.values()));

options.addOption("uk", "send-user-key", false,
"Send user defined key in addition to hash digest to store on the server. (default: userKey is not sent to reduce meta-data overhead)"
);
options.addOption("v", "verbose", false, "Logging all");
options.addOption("um", "unorderdMaps", false, "Write all maps as unorderd maps");
options.addOption("u", "usage", false, "Print usage.");

CommandLineParser parser = new PosixParser();
cl = parser.parse(options, args, false);

if (args.length == 0 || cl.hasOption("u")) {
printUsage(options);
return cl;
}

if (cl.hasOption("V")) {
printVersion();
return cl;
}

return cl;
}

AerospikeClient client = null;
counters = new Counter();
CommandLine cl;

try {
Options options = new Options();
options.addOption("h", "hosts", true,
"List of seed hosts in format:\n" +
"hostname1[:tlsname][:port1],...\n" +
"The tlsname is only used when connecting with a secure TLS enabled server. " +
"If the port is not specified, the default port is used.\n" +
"IPv6 addresses must be enclosed in square brackets.\n" +
"Default: localhost\n" +
"Examples:\n" +
"host1\n" +
"host1:3000,host2:3000\n" +
"192.168.1.10:cert1:3000,[2001::1111]:cert2:3000\n"
);
options.addOption("V", "version", false, "Aerospike Loader Version");
options.addOption("p", "port", true, "Server port (default: 3000)");
options.addOption("U", "user", true, "User name");
options.addOption("P", "password", true, "Password");
options.addOption("n", "namespace", true, "Namespace (default: test)");
options.addOption("c", "config", true, "Column definition file name");
options.addOption("g", "max-throughput", true, "It limit numer of writes/sec in aerospike.");
options.addOption("T", "transaction-timeout", true, "write transaction timeout in milliseconds(default: No timeout)");
options.addOption("e", "expirationTime", true,
"Set expiration time of each record in seconds." +
" -1: Never expire, " +
" 0: Default to namespace," +
" >0: Actual given expiration time"
);
options.addOption("tz", "timezone", true, "Timezone of source where data dump is taken (default: local timezone)");
options.addOption("ec", "abort-error-count", true, "Error count to abort (default: 0)");
options.addOption("wa", "write-action", true, "Write action if key already exists (default: update)");
options.addOption("sa", "services_alternate", false, "Enable alternate services.");
options.addOption("tls", "tls-enable", false, "Use TLS/SSL sockets");
options.addOption("tp", "tls-protocols", true,
"Allow TLS protocols\n" +
"Values: TLSv1,TLSv1.1,TLSv1.2 separated by comma\n" +
"Default: TLSv1.2"
);
options.addOption("tlsCiphers", "tls-cipher-suite", true,
"Allow TLS cipher suites\n" +
"Values: cipher names defined by JVM separated by comma\n" +
"Default: null (default cipher list provided by JVM)"
);
options.addOption("tr", "tlsRevoke", true,
"Revoke certificates identified by their serial number\n" +
"Values: serial numbers separated by comma\n" +
"Default: null (Do not revoke certificates)"
);

options.addOption("tlsLoginOnly", false, "Use TLS/SSL sockets on node login only");
options.addOption("auth", true, "Authentication mode. Values: " + Arrays.toString(AuthMode.values()));

options.addOption("uk", "send-user-key", false,
"Send user defined key in addition to hash digest to store on the server. (default: userKey is not sent to reduce meta-data overhead)"
);
options.addOption("v", "verbose", false, "Logging all");
options.addOption("um", "unorderdMaps", false, "Write all maps as unorderd maps");
options.addOption("u", "usage", false, "Print usage.");

CommandLineParser parser = new PosixParser();
cl = parser.parse(options, args, false);

if (args.length == 0 || cl.hasOption("u")) {
printUsage(options);
return;
}

if (cl.hasOption("V")) {
printVersion();
return;
}
} catch (Exception e) {
log.error(e);
if (log.isDebugEnabled()) {
e.printStackTrace();
}
return;
}

try {
statPrinter = new Thread(new PrintStat(counters));
// Create Abstract derived params from provided commandline params.
params = Utils.parseParameters(cl);
if (params.verbose) {
Configurator.setAllLevels(LogManager.getRootLogger().getName(), Level.DEBUG);
}

initReadWriteThreadCnt(cl);

// Get and validate user roles for client.
client = getAerospikeClient(cl);
if (client == null) {
return;
}

List<String> dataFileNames = new ArrayList<String>();
initDataFileNameList(cl, dataFileNames);
if (dataFileNames.size() == 0) {
return;
}

// Remove column definition file from list. if directory containing config file is passed.
String columnDefinitionFileName = cl.getOptionValue("c", "");
dataFileNames.remove(columnDefinitionFileName);
log.info("Number of data files:" + dataFileNames.size());

initBytesToRead(dataFileNames);

log.info("Aerospike loader started");
// Perform main Read Write job.
runLoader(client, columnDefinitionFileName, dataFileNames);

} catch (Exception e) {
log.error(e);
if (log.isDebugEnabled()) {
e.printStackTrace();
}
} finally {
// Stop statistic printer thread.
statPrinter.interrupt();
log.info("Aerospike loader completed");
if (client != null) {
client.close();
}
}

long processStop = System.currentTimeMillis();
log.info(String.format("Loader completed in %.3fsec", (float) (processStop - processStart) / 1000));
}

private static AerospikeClient getAerospikeClient(CommandLine cl) {
ClientPolicy clientPolicy = new ClientPolicy();

Expand Down
Loading