
Conversation

@dlmarion dlmarion commented Mar 27, 2023

I have modified the Manager code in the following ways.

  1. Modified each Manager to put its address in ZooKeeper (at ZMANAGERS/<>), just like the other services do
  2. Modified the Manager such that the instance that grabs the lock at ZMANAGER_LOCK is the "primary" manager (see MultipleManagerLockIT, which passes, and the sketch after this list)
  3. The "primary" manager will be responsible for FaTE transactions, compaction coordination, and root and metadata table assignment/balancing
  4. Modified the TabletGroupWatcher, in the user table case, to try to distribute management of user table tablets evenly, but at table boundaries (see TabletGroupWatcher, MultipleManagerUtil).
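
A minimal sketch of the locking idea in item 2, using a raw ZooKeeper ephemeral sequential node; the class, method, and lock path handling are illustrative only, not the ServiceLock code the PR actually uses:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class PrimaryManagerLockSketch {

  // Returns true if this manager currently holds the lock and should act as
  // the "primary" manager; the other managers keep their ephemeral nodes and
  // can take over if the primary's session (and node) goes away.
  static boolean tryBecomePrimary(ZooKeeper zk, String lockPath, String managerAddress)
      throws KeeperException, InterruptedException {
    // Each manager creates an ephemeral sequential child holding its address.
    String myNode = zk.create(lockPath + "/lock-",
        managerAddress.getBytes(StandardCharsets.UTF_8),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    // The child that sorts first (lowest sequence number) holds the lock.
    List<String> children = zk.getChildren(lockPath, false);
    Collections.sort(children);
    return myNode.endsWith(children.get(0));
  }
}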

@dlmarion dlmarion linked an issue Apr 14, 2023 that may be closed by this pull request
@dlmarion dlmarion self-assigned this Jul 20, 2023
@dlmarion dlmarion marked this pull request as ready for review July 20, 2023 21:00
@dlmarion dlmarion mentioned this pull request Nov 22, 2023
Comment on lines +76 to +78
if (!sa[0].startsWith("/")) {
  path += "/";
}
dlmarion (PR author):

I had to add this because for some reason sa[0] starts with a / for the MANAGERS locks.

try {
  List<String> locations = getContext().getManagerLocations();
  if (locations.isEmpty()) {
    String location = getContext().getPrimaryManagerLocation();
dlmarion (PR author):

Should this be any Manager?

@dlmarion:

Need to wire up the multiple managers in accumulo-cluster. I may have lost that in the rebase.

@dlmarion:

> Need to wire up the multiple managers in accumulo-cluster. I may have lost that in the rebase.

Looked into this; accumulo-cluster already supports multiple managers via the active/backup pair capability that already exists.

return -1 * result;
}
});
tables.forEach(tid -> {
Contributor:

When the set of managers and tables are steady for a bit, all manager processes need to arrive at the same decisions for partitioning tables into buckets. With the algorithm in this method, different manager processes may see different counts for the same tables at different times and end up partitioning tables into different buckets. This could lead to overlap in the partitions or, in the worst case, a table that no manager processes. We could start with a deterministic hash partitioning of tables and open a follow-on issue to improve. One possible way to improve would be to have a single manager process run this algorithm and publish the partitioning information, with all other managers just using it.
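
A minimal sketch of the deterministic hash partitioning starting point, assuming every manager reads the same sorted list of manager ids (for example from ZooKeeper); the class and method names are illustrative, not code from this PR:

import java.util.List;

public class TablePartitionSketch {

  // Every manager computes the same bucket for a given table because the
  // inputs (table id and sorted manager list) are identical everywhere,
  // so no extra coordination is needed. String stands in for the real
  // table id type.
  static int managerIndexForTable(String tableId, List<String> sortedManagerIds) {
    return Math.floorMod(tableId.hashCode(), sortedManagerIds.size());
  }

  // A manager keeps a table only if the table hashes to its own position.
  static boolean isMine(String tableId, List<String> sortedManagerIds, String myManagerId) {
    return sortedManagerIds.get(managerIndexForTable(tableId, sortedManagerIds))
        .equals(myManagerId);
  }
}

One tradeoff of pure modulo hashing is that many tables map to a different manager whenever the manager list changes; publishing a single computed partitioning, as discussed in the next comment, would avoid that churn.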

Contributor:

Fluo partitions work among workers using an approach of having one process determine the partitions and put them in ZK. Then all other workers use those partitions from ZK to know what to work on. This approach allows all of the workers to eventually settle on the same partitions, which is what is needed here. Posting the Fluo code to show that it's not a lot of code and encapsulates nicely. We could have a TabletManagementPartitioner that is created and tested as a stand-alone task in its own PR that does this.
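
Not the Fluo code itself, but a rough sketch of that publish-once/read-everywhere shape using the plain ZooKeeper client; the znode path and the string-serialized partition map are assumptions for illustration:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class PartitionPublisherSketch {

  private static final String PARTITIONS_PATH = "/managers/partitions"; // hypothetical path

  // Called by the single process that computed the partitioning.
  static void publish(ZooKeeper zk, String serializedPartitions)
      throws KeeperException, InterruptedException {
    byte[] data = serializedPartitions.getBytes(StandardCharsets.UTF_8);
    if (zk.exists(PARTITIONS_PATH, false) == null) {
      zk.create(PARTITIONS_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } else {
      zk.setData(PARTITIONS_PATH, data, -1); // -1 matches any version
    }
  }

  // Called by every other manager; a watcher could be registered here to
  // react when the publisher writes a new partitioning.
  static String read(ZooKeeper zk) throws KeeperException, InterruptedException {
    byte[] data = zk.getData(PARTITIONS_PATH, false, null);
    return new String(data, StandardCharsets.UTF_8);
  }
}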

// Override the set of table ranges that this manager will manage
Optional<List<Range>> ranges = Optional.empty();
if (store.getLevel() == Ample.DataLevel.USER) {
  ranges = calculateRangesForMultipleManagers(
Contributor:

Not sure about this, was getting a bit lost. I think this may return Optional.empty() with the expectation that the last computation will be used when empty is returned, but I am not sure that is actually being done.


keith-turner commented Nov 28, 2023

Was anything done re upgrade in this PR? If not I think we need to open a follow-on issue for that.

This would be a follow-on issue, thinking we could distribute the compaction coordinator by having it hash partition queue names among manager processes. TGW could make an RPC to add a job to a remote queue. Compaction coordinators could hash the name to find the manager process to ask for work.

In this PR it seems like the TGW is adding compaction jobs to a local queue in the process. What do compactors do to find jobs?

We may need to make the EventCoordinator use the same partitioning as the TGW and send events to other manager processes via a new async RPC. Need to analyze the EventCoordinator; it may make sense to pull it into the TGW conceptually. Every manager uses its local TGW instance to signal events and internally the TGW code knows how to route that in the cluster to other TGW instances.

@dlmarion:

This PR isn't close to being merge-able. I wanted to get feedback on the approach before spending a larger amount of time making it ready. W/r/t the approach, I'm talking about the locking, the concept of a primary manager, how work is distributed across the managers, how the Monitor is supposed to work, etc.

@dlmarion:

> When the set of managers and tables are steady for a bit, all manager processes need to arrive at the same decisions for partitioning tables into buckets. With the algorithm in this method, different manager processes may see different counts for the same tables at different times and end up partitioning tables into different buckets. This could lead to overlap in the partitions or, in the worst case, a table that no manager processes. We could start with a deterministic hash partitioning of tables and open a follow-on issue to improve. One possible way to improve would be to have a single manager process run this algorithm and publish the partitioning information, with all other managers just using it.

> This would be a follow-on issue, thinking we could distribute the compaction coordinator by having it hash partition queue names among manager processes. TGW could make an RPC to add a job to a remote queue. Compaction coordinators could hash the name to find the manager process to ask for work.

> We may need to make the EventCoordinator use the same partitioning as the TGW and send events to other manager processes via a new async RPC. Need to analyze the EventCoordinator; it may make sense to pull it into the TGW conceptually. Every manager uses its local TGW instance to signal events and internally the TGW code knows how to route that in the cluster to other TGW instances.

I'm now concerned that this is going to be overly complex - lots of moving parts with the potential for multiple managers to claim ownership of the same object, or using some external process (ZK) to coordinate which Manager is responsible for a specific object. The Multiple Manager implementation in this PR is based off this design, which has multiple managers try to manage everything.

I think there may be a simpler way, as we have already introduced a natural partitioning mechanism - resource groups. I went back and looked in the wiki and you (@keith-turner) had a very similar idea at the bottom of this page. So, instead of having a single set of Managers try to manage everything, you would have a single Manager manage tablets, compactions, and Fate for all of the tables that map to a specific resource group. We could continue to have the active/backup Manager feature that we have today, but per resource group. This also solves the Monitor problem. If we look at this using the cluster.yaml file, it would go from what we have today:

manager:
  - localhost

monitor:
  - localhost

gc:
  - localhost

tserver:
  default:
    - localhost
  group1:
    - localhost

compactor:
  accumulo_meta:
    - localhost
  user_small:
    - localhost
  user_large:
    - localhost

sserver:
  default:
    - localhost
  group1:
    - localhost    

to something like:

default:
  manager:
    - localhost
  monitor:
    - localhost
  gc:
    - localhost
  tserver:
    - localhost
  compactor:
    accumulo_meta:
      - localhost
    user_small:
      - localhost
    user_large:
      - localhost
  sserver:
    default:
      - localhost
      
group1:
  manager:
    - localhost
  monitor:
    - localhost
  gc:
    - localhost
  tserver:
    - localhost
  compactor:
    accumulo_meta:
      - localhost
    user_small:
      - localhost
    user_large:
      - localhost
  sserver:
    default:
      - localhost

@keith-turner:

> So, instead of having a single set of Managers try to manage everything, you would have a single Manager manage tablets, compactions, and Fate for all of the tables that map to a specific resource group.

That's an interesting concept that I think would be good to explore further. I have a lot of questions about specifics. Like how will fate data be partitioned in storage? How will users assign tables to manager resource groups? What manager would RPC operations that are not table related use? Thinking confluence documents would be a better place to explore this rather than here in the issue.

This proposed design precludes scale-out for metadata processing for a single table. The experimentation I was doing in #3964 allows FATE to scale across many processes. I was thinking that if a single table creates 10K fate operations all of a sudden, then if FATE is running on many manager processes they could all start working on them. I would like to explore scaling out the manager's different functional components more; I can work on exploring that further and post what I find in confluence documents. Would like to determine what all of the hurdles are to scaling out and what the possible solutions are before deciding not to pursue it.

@dlmarion:

> How will users assign tables to manager resource groups?

Regarding this, the current mechanism is described in the Resource Groups section of https://cwiki.apache.org/confluence/display/ACCUMULO/A+More+Cost+Efficient+Accumulo. I was thinking that it would make sense to promote the table.custom.assignment.group property to a fixed property. Users would set this property on a table and would need to define a resource group with the same name so that the Manager could manage its tablets. Clients that connect to the Manager to perform table operations or initiate user Fate transactions would need to use the table property to determine which Manager to connect to (using the Resource Group name). The resource group name is already part of the ServiceLock object, so, for example, we would modify ClientContext.getManagerLocations to iterate over the Managers in ZooKeeper and return the one with the correct resource group. Another option, if resource groups are a first-class citizen, is to put the resource group name in the ZK path, although that could lead to needing cleanup if a resource group was created and then removed. I haven't thought of everything, but this approach does solve some problems.
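
A hypothetical sketch of that lookup; ManagerEntry and managerFor are illustrative stand-ins, not existing Accumulo client APIs:

import java.util.List;
import java.util.Optional;

public class ManagerResolverSketch {

  // A manager address paired with the resource group name taken from its
  // lock data in ZooKeeper.
  record ManagerEntry(String location, String resourceGroup) {}

  // Given all manager entries read from ZooKeeper and the table's assigned
  // group, return the address of the manager that should be contacted.
  static Optional<String> managerFor(List<ManagerEntry> managers, String tableGroup) {
    return managers.stream()
        .filter(m -> m.resourceGroup().equals(tableGroup))
        .map(ManagerEntry::location)
        .findFirst();
  }
}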

> What manager would RPC operations that are not table related use?

I think for instance-level operations and maybe even namespace level operations, any Manager could be used.

> Thinking confluence documents would be a better place to explore this rather than here in the issue.

I can start a new Confluence page

@dlmarion:

Feedback requested on https://cwiki.apache.org/confluence/display/ACCUMULO/Using+Resource+Groups+as+an+implementation+of+Multiple+Managers. Are there other options to consider? What other information should I add?

@keith-turner:

> Feedback requested on https://cwiki.apache.org/confluence/display/ACCUMULO/Using+Resource+Groups+as+an+implementation+of+Multiple+Managers. Are there other options to consider? What other information should I add?

I updated the above document with another possible solution. Thinking that this PR and #3964 are already heading in the direction of that solution. I still have a lot of uncertainty and I was thinking about how to reduce that. Thought of the following.

  1. We can start running scale tests now that abuse the current code. By doing this we may learn new things that help us make more informed decisions. I opened Create a split scaling test accumulo-testing#266 for this and created other items for scale tests as TODOs on the elasticity board.
  2. We can reorganize the manager code to make the functional services in the manager more explicit. I opened Reorganize manager code #4005 for this. I am going to take a shot at reorganizing just one thing in the manager as described in that issue to see what it looks like.
  3. Would be good to chat sometime as mentioned in slack

Warning: this is not a fully formed thought. #3964 took a bottom-up approach to scaling the manager and this PR is taking a top-down approach. Was wondering about taking some of the things in this PR and creating something more focused on just distributing tablet management, like #3964 is just for distributing FATE. However, tablet management is not as cleanly self-contained in the code as FATE is, so it's harder to do that. That is one reason I opened #4005. It would be nice to have an IT test that creates multiple tablet management objects each with different partitions and verifies that. #3694 has tests like this for FATE.

@ctubbsii ctubbsii added this to the 4.0.0 milestone Jul 12, 2024
@dlmarion dlmarion changed the base branch from elasticity to main August 26, 2024 12:18

Successfully merging this pull request may close these issues:
Multiple Managers: Create the ability to have multiple Managers
