-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add schema registry and migrator components #9710
Open
kselveliev
wants to merge
37
commits into
main
Choose a base branch
from
09260-add-schema-registry-component
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
ce8ef43
Implement MirrorNodeState
bilyana-gospodinova 804eae1
feat: Add schema registry component for reusable services
kselveliev e00abba
feat: Add service migrator similar to services
kselveliev dda0c80
Implement MirrorNodeState
bilyana-gospodinova 7b1d58d
Add dynamic service configuration
bilyana-gospodinova b67223f
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev dd72bd0
fix: Uncomment needed lines in schema registry after state changes
kselveliev c2702b7
fix: Refactor migrator and networkInfo for successful state initializ…
kselveliev 5dded46
fix: Addressing review comments
kselveliev cceb9e5
fix: Addressing review comments
kselveliev b38d57d
Implement MirrorNodeState
bilyana-gospodinova f1e940e
Add dynamic service configuration
bilyana-gospodinova ea49673
Fix code smell
bilyana-gospodinova 2b598bf
Increase code coverage
bilyana-gospodinova f0d9d34
sync with services more
bilyana-gospodinova cd96150
Update the state as in example
bilyana-gospodinova 2fede4f
Fix code smells and improve coverage
bilyana-gospodinova afcf416
Fix code smells
bilyana-gospodinova b8ae0f0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev afbf84a
Implement MirrorNodeState
bilyana-gospodinova 90e20a1
Add dynamic service configuration
bilyana-gospodinova 7917a71
Fix code smell
bilyana-gospodinova 8c730bb
Increase code coverage
bilyana-gospodinova d7b3da2
sync with services more
bilyana-gospodinova 8fe36b1
Update the state as in example
bilyana-gospodinova 532e291
Fix code smells and improve coverage
bilyana-gospodinova 5116cba
Fix code smells
bilyana-gospodinova 49cdcfb
Fix PR comments
bilyana-gospodinova bdefa11
Improve coverage
bilyana-gospodinova fa57b07
Increase coverage
bilyana-gospodinova 28a2912
Increase coverage (again)
bilyana-gospodinova b724cbb
Rename packages
bilyana-gospodinova 18e6dd0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev 9715335
Merge branch 'main' into 09260-add-schema-registry-component
kselveliev 7f32049
fix: Remove duplicated classes
kselveliev 62d40fa
fix: Fix code smells
kselveliev 7feac7d
fix: Fix code smells
kselveliev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
221 changes: 221 additions & 0 deletions
221
...mirror-web3/src/main/java/com/hedera/mirror/web3/state/components/SchemaRegistryImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
/* | ||
* Copyright (C) 2024 Hedera Hashgraph, LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.hedera.mirror.web3.state.components; | ||
|
||
import static com.hedera.node.app.state.merkle.SchemaApplicationType.MIGRATION; | ||
import static com.hedera.node.app.state.merkle.SchemaApplicationType.RESTART; | ||
import static com.hedera.node.app.state.merkle.SchemaApplicationType.STATE_DEFINITIONS; | ||
|
||
import com.hedera.hapi.node.base.SemanticVersion; | ||
import com.hedera.mirror.web3.state.MirrorNodeState; | ||
import com.hedera.mirror.web3.state.core.MapWritableStates; | ||
import com.hedera.node.app.spi.state.FilteredReadableStates; | ||
import com.hedera.node.app.spi.state.FilteredWritableStates; | ||
import com.hedera.node.app.state.merkle.SchemaApplications; | ||
import com.swirlds.config.api.Configuration; | ||
import com.swirlds.config.api.ConfigurationBuilder; | ||
import com.swirlds.state.spi.MigrationContext; | ||
import com.swirlds.state.spi.ReadableStates; | ||
import com.swirlds.state.spi.Schema; | ||
import com.swirlds.state.spi.SchemaRegistry; | ||
import com.swirlds.state.spi.WritableStates; | ||
import com.swirlds.state.spi.info.NetworkInfo; | ||
import jakarta.annotation.Nonnull; | ||
import jakarta.annotation.Nullable; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentLinkedDeque; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import lombok.Getter; | ||
import lombok.RequiredArgsConstructor; | ||
|
||
@RequiredArgsConstructor | ||
public class SchemaRegistryImpl implements SchemaRegistry { | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
public static final SemanticVersion CURRENT_VERSION = new SemanticVersion(0, 47, 0, "SNAPSHOT", ""); | ||
|
||
private final SchemaApplications schemaApplications; | ||
|
||
/** | ||
* The ordered set of all schemas registered by the service | ||
*/ | ||
@Getter | ||
private final SortedSet<Schema> schemas = new TreeSet<>(); | ||
|
||
@Override | ||
public SchemaRegistry register(@Nonnull Schema schema) { | ||
schemas.remove(schema); | ||
schemas.add(schema); | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return this; | ||
} | ||
|
||
@SuppressWarnings("rawtypes") | ||
public void migrate( | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@Nonnull final String serviceName, | ||
@Nonnull final MirrorNodeState state, | ||
@Nonnull final NetworkInfo networkInfo) { | ||
migrate( | ||
serviceName, | ||
state, | ||
CURRENT_VERSION, | ||
networkInfo, | ||
ConfigurationBuilder.create().build(), | ||
new HashMap<>(), | ||
new AtomicLong()); | ||
} | ||
|
||
public void migrate( | ||
@Nonnull final String serviceName, | ||
@Nonnull final MirrorNodeState state, | ||
@Nullable final SemanticVersion previousVersion, | ||
@Nonnull final NetworkInfo networkInfo, | ||
@Nonnull final Configuration config, | ||
@Nonnull final Map<String, Object> sharedValues, | ||
@Nonnull final AtomicLong nextEntityNum) { | ||
if (schemas.isEmpty()) { | ||
return; | ||
} | ||
|
||
// For each schema, create the underlying raw data sources (maps, or lists) and the writable states that | ||
// will wrap them. Then call the schema's migrate method to populate those states, and commit each of them | ||
// to the underlying data sources. At that point, we have properly migrated the state. | ||
final var latestVersion = schemas.getLast().getVersion(); | ||
|
||
for (final var schema : schemas) { | ||
final var applications = | ||
schemaApplications.computeApplications(previousVersion, latestVersion, schema, config); | ||
final var readableStates = state.getReadableStates(serviceName); | ||
final var previousStates = new FilteredReadableStates(readableStates, readableStates.stateKeys()); | ||
final WritableStates writableStates; | ||
final WritableStates newStates; | ||
if (applications.contains(STATE_DEFINITIONS)) { | ||
final var redefinedWritableStates = applyStateDefinitions(serviceName, schema, config, state); | ||
writableStates = redefinedWritableStates.beforeStates(); | ||
newStates = redefinedWritableStates.afterStates(); | ||
} else { | ||
newStates = writableStates = state.getWritableStates(serviceName); | ||
} | ||
final var context = newMigrationContext( | ||
previousVersion, previousStates, newStates, config, networkInfo, nextEntityNum, sharedValues); | ||
if (applications.contains(MIGRATION)) { | ||
schema.migrate(context); | ||
} | ||
if (applications.contains(RESTART)) { | ||
schema.restart(context); | ||
} | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (writableStates instanceof MapWritableStates mws) { | ||
mws.commit(); | ||
} | ||
|
||
// And finally we can remove any states we need to remove | ||
schema.statesToRemove().forEach(stateKey -> state.removeServiceState(serviceName, stateKey)); | ||
} | ||
} | ||
|
||
public MigrationContext newMigrationContext( | ||
@Nullable final SemanticVersion previousVersion, | ||
@Nonnull final ReadableStates previousStates, | ||
@Nonnull final WritableStates writableStates, | ||
@Nonnull final Configuration config, | ||
@Nonnull final NetworkInfo networkInfo, | ||
@Nonnull final AtomicLong nextEntityNum, | ||
@Nonnull final Map<String, Object> sharedValues) { | ||
return new MigrationContext() { | ||
@Override | ||
public void copyAndReleaseOnDiskState(String stateKey) { | ||
// No-op | ||
} | ||
Check warning on line 146 in hedera-mirror-web3/src/main/java/com/hedera/mirror/web3/state/components/SchemaRegistryImpl.java Codecov / codecov/patchhedera-mirror-web3/src/main/java/com/hedera/mirror/web3/state/components/SchemaRegistryImpl.java#L146
|
||
|
||
@Override | ||
public SemanticVersion previousVersion() { | ||
return previousVersion; | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public ReadableStates previousStates() { | ||
return previousStates; | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public WritableStates newStates() { | ||
return writableStates; | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public Configuration configuration() { | ||
return config; | ||
} | ||
|
||
@Override | ||
public NetworkInfo networkInfo() { | ||
return networkInfo; | ||
} | ||
|
||
@Override | ||
public long newEntityNum() { | ||
return nextEntityNum.getAndIncrement(); | ||
} | ||
|
||
@Override | ||
public Map<String, Object> sharedValues() { | ||
return sharedValues; | ||
} | ||
}; | ||
} | ||
|
||
private RedefinedWritableStates applyStateDefinitions( | ||
@Nonnull final String serviceName, | ||
@Nonnull final Schema schema, | ||
@Nonnull final Configuration configuration, | ||
@Nonnull final MirrorNodeState state) { | ||
final Map<String, Object> stateDataSources = new HashMap<>(); | ||
schema.statesToCreate(configuration).forEach(def -> { | ||
if (def.singleton()) { | ||
stateDataSources.put(def.stateKey(), new AtomicReference<>()); | ||
} else if (def.queue()) { | ||
stateDataSources.put(def.stateKey(), new ConcurrentLinkedDeque<>()); | ||
} else { | ||
stateDataSources.put(def.stateKey(), new ConcurrentHashMap<>()); | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
}); | ||
|
||
state.addService(serviceName, stateDataSources); | ||
|
||
final var statesToRemove = schema.statesToRemove(); | ||
final var writableStates = state.getWritableStates(serviceName); | ||
final var remainingStates = new HashSet<>(writableStates.stateKeys()); | ||
remainingStates.removeAll(statesToRemove); | ||
final var newStates = new FilteredWritableStates(writableStates, remainingStates); | ||
return new RedefinedWritableStates(writableStates, newStates); | ||
} | ||
|
||
/** | ||
* Encapsulates the writable states before and after applying a schema's state definitions. | ||
* | ||
* @param beforeStates the writable states before applying the schema's state definitions | ||
* @param afterStates the writable states after applying the schema's state definitions | ||
*/ | ||
private record RedefinedWritableStates(WritableStates beforeStates, WritableStates afterStates) {} | ||
} |
99 changes: 99 additions & 0 deletions
99
...irror-web3/src/main/java/com/hedera/mirror/web3/state/components/ServiceMigratorImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright (C) 2024 Hedera Hashgraph, LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.hedera.mirror.web3.state.components; | ||
|
||
import static java.util.Objects.requireNonNull; | ||
|
||
import com.hedera.hapi.block.stream.output.StateChanges.Builder; | ||
import com.hedera.hapi.node.base.SemanticVersion; | ||
import com.hedera.mirror.web3.state.MirrorNodeState; | ||
import com.hedera.node.app.services.ServiceMigrator; | ||
import com.hedera.node.app.services.ServicesRegistry; | ||
import com.hedera.node.config.data.HederaConfig; | ||
import com.swirlds.config.api.Configuration; | ||
import com.swirlds.metrics.api.Metrics; | ||
import com.swirlds.platform.system.SoftwareVersion; | ||
import com.swirlds.state.State; | ||
import com.swirlds.state.spi.info.NetworkInfo; | ||
import jakarta.annotation.Nonnull; | ||
import jakarta.annotation.Nullable; | ||
import jakarta.inject.Named; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
@Named | ||
public class ServiceMigratorImpl implements ServiceMigrator { | ||
steven-sheehy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Override | ||
public List<Builder> doMigrations( | ||
@Nonnull State state, | ||
@Nonnull ServicesRegistry servicesRegistry, | ||
@Nullable SoftwareVersion previousVersion, | ||
@Nonnull SoftwareVersion currentVersion, | ||
@Nonnull Configuration config, | ||
@Nonnull NetworkInfo networkInfo, | ||
@Nonnull Metrics metrics) { | ||
requireNonNull(state); | ||
requireNonNull(servicesRegistry); | ||
requireNonNull(currentVersion); | ||
requireNonNull(config); | ||
requireNonNull(networkInfo); | ||
requireNonNull(metrics); | ||
|
||
if (!(state instanceof MirrorNodeState mirrorNodeState)) { | ||
throw new IllegalArgumentException("Can only be used with MirrorNodeState instances"); | ||
} | ||
|
||
if (!(servicesRegistry instanceof ServicesRegistryImpl registry)) { | ||
throw new IllegalArgumentException("Can only be used with ServicesRegistryImpl instances"); | ||
} | ||
|
||
final AtomicLong prevEntityNum = | ||
new AtomicLong(config.getConfigData(HederaConfig.class).firstUserEntity() - 1); | ||
final Map<String, Object> sharedValues = new HashMap<>(); | ||
final var deserializedPbjVersion = Optional.ofNullable(previousVersion) | ||
.map(SoftwareVersion::getPbjSemanticVersion) | ||
.orElse(null); | ||
|
||
registry.registrations().stream().forEach(registration -> { | ||
if (!(registration.registry() instanceof SchemaRegistryImpl schemaRegistry)) { | ||
throw new IllegalArgumentException("Can only be used with SchemaRegistryImpl instances"); | ||
} | ||
schemaRegistry.migrate( | ||
registration.serviceName(), | ||
mirrorNodeState, | ||
deserializedPbjVersion, | ||
networkInfo, | ||
config, | ||
sharedValues, | ||
prevEntityNum); | ||
}); | ||
return List.of(); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public SemanticVersion creationVersionOf(@Nonnull State state) { | ||
if (!(state instanceof MirrorNodeState)) { | ||
throw new IllegalArgumentException("Can only be used with MirrorNodeState instances"); | ||
} | ||
return null; | ||
} | ||
} |
57 changes: 57 additions & 0 deletions
57
...rror-web3/src/main/java/com/hedera/mirror/web3/state/components/ServicesRegistryImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright (C) 2024 Hedera Hashgraph, LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.hedera.mirror.web3.state.components; | ||
|
||
import com.hedera.node.app.services.ServicesRegistry; | ||
import com.hedera.node.app.state.merkle.SchemaApplications; | ||
import com.swirlds.state.spi.Service; | ||
import jakarta.annotation.Nonnull; | ||
import jakarta.inject.Named; | ||
import java.util.Collections; | ||
import java.util.Set; | ||
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
|
||
@Named | ||
public class ServicesRegistryImpl implements ServicesRegistry { | ||
|
||
private final SortedSet<Registration> entries = new TreeSet<>(); | ||
|
||
@Nonnull | ||
@Override | ||
public Set<Registration> registrations() { | ||
return Collections.unmodifiableSortedSet(entries); | ||
} | ||
|
||
@Override | ||
public void register(@Nonnull Service service) { | ||
final var registry = new SchemaRegistryImpl(new SchemaApplications()); | ||
service.registerSchemas(registry); | ||
entries.add(new ServicesRegistryImpl.Registration(service, registry)); | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public ServicesRegistry subRegistryFor(@Nonnull String... serviceNames) { | ||
final var selections = Set.of(serviceNames); | ||
final var subRegistry = new ServicesRegistryImpl(); | ||
subRegistry.entries.addAll(entries.stream() | ||
.filter(registration -> selections.contains(registration.serviceName())) | ||
.toList()); | ||
return subRegistry; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method will be used when we build the state. That is why this change is in the PR