Skip to content

Conversation

dlmarion
Copy link
Contributor

@dlmarion dlmarion commented Oct 2, 2023

No description provided.

@dlmarion dlmarion self-assigned this Oct 2, 2023
@dlmarion
Copy link
Contributor Author

dlmarion commented Oct 2, 2023

There is still more work to be done here, but wanted to get some feedback on:

  1. The Thrift messaging / RPC changes
  2. The changes to Compactor (which needs to be renamed to TaskWorker) around the TASK_RUNNER_WORKER_TYPE property.

The basic gist of the design here is:

  1. A TaskWorker process (currently known as Compactor) is started with the property TASK_RUNNER_WORKER_TYPE, which has values COMPACTION, LOG_SORTING, SPLIT_POINT_CALCULATION. That instance of the TaskWorker only perform jobs of the configured type.
  2. The TaskWorker gets the next job from the TaskManager (currently known as the CompactionCoordinator) by making a call via the Tasks Thrift API to the Manager.
  3. The TaskManager returns the next highest priority task of the given type (COMPACTION, LOG_SORTING, SPLIT_CALCULATION).
  4. The TaskWorker executes the Task and reports status back the TaskManager.

If I had to guess, I'm probably about 80-85% done at this point.

public void start() {
long interval = this.context.getConfiguration()
.getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
.getTimeInMillis(Property.TASK_MANAGER_DEAD_COMPACTOR_CHECK_INTERVAL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing to do in this PR, but I think in general we may need reconsider some of the property prefixes. Opened #3803 for this. For this property maybe it should have a another prefix that is not related to where its running, but is more related to the compaction functionality. Not sure though, would be best to consider all properties are once for this instead of looking at this one in isolation.

/*
* Called by the Monitor to get progress information
*/
Task getRunningTasks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is soley for discussion, not recommending any changes. Thinking it would be best to push as much serialization into thrift as possible for things that are common. Took a stab at this below, but not sure if its workable.

// renamed from WorkerType
enum TaskType {
  COMPACTION
  LOG_SORTING
  SPLIT_POINT_CALCULATION
}

enum TaskState {
   RUNNING,
   COMPLETE,
   FAILED
}

// used to report the status of a task
struct TaskStatus {
    string taskId
    long startTime  // time a worker started running the task
    TaskType taskType
    TaskState taskState
    byte[] data // status data specific to the taskType and taskState
}

list<TaskStatus> getRunningTasks(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no issue renaming WorkerType to something like TaskRunnerType. I'm open to discussing any changes.

@dlmarion dlmarion marked this pull request as ready for review October 3, 2023 17:37
@keith-turner
Copy link
Contributor

@dlmarion if you have some time to chat sometime I would like to discuss some questions I have. Wondering about the following.

Where things run? Maybe instead of having a task runner executable, task runner is more of an internal code library that user facing executable components instantiate. For example thinking through what the user facing accumulo commands could look like and what they could do.

accumulo compactor  -- runs compactions, instantiates a task runner in its impl to make this happen
accumulo tserver    -- hosts tablets and does log sorts, it runs a task runner inside of a tablet server process to do log sorts
accumulo manager    -- runs fate, assigns tablets, does split calculations... non-primary manager processes can run a task runner process to do split calculations

What task return? Maybe nothing, could we structure all task such that there is no return of data to manager? A task runner gets a task from the manager, runs it, and when done gets another task. It never reports completion to the manager or status.

  • compaction task run the compaction and commit it to the metadata table as part of the task
  • compaction task do not report stats back to the manager, but only to the metrics system. Could the monitor get the data it needs? Maybe the monitor contacts task runners directly if it wants info?
  • log sort task sort the logs and create the appropriate dirs in hdfs when done
  • split task could update the metadata table with the needed information instead of reporting back

If task do not return anything, then that simplifies the thrift API and the manager possibly. Would not need to worry about keeping info in memory in the manager and keeping that info consistent and avoiding using to much memory.

@dlmarion
Copy link
Contributor Author

dlmarion commented Oct 6, 2023

@dlmarion if you have some time to chat sometime I would like to discuss some questions I have. Wondering about the following.

Yeah, let's find a time.

Where things run? Maybe instead of having a task runner executable, task runner is more of an internal code library that user facing executable components instantiate. For example thinking through what the user facing accumulo commands could look like and what they could do.

accumulo compactor  -- runs compactions, instantiates a task runner in its impl to make this happen
accumulo tserver    -- hosts tablets and does log sorts, it runs a task runner inside of a tablet server process to do log sorts
accumulo manager    -- runs fate, assigns tablets, does split calculations... non-primary manager processes can run a task runner process to do split calculations

With on-demand tablets it's possible that a user only has enough TabletServers running to host the root and metadata tables - that would be my only concern with doing log sorting in the TabletServer. I do like the idea of non-primary managers doing some of this work (which BTW I had working a while back in #3262 ). If we could leverage the non-primary managers, then this PR could likely be closed leaving the compactors the way that they are today. I think we should explore that idea sooner rather than later. There was another item mentioned in #3796 - compaction selection functionality. If we performed compaction selection and split calculations in the non-primary manager, left compactors the way they are today, then we just need a server component to run log sorting. IIRC from #3262 you can have multiple non-primary managers.

What task return? Maybe nothing, could we structure all task such that there is no return of data to manager? A task runner gets a task from the manager, runs it, and when done gets another task. It never reports completion to the manager or status.

  • compaction task run the compaction and commit it to the metadata table as part of the task
  • compaction task do not report stats back to the manager, but only to the metrics system. Could the monitor get the data it needs? Maybe the monitor contacts task runners directly if it wants info?
  • log sort task sort the logs and create the appropriate dirs in hdfs when done
  • split task could update the metadata table with the needed information instead of reporting back

If task do not return anything, then that simplifies the thrift API and the manager possibly. Would not need to worry about keeping info in memory in the manager and keeping that info consistent and avoiding using to much memory.

@keith-turner
Copy link
Contributor

keith-turner commented Oct 6, 2023

If we could leverage the non-primary managers, then this PR could likely be closed leaving the compactors the way that they are today.

I think this PR is still, useful but maybe scoped down. Thinking of this at a really high level and seeing the following. What I am trying to puzzle out is where actual code links in to these high level concepts. Also, is the following the complete high level picture?

Generalizing distributed work in Accumulo

  1. Work must be found, currently TGW+TabletMgtmIterator for log sort and compaction
  2. Work must partitioned and prioritized (multiple in memory bounded prio queues that can replace stuff)
  3. Workers need to find/request work (thrift task RPCs in this PR)
  4. Work needs to be done (may need to emit status and/or metrics about work).
  5. Results of work needs to be committed (this is highly dependent on the work)

@dlmarion
Copy link
Contributor Author

dlmarion commented Oct 6, 2023

If we could leverage the non-primary managers, then this PR could likely be closed leaving the compactors the way that they are today.

I think this PR is still, useful but maybe scoped down. Thinking of this at a really high level and seeing the following. What I am trying to puzzle out is where actual code links in to these high level concepts. Also, is the following the complete high level picture?

Generalizing distributed work in Accumulo

  1. Work must be found, currently TGW+TabletMgtmIterator for log sort and compaction
  2. Work must partitioned and prioritized (multiple in memory bounded prio queues that can replace stuff)
  3. Workers need to find/request work (thrift task RPCs in this PR)
  4. Work needs to be done (may need to emit status and/or metrics about work).
  5. Results of work needs to be committed (this is highly dependent on the work)

There is also the issue of checking for dead workers so that the task can be queued up again. We have this today in the Manager with the DeadCompactionDetector.

@ctubbsii ctubbsii added this to the 4.0.0 milestone Jul 12, 2024
@dlmarion dlmarion changed the base branch from elasticity to main August 26, 2024 12:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

Convert Compactor/Coordinator into generic distributed job execution service

3 participants