From 8f93c3173f1a9d05b05d6817e51eb1f015a91505 Mon Sep 17 00:00:00 2001 From: Andrea Boriero Date: Thu, 23 Jan 2025 16:34:47 +0100 Subject: [PATCH] [#1905] Reactive find with lock in Quarkus with reactive hibernate --- .../ReactivePersistenceContextAdapter.java | 93 +++++++++++++++++++ .../engine/impl/ReactiveCallbackImpl.java | 72 ++++++++++++++ .../DefaultReactivePostLoadEventListener.java | 1 + .../ReactiveNaturalIdLoaderDelegate.java | 4 +- .../internal/ReactiveSingleIdLoadPlan.java | 24 +++-- ...veSingleUniqueKeyEntityLoaderStandard.java | 4 +- .../spi/ReactiveAbstractSelectionQuery.java | 11 +++ .../sql/internal/ReactiveNativeQueryImpl.java | 6 ++ .../sqm/internal/ReactiveQuerySqmImpl.java | 6 ++ .../ReactiveSqmSelectionQueryImpl.java | 6 ++ .../reactive/session/ReactiveSession.java | 2 + .../session/impl/ReactiveSessionImpl.java | 6 ++ .../ReactiveDeferredResultSetAccess.java | 52 ++++++++++- .../spi/ReactiveListResultsConsumer.java | 32 +++++-- .../spi/ReactiveSingleResultConsumer.java | 11 ++- 15 files changed, 305 insertions(+), 25 deletions(-) create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveCallbackImpl.java diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/engine/internal/ReactivePersistenceContextAdapter.java b/hibernate-reactive-core/src/main/java/org/hibernate/engine/internal/ReactivePersistenceContextAdapter.java index bc5a1f09f..dc6f0b528 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/engine/internal/ReactivePersistenceContextAdapter.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/engine/internal/ReactivePersistenceContextAdapter.java @@ -12,19 +12,27 @@ import org.hibernate.HibernateException; import org.hibernate.collection.spi.PersistentCollection; +import org.hibernate.engine.spi.EntityHolder; import org.hibernate.engine.spi.EntityKey; import org.hibernate.engine.spi.PersistenceContext; import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.event.service.spi.EventListenerGroup; +import org.hibernate.event.spi.PostLoadEvent; +import org.hibernate.event.spi.PostLoadEventListener; import org.hibernate.persister.entity.EntityPersister; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; import org.hibernate.reactive.session.ReactiveSession; +import org.hibernate.sql.exec.spi.Callback; +import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingState; import static java.lang.invoke.MethodHandles.lookup; import static org.hibernate.pretty.MessageHelper.infoString; import static org.hibernate.reactive.logging.impl.LoggerFactory.make; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.loop; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; /** @@ -130,4 +138,89 @@ public Object removeEntity(EntityKey key) { } return result; } + + @Override + public void postLoad(JdbcValuesSourceProcessingState processingState, Consumer holderConsumer) { + throw LOG.nonReactiveMethodCall( "reactivePostLoad(JdbcValuesSourceProcessingState, Consumer) )" ); + } + + /** + * Reactive version of {@link StatefulPersistenceContext#postLoad(JdbcValuesSourceProcessingState, Consumer)} + * + */ + public CompletionStage reactivePostLoad(JdbcValuesSourceProcessingState processingState, Consumer holderConsumer) { + final ReactiveCallbackImpl callback = (ReactiveCallbackImpl) processingState.getExecutionContext().getCallback(); + + if ( processingState.getLoadingEntityHolders() != null ) { + final EventListenerGroup listenerGroup = + getSession().getFactory().getEventListenerGroups().eventListenerGroup_POST_LOAD; + final PostLoadEvent postLoadEvent = processingState.getPostLoadEvent(); + return loop( + processingState.getLoadingEntityHolders(), entityHolder -> + processLoadedEntityHolder( + entityHolder, + listenerGroup, + postLoadEvent, + callback, + holderConsumer + ) + ).thenAccept( unused -> processingState.getLoadingEntityHolders().clear() ); + } + if ( processingState.getReloadedEntityHolders() != null ) { + return loop( + processingState.getLoadingEntityHolders(), entityHolder -> + processLoadedEntityHolder( + entityHolder, + null, + null, + callback, + holderConsumer + ) + ).thenAccept( unused -> processingState.getLoadingEntityHolders().clear() ); + } + return voidFuture(); + } + + /** + * Reactive version of {@link StatefulPersistenceContext#processLoadedEntityHolder(EntityHolder, EventListenerGroup, PostLoadEvent, Callback, Consumer)} + */ + private CompletionStage processLoadedEntityHolder( + EntityHolder holder, + EventListenerGroup listenerGroup, + PostLoadEvent postLoadEvent, + ReactiveCallbackImpl callback, + Consumer holderConsumer) { + if ( holderConsumer != null ) { + holderConsumer.accept( holder ); + } + if ( holder.getEntity() == null ) { + // It's possible that we tried to load an entity and found out it doesn't exist, + // in which case we added an entry with a null proxy and entity. + // Remove that empty entry on post load to avoid unwanted side effects + getEntitiesByKey().remove( holder.getEntityKey() ); + } + else { + if ( postLoadEvent != null ) { + postLoadEvent.reset(); + postLoadEvent.setEntity( holder.getEntity() ) + .setId( holder.getEntityKey().getIdentifier() ) + .setPersister( holder.getDescriptor() ); + listenerGroup.fireEventOnEachListener( + postLoadEvent, + PostLoadEventListener::onPostLoad + ); + if ( callback != null ) { + return callback.invokeReactiveLoadActions( + holder.getEntity(), + holder.getDescriptor(), + getSession() + ).thenAccept( v -> + holder.resetEntityInitialier() + ); + } + } + + } + return voidFuture(); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveCallbackImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveCallbackImpl.java new file mode 100644 index 000000000..6af5cf55f --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveCallbackImpl.java @@ -0,0 +1,72 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.loader.ast.spi.AfterLoadAction; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.sql.exec.spi.Callback; + +import static java.lang.invoke.MethodHandles.lookup; +import static org.hibernate.reactive.logging.impl.LoggerFactory.make; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; + +/** + * Reactive equivalent of {@link org.hibernate.sql.exec.internal.CallbackImpl} + */ +public class ReactiveCallbackImpl implements Callback { + private static final Log LOG = make( Log.class, lookup() ); + + private final List afterLoadActions; + + public ReactiveCallbackImpl() { + this.afterLoadActions = new ArrayList<>( 1 ); + } + + @Override + public void registerAfterLoadAction(AfterLoadAction afterLoadAction) { + throw LOG.nonReactiveMethodCall( "registerReactiveAfterLoadAction(ReactiveCallbackImpl)" ); + } + + public void registerReactiveAfterLoadAction(ReactiveAfterLoadAction afterLoadAction) { + afterLoadActions.add( afterLoadAction ); + } + + @Override + public void invokeAfterLoadActions( + Object entity, + EntityMappingType entityMappingType, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)" ); + } + + public CompletionStage invokeReactiveLoadActions( + Object entity, + EntityMappingType entityMappingType, + SharedSessionContractImplementor session) { + for ( int i = 0; i < afterLoadActions.size(); i++ ) { + afterLoadActions.get( i ).reactiveAfterLoad( entity, entityMappingType, session ); + } + return voidFuture(); + } + + @Override + public boolean hasAfterLoadActions() { + return !afterLoadActions.isEmpty(); + } + + public interface ReactiveAfterLoadAction { + /** + * The action trigger - the {@code entity} is being loaded + */ + CompletionStage reactiveAfterLoad(Object entity, EntityMappingType entityMappingType, SharedSessionContractImplementor session); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java index e1af57441..ae050f079 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java @@ -5,6 +5,7 @@ */ package org.hibernate.reactive.event.impl; + import org.hibernate.AssertionFailure; import org.hibernate.engine.spi.EntityEntry; import org.hibernate.event.spi.EventSource; diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveNaturalIdLoaderDelegate.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveNaturalIdLoaderDelegate.java index ede1c6cf7..08b83cd47 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveNaturalIdLoaderDelegate.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveNaturalIdLoaderDelegate.java @@ -27,6 +27,7 @@ import org.hibernate.metamodel.mapping.NaturalIdMapping; import org.hibernate.query.internal.SimpleQueryOptions; import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.loader.ast.spi.ReactiveNaturalIdLoader; import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor; import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer; @@ -38,7 +39,6 @@ import org.hibernate.sql.ast.tree.select.QuerySpec; import org.hibernate.sql.ast.tree.select.SelectStatement; import org.hibernate.sql.exec.internal.BaseExecutionContext; -import org.hibernate.sql.exec.internal.CallbackImpl; import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl; import org.hibernate.sql.exec.spi.Callback; import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect; @@ -335,7 +335,7 @@ public NaturalIdLoaderWithOptionsExecutionContext( QueryOptions queryOptions) { super( session ); this.queryOptions = queryOptions; - callback = new CallbackImpl(); + callback = new ReactiveCallbackImpl(); } @Override diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleIdLoadPlan.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleIdLoadPlan.java index 458cdf518..79994adc8 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleIdLoadPlan.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleIdLoadPlan.java @@ -20,17 +20,19 @@ import org.hibernate.query.internal.SimpleQueryOptions; import org.hibernate.query.spi.QueryOptions; import org.hibernate.query.spi.QueryParameterBindings; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor; import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer; import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor; import org.hibernate.sql.ast.tree.select.SelectStatement; -import org.hibernate.sql.exec.internal.CallbackImpl; import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl; import org.hibernate.sql.exec.spi.Callback; import org.hibernate.sql.exec.spi.ExecutionContext; import org.hibernate.sql.exec.spi.JdbcParameterBindings; import org.hibernate.sql.exec.spi.JdbcParametersList; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; + public class ReactiveSingleIdLoadPlan extends SingleIdLoadPlan> { public ReactiveSingleIdLoadPlan( @@ -61,7 +63,7 @@ public CompletionStage load(Object restrictedValue, Object entityInstance, Bo } assert offset == getJdbcParameters().size(); final QueryOptions queryOptions = new SimpleQueryOptions( getLockOptions(), readOnly ); - final Callback callback = new CallbackImpl(); + final ReactiveCallbackImpl callback = new ReactiveCallbackImpl(); EntityMappingType loadable = (EntityMappingType) getLoadable(); ExecutionContext executionContext = executionContext( restrictedValue, @@ -74,17 +76,19 @@ public CompletionStage load(Object restrictedValue, Object entityInstance, Bo // FIXME: Should we get this from jdbcServices.getSelectExecutor()? return StandardReactiveSelectExecutor.INSTANCE .list( getJdbcSelect(), jdbcParameterBindings, executionContext, getRowTransformer(), resultConsumer( singleResultExpected ) ) - .thenApply( this::extractEntity ) - .thenApply( entity -> { - invokeAfterLoadActions( callback, session, entity ); - return (T) entity; - } ); + .thenCompose( list -> { + Object entity = extractEntity( list ); + return invokeAfterLoadActions( callback, session, entity ) + .thenApply( v -> (T) entity ); + } + ); } - private void invokeAfterLoadActions(Callback callback, SharedSessionContractImplementor session, G entity) { - if ( entity != null && getLoadable() != null) { - callback.invokeAfterLoadActions( entity, (EntityMappingType) getLoadable(), session ); + private CompletionStage invokeAfterLoadActions(ReactiveCallbackImpl callback, SharedSessionContractImplementor session, G entity) { + if ( entity != null && getLoadable() != null ) { + return callback.invokeReactiveLoadActions( entity, (EntityMappingType) getLoadable(), session ); } + return voidFuture(); } private Object extractEntity(List list) { diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleUniqueKeyEntityLoaderStandard.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleUniqueKeyEntityLoaderStandard.java index 62e5d0d44..09490cdad 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleUniqueKeyEntityLoaderStandard.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleUniqueKeyEntityLoaderStandard.java @@ -24,13 +24,13 @@ import org.hibernate.metamodel.mapping.SingularAttributeMapping; import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping; import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.loader.ast.spi.ReactiveSingleUniqueKeyEntityLoader; import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor; import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer; import org.hibernate.sql.ast.SqlAstTranslatorFactory; import org.hibernate.sql.ast.tree.select.SelectStatement; import org.hibernate.sql.exec.internal.BaseExecutionContext; -import org.hibernate.sql.exec.internal.CallbackImpl; import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl; import org.hibernate.sql.exec.spi.Callback; import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect; @@ -179,7 +179,7 @@ public SingleUKEntityLoaderExecutionContext(SharedSessionContractImplementor ses super( session ); //Careful, readOnly is possibly null this.queryOptions = readOnly == null ? QueryOptions.NONE : readOnly ? QueryOptions.READ_ONLY : QueryOptions.READ_WRITE; - callback = new CallbackImpl(); + callback = new ReactiveCallbackImpl(); } @Override diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java index f01b5d315..b5e10d133 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java @@ -30,12 +30,14 @@ import org.hibernate.query.sqm.internal.SqmInterpretationsKey.InterpretationsKeySource; import org.hibernate.query.sqm.tree.SqmStatement; import org.hibernate.query.sqm.tree.select.SqmSelectStatement; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan; import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan; import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan; import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer; +import org.hibernate.sql.exec.spi.Callback; import org.hibernate.sql.results.internal.TupleMetadata; import jakarta.persistence.NoResultException; @@ -76,6 +78,8 @@ public class ReactiveAbstractSelectionQuery { private final Function, R> uniqueElement; private final InterpretationsKeySource interpretationsKeySource; + private Callback callback; + // I'm sure we can avoid some of this by making some methods public in ORM, // but this allows me to prototype faster. We can refactor the code later. public ReactiveAbstractSelectionQuery( @@ -363,4 +367,11 @@ public void enableFetchProfile(String profileName) { } fetchProfiles.add( profileName ); } + + public Callback getCallback() { + if ( callback == null ) { + callback = new ReactiveCallbackImpl(); + } + return callback; + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java index bd38d53d4..6d162882a 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java @@ -46,6 +46,7 @@ import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor; import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan; import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan; +import org.hibernate.sql.exec.spi.Callback; import org.hibernate.type.BasicTypeReference; import jakarta.persistence.AttributeConverter; @@ -192,6 +193,11 @@ public R getSingleResultOrNull() { return selectionQueryDelegate.getSingleResultOrNull(); } + @Override + public Callback getCallback() { + return selectionQueryDelegate.getCallback(); + } + @Override public CompletionStage getReactiveSingleResultOrNull() { return selectionQueryDelegate.getReactiveSingleResultOrNull(); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java index 4e9e8f001..096196576 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java @@ -61,6 +61,7 @@ import org.hibernate.reactive.query.sqm.mutation.spi.ReactiveSqmMultiTableMutationStrategy; import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan; import org.hibernate.reactive.session.ReactiveSqmQueryImplementor; +import org.hibernate.sql.exec.spi.Callback; import org.hibernate.transform.ResultTransformer; import jakarta.persistence.CacheRetrieveMode; @@ -174,6 +175,11 @@ public R getSingleResultOrNull() { return selectionQueryDelegate.getSingleResultOrNull(); } + @Override + public Callback getCallback() { + return selectionQueryDelegate.getCallback(); + } + @Override public R uniqueResult() { return selectionQueryDelegate.uniqueResult(); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java index d24ecb2db..bca5e2a65 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java @@ -39,6 +39,7 @@ import org.hibernate.query.sqm.tree.select.SqmSelectStatement; import org.hibernate.reactive.query.spi.ReactiveAbstractSelectionQuery; import org.hibernate.reactive.query.sqm.ReactiveSqmSelectionQuery; +import org.hibernate.sql.exec.spi.Callback; import jakarta.persistence.CacheRetrieveMode; import jakarta.persistence.CacheStoreMode; @@ -218,6 +219,11 @@ public CompletionStage getReactiveResultCount() { .getReactiveResultsCount( getSqmStatement().createCountQuery(), this ); } + @Override + public Callback getCallback() { + return selectionQueryDelegate.getCallback(); + } + @Override public List getResultList() { return selectionQueryDelegate.getResultList(); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java index 94b95318f..f53ed4fca 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java @@ -78,6 +78,8 @@ public interface ReactiveSession extends ReactiveQueryProducer, ReactiveSharedSe CompletionStage reactiveLock(Object entity, LockOptions lockMode); + CompletionStage reactiveLock(String entityName, Object entity, LockOptions lockMode); + CompletionStage reactiveGet(Class entityClass, Object id); CompletionStage reactiveFind(Class entityClass, Object id, LockOptions lockOptions, EntityGraph fetchGraph); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java index 5b29a038d..979046124 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java @@ -1074,6 +1074,12 @@ public CompletionStage reactiveLock(Object object, LockOptions lockOptions return fireLock( new LockEvent( object, lockOptions, this ) ); } + @Override + public CompletionStage reactiveLock(String entityName, Object object, LockOptions lockOptions) { + checkOpen(); + return fireLock( new LockEvent( entityName, object, lockOptions, this ) ); + } + private CompletionStage fireLock(LockEvent event) { pulseTransactionCoordinator(); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java index a6f7ed675..e4c5a77fb 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java @@ -13,18 +13,25 @@ import java.util.concurrent.CompletionStage; import org.hibernate.HibernateException; +import org.hibernate.LockOptions; +import org.hibernate.dialect.Dialect; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.engine.spi.SessionEventListenerManager; import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.query.spi.QueryOptions; import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor; +import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.util.impl.CompletionStages; import org.hibernate.resource.jdbc.spi.JdbcSessionContext; import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor; +import org.hibernate.sql.exec.spi.Callback; import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcLockStrategy; import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect; import org.hibernate.sql.exec.spi.JdbcParameterBindings; import org.hibernate.sql.exec.spi.JdbcSelectExecutor; @@ -60,6 +67,48 @@ public ReactiveDeferredResultSetAccess( this.sqlStatementLogger = executionContext.getSession().getJdbcServices().getSqlStatementLogger(); } + @Override + protected void registerAfterLoadAction(ExecutionContext executionContext, LockOptions lockOptionsToUse) { + Callback callback = executionContext.getCallback(); + final ReactiveCallbackImpl reactiveCallback; + if ( callback instanceof ReactiveCallbackImpl ) { + reactiveCallback = (ReactiveCallbackImpl) callback; + } + else { + assert !callback.hasAfterLoadActions(); + reactiveCallback = new ReactiveCallbackImpl(); + } + reactiveCallback.registerReactiveAfterLoadAction( + (entity, persister, session) -> + ((ReactiveSession)session).reactiveLock( + persister.getEntityName(), + entity, + lockOptionsToUse + ) + ); + } + + + + private static boolean useFollowOnLocking( + JdbcLockStrategy jdbcLockStrategy, + String sql, + QueryOptions queryOptions, + LockOptions lockOptions, + Dialect dialect) { + switch ( jdbcLockStrategy ) { + case FOLLOW_ON: + return true; + case AUTO: + return lockOptions.getFollowOnLocking() == null + ? dialect.useFollowOnLocking( sql, queryOptions ) + : lockOptions.getFollowOnLocking(); + default: + return false; + } + } + + @Override public ResultSet getResultSet() { if ( resultSet == null ) { @@ -227,7 +276,8 @@ private CompletionStage convertException(T object, Throwable throwable) { return failedFuture( cause ); } // SQL server throws an exception as soon as we run the query - if ( cause instanceof UnsupportedOperationException && cause.getMessage().contains( "Unable to decode typeInfo for XML" ) ) { + if ( cause instanceof UnsupportedOperationException && cause.getMessage().contains( + "Unable to decode typeInfo for XML" ) ) { return failedFuture( LOG.unsupportedXmlType() ); } return failedFuture( new HibernateException( cause ) ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveListResultsConsumer.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveListResultsConsumer.java index 9c9008eb0..6d1136e59 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveListResultsConsumer.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveListResultsConsumer.java @@ -13,6 +13,7 @@ import java.util.function.Supplier; import org.hibernate.HibernateException; +import org.hibernate.engine.internal.ReactivePersistenceContextAdapter; import org.hibernate.engine.spi.PersistenceContext; import org.hibernate.engine.spi.SharedSessionContractImplementor; import org.hibernate.query.ResultListTransformer; @@ -103,7 +104,7 @@ public CompletionStage> consume( } return falseFuture(); } ) ) - .thenApply( v -> finishUp( rowReader, rowProcessingState, jdbcValuesSourceProcessingState, results, readRows, queryOptions ) ) + .thenCompose( v -> finishUp( rowReader, rowProcessingState, jdbcValuesSourceProcessingState, results, readRows, queryOptions ) ) .handle( CompletionStages::handle ) .thenCompose( handler -> { end( jdbcValues, session, jdbcValuesSourceProcessingState, persistenceContext, handler.getThrowable() ); @@ -111,20 +112,37 @@ public CompletionStage> consume( } ); } - private List finishUp( + private CompletionStage> finishUp( ReactiveRowReader rowReader, ReactiveRowProcessingState rowProcessingState, JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState, Results results, int[] readRows, QueryOptions queryOptions) { rowReader.finishUp( rowProcessingState ); - jdbcValuesSourceProcessingState.finishUp( readRows[0] > 1 ); + return finishUp( readRows[0] > 1, rowProcessingState.getSession(), jdbcValuesSourceProcessingState ) + .thenApply( v -> { + final ResultListTransformer resultListTransformer = (ResultListTransformer) queryOptions.getResultListTransformer(); + return resultListTransformer != null + ? resultListTransformer.transformList( results.getResults() ) + : results.getResults(); + } ); + } - final ResultListTransformer resultListTransformer = (ResultListTransformer) queryOptions.getResultListTransformer(); - return resultListTransformer != null - ? resultListTransformer.transformList( results.getResults() ) - : results.getResults(); + private static CompletionStage finishUp( + boolean registerSubselects, + SharedSessionContractImplementor session, + JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState) { + jdbcValuesSourceProcessingState.finishLoadingCollections(); + + return ( (ReactivePersistenceContextAdapter) session.getPersistenceContextInternal() ) + .reactivePostLoad( + jdbcValuesSourceProcessingState, + registerSubselects ? + jdbcValuesSourceProcessingState.getExecutionContext()::registerLoadingEntityHolder : + null + ); } + /** * The boolean in the CompletionStage is true if the element has been added to the results */ diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveSingleResultConsumer.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveSingleResultConsumer.java index 814db1e4b..87dac8e55 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveSingleResultConsumer.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/spi/ReactiveSingleResultConsumer.java @@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage; import org.hibernate.Incubating; +import org.hibernate.engine.internal.ReactivePersistenceContextAdapter; import org.hibernate.engine.spi.SharedSessionContractImplementor; import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState; import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet; @@ -29,11 +30,15 @@ public CompletionStage consume( return rowProcessingState.next() .thenCompose( hasNext -> rowReader .reactiveReadRow( rowProcessingState, processingOptions ) - .thenApply( result -> { + .thenCompose( result -> { rowProcessingState.finishRowProcessing( true ); rowReader.finishUp( rowProcessingState ); - jdbcValuesSourceProcessingState.finishUp( false ); - return result; + session.getPersistenceContext(); + jdbcValuesSourceProcessingState.finishLoadingCollections(); + return ( (ReactivePersistenceContextAdapter) session.getPersistenceContextInternal() ) + .reactivePostLoad( + jdbcValuesSourceProcessingState, + null).thenApply( unused -> result ); } ) ); }