Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dd1d6ee
wip
dlmarion Jul 28, 2023
64d3541
Merge branch 'elasticity' into generic-task-api
dlmarion Jul 31, 2023
d1a3c8a
wip
dlmarion Aug 1, 2023
ca7015c
Merge branch 'elasticity' into generic-task-api
dlmarion Aug 1, 2023
400cd6c
wip
dlmarion Aug 1, 2023
e4a0e95
wip
dlmarion Aug 2, 2023
e81dfa5
Merge branch 'elasticity' into generic-task-api
dlmarion Aug 4, 2023
2a62560
Merge branch 'elasticity' into generic-task-api
dlmarion Aug 15, 2023
53d46b7
abandoned cross-language and cross-platform support, strictly gson ov…
dlmarion Aug 16, 2023
34e01ac
Merge branch 'elasticity' into generic-task-api
dlmarion Aug 17, 2023
7d90b1c
wip
dlmarion Aug 18, 2023
f95897f
replaced Thrift API between CompactionCoordinator and Compactor
dlmarion Aug 21, 2023
69c6264
reduced boilerplate code by moving into utility method
dlmarion Aug 21, 2023
5849631
Add missing annotations
dlmarion Aug 21, 2023
c6b9d82
wip
dlmarion Aug 25, 2023
bb7c50e
Merge branch 'elasticity' into generic-task-api
dlmarion Sep 7, 2023
8bd4cea
Merge branch 'elasticity' into generic-task-api
dlmarion Sep 7, 2023
5db1431
Merge branch 'elasticity' into generic-task-api
dlmarion Sep 19, 2023
764bd49
Merge branch 'elasticity' into generic-task-api
dlmarion Sep 29, 2023
de93216
Merge branch 'elasticity' into generic-task-api
dlmarion Sep 29, 2023
db97342
Build working...
dlmarion Oct 2, 2023
dedf47c
Fixup script and accumulo-cluster
dlmarion Oct 2, 2023
70887ca
Changes to ExternalDoNothingCompactor to get ITs working
dlmarion Oct 3, 2023
121707f
Renamed CompactionCoordinator to TaskManager
dlmarion Oct 3, 2023
a89bc4d
Add override annotations
dlmarion Oct 3, 2023
8cb4441
Add override annotation
dlmarion Oct 3, 2023
04e9294
Resolve spotbugs issue
dlmarion Oct 3, 2023
38774d7
Rename variables
dlmarion Oct 3, 2023
da85761
More renames, handle message types better
dlmarion Oct 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions assemble/bin/accumulo-cluster
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ function start_all() {
echo "Starting compactors for group $group"
Q="COMPACTOR_HOSTS_${group}"
for compactor in ${!Q}; do
start_service "$compactor" compactor "-o" "compactor.group=$group"
start_service "$compactor" compactor "-o" "compactor.group=$group" "-o" "task.runner.worker.type=COMPACTION"
done
done

Expand Down Expand Up @@ -295,7 +295,7 @@ function start_here() {
Q="COMPACTOR_HOSTS_${group}"
for compactor in ${!Q}; do
if echo "$compactor" | grep -q "^${host}\$"; then
start_service "$compactor" compactor "-o" "compactor.group=$group"
start_service "$compactor" compactor "-o" "compactor.group=$group" "-o" "task.runner.worker.type=COMPACTION"
fi
done
done
Expand Down Expand Up @@ -525,6 +525,16 @@ tserver:
default:
- localhost

# ELASTICITY_TODO: Add these new TaskRunner types to MAC, ClusterConfigParser, etc.
log_sorters:
default:
- localhost

# ELASTICITY_TODO: Add these new TaskRunner types to MAC, ClusterConfigParser, etc.
split_calculators:
default:
- localhost

compactor:
accumulo_meta:
- localhost
Expand Down
10 changes: 5 additions & 5 deletions assemble/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@
<artifactId>jakarta.xml.bind-api</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-compactor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
Expand Down Expand Up @@ -242,6 +237,11 @@
<artifactId>accumulo-start</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-task-runner</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-tserver</artifactId>
Expand Down
93 changes: 38 additions & 55 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1104,65 +1104,48 @@ public enum Property {
+ "also consider configuring the `" + NoDeleteConstraint.class.getName() + "` "
+ "constraint.",
"2.0.0"),
// Compactor properties
@Experimental
COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
@Experimental
COMPACTOR_PORTSEARCH("compactor.port.search", "true", PropertyType.BOOLEAN,
"If the compactor.port.client is in use, search higher ports until one is available",
"2.1.0"),
@Experimental
COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
"The port used for handling client connections on the compactor servers", "2.1.0"),
COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
"The minimum amount of time to wait between checks for the next compaction job, backing off"
+ "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
// ELASTICITY_TODO: Deprecate CompactionCoordinator properties in a 3.x release
TASK_MANAGER_PREFIX("task.manager.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the Accumulo TaskManager component.",
"4.0.0"),
TASK_MANAGER_DEAD_COMPACTOR_CHECK_INTERVAL("task.manager.compactor.dead.check.interval", "5m",
PropertyType.TIMEDURATION, "The interval at which to check for dead compactors.", "4.0.0"),
TASK_MANAGER_TSERVER_COMPACTION_CHECK_INTERVAL("task.manager.tserver.check.interval", "1m",
PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.", "4.0.0"),
// ELASTICITY_TODO: Deprecate Compactor properties in a 3.x release
TASK_RUNNER_PREFIX("task.runner.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo TaskRunner server.",
"4.0.0"),
TASK_RUNNER_CLIENTPORT("task.runner.port.client", "9133", PropertyType.PORT,
"The port used for handling client connections on the TaskRunner servers", "4.0.0"),
TASK_RUNNER_GROUP_NAME("task.runner.group", Constants.DEFAULT_RESOURCE_GROUP_NAME,
PropertyType.STRING, "Resource group name for this Compactor.", "4.0.0"),
TASK_RUNNER_MAX_MESSAGE_SIZE("task.runner.message.size.max", "10M", PropertyType.BYTES,
"The maximum size of a message that can be sent to a tablet server.", "4.0.0"),
TASK_RUNNER_MIN_JOB_WAIT_TIME("task.runner.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
"The minimum amount of time to wait between checks for the next job, backing off"
+ "exponentially until TASK_RUNNER_MAX_JOB_WAIT_TIME is reached.",
"4.0.0"),
COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
"Compactors do exponential backoff when their request for work repeatedly come back empty. "
TASK_RUNNER_MAX_JOB_WAIT_TIME("task.runner.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
"TaskRunners do exponential backoff when their request for work repeatedly come back empty. "
+ "This is the maximum amount of time to wait between checks for the next compaction job.",
"4.0.0"),
@Experimental
COMPACTOR_MINTHREADS("compactor.threads.minimum", "1", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "2.1.0"),
@Experimental
COMPACTOR_MINTHREADS_TIMEOUT("compactor.threads.timeout", "0s", PropertyType.TIMEDURATION,
TASK_RUNNER_MINTHREADS("task.runner.threads.minimum", "1", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "4.0.0"),
TASK_RUNNER_MINTHREADS_TIMEOUT("task.runner.threads.timeout", "0s", PropertyType.TIMEDURATION,
"The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely.",
"2.1.0"),
@Experimental
COMPACTOR_THREADCHECK("compactor.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool.", "2.1.0"),
@Experimental
COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", PropertyType.BYTES,
"The maximum size of a message that can be sent to a tablet server.", "2.1.0"),
@Experimental
COMPACTOR_GROUP_NAME("compactor.group", Constants.DEFAULT_RESOURCE_GROUP_NAME,
PropertyType.STRING, "Resource group name for this Compactor.", "3.0.0"),
// CompactionCoordinator properties
@Experimental
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compaction coordinator server.",
"2.1.0"),
@Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,
"The interval at which to check for dead compactors.", "2.1.0"),
@Experimental
COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS(
"compaction.coordinator.compaction.finalizer.threads.maximum", "5", PropertyType.COUNT,
"The maximum number of threads to use for notifying tablet servers that an external compaction has completed.",
"2.1.0"),
@Experimental
COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL(
"compaction.coordinator.compaction.finalizer.check.interval", "60s",
PropertyType.TIMEDURATION,
"The interval at which to check for external compaction final state markers in the metadata table.",
"2.1.0"),
@Experimental
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.", "2.1.0");
"4.0.0"),
TASK_RUNNER_PORTSEARCH("task.runner.port.search", "true", PropertyType.BOOLEAN,
"If the task.runner.port.client is in use, search higher ports until one is available",
"4.0.0"),
TASK_RUNNER_THREADCHECK("task.runner.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool.", "4.0.0"),
TASK_RUNNER_WORKER_TYPE("task.runner.worker.type", "", PropertyType.STRING,
"Property used by the TaskWorker "
+ "processes that determines which type of tasks it will perform. Valid values are COMPACTION,"
+ "LOG_SORTING, SPLIT_POINT_CALCULATION",
"4.0.0");

private final String key;
private final String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public class ServiceLockData implements Comparable<ServiceLockData> {
*/
public static enum ThriftService {
CLIENT,
COORDINATOR,
COMPACTOR,
FATE,
GC,
MANAGER,
NONE,
TABLET_INGEST,
TABLET_MANAGEMENT,
TABLET_SCAN,
TASK_RUNNER,
TASK_MANAGER,
TSERV
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.tasks.thrift.TaskManager.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManagerThriftClient extends ThriftClientTypes<Client>
implements ManagerClient<Client> {

private static Logger LOG = LoggerFactory.getLogger(TaskManagerThriftClient.class);

public TaskManagerThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
}

@Override
public Client getConnection(ClientContext context) {
return getManagerConnection(LOG, this, context);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.compaction.thrift.CompactorService.Client;
import org.apache.accumulo.core.tasks.thrift.TaskRunner.Client;

public class CompactorServiceThriftClient extends ThriftClientTypes<Client> {
public class TaskRunnerThriftClient extends ThriftClientTypes<Client> {

CompactorServiceThriftClient(String serviceName) {
TaskRunnerThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ public abstract class ThriftClientTypes<C extends TServiceClient> {

public static final ClientServiceThriftClient CLIENT = new ClientServiceThriftClient("client");

public static final CompactorServiceThriftClient COMPACTOR =
new CompactorServiceThriftClient("compactor");

public static final CompactionCoordinatorServiceThriftClient COORDINATOR =
new CompactionCoordinatorServiceThriftClient("coordinator");

public static final FateThriftClient FATE = new FateThriftClient("fate");

public static final GCMonitorServiceThriftClient GC = new GCMonitorServiceThriftClient("gc");
Expand All @@ -58,6 +52,12 @@ public abstract class ThriftClientTypes<C extends TServiceClient> {
public static final TabletManagementClientServiceThriftClient TABLET_MGMT =
new TabletManagementClientServiceThriftClient("tablet");

public static final TaskRunnerThriftClient TASK_RUNNER =
new TaskRunnerThriftClient("task_runner");

public static final TaskManagerThriftClient TASK_MANAGER =
new TaskManagerThriftClient("task_mgr");

/**
* execute method with supplied client returning object of type R
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@
* ]}
* </pre>
*
* Note that the use of 'external' requires that the CompactionCoordinator and at least one
* Compactor for Queue1 is running.
* Note that the use of 'external' at least one Compactor for Queue1 is running.
* <li>{@code tserver.compaction.major.service.<service>.opts.maxOpen} This determines the maximum
* number of files that will be included in a single compaction.
* </ul>
Expand Down
84 changes: 84 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/tasks/TaskMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.tasks;

import org.apache.accumulo.core.tasks.thrift.Task;
import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter;
import org.apache.accumulo.core.util.json.GsonIgnoreExclusionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
* Generic object that can be used to transport TaskMessage subclasses over Thrift using the Thrift
* Task type. Implementations of this class are serialized to JSON then transported via Task and
* deserialized on the other side.
*/
public abstract class TaskMessage {

private static final Logger LOG = LoggerFactory.getLogger(TaskMessage.class);

@SuppressWarnings("unchecked")
public static <T extends TaskMessage> T fromThiftTask(Task task, TaskMessageType expectedType) {
TaskMessageType type = TaskMessageType.valueOf(task.getMessageType());
Preconditions.checkState(type == expectedType,
"Task is of type: " + type + ", expected: " + expectedType);
T decodedMsg = (T) TaskMessage.GSON_FOR_TASKS.fromJson(task.getMessage(), type.getTaskClass());
LOG.trace("Received {}", TaskMessage.GSON_FOR_TASKS.toJson(decodedMsg));
return decodedMsg;
}

private static final Gson GSON_FOR_TASKS =
new GsonBuilder().setExclusionStrategies(new GsonIgnoreExclusionStrategy())
.registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();

private String taskId;
private TaskMessageType type;

public TaskMessage() {}

public String getTaskId() {
return taskId;
}

public void setTaskId(String taskId) {
this.taskId = taskId;
}

void setMessageType(TaskMessageType type) {
this.type = type;
}

public TaskMessageType getMessageType() {
return type;
}

public Task toThriftTask() {
Task t = new Task();
t.setTaskId(getTaskId());
t.setMessageType(getMessageType().name());
t.setMessage(TaskMessage.GSON_FOR_TASKS.toJson(this));
LOG.trace("Sending {}", TaskMessage.GSON_FOR_TASKS.toJson(this));
return t;
}

}
Loading