Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.linecorp.armeria.client.proxy.ProxyConfigSelector;
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.GracefulShutdown;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.SessionProtocol;
Expand Down Expand Up @@ -165,6 +166,33 @@ public ClientFactoryBuilder workerGroup(int numThreads) {
return workerGroup(EventLoopGroups.newEventLoopGroup(numThreads), true);
}

/**
* Sets the worker {@link EventLoopGroup} which is responsible for performing socket I/O and running
* {@link Client#execute(ClientRequestContext, Request)}.
* If not set, {@linkplain CommonPools#workerGroup() the common worker group} is used.
*
* @param shutdownOnClose whether to shut down the worker {@link EventLoopGroup}
* when the {@link ClientFactory} is closed
*/
public ClientFactoryBuilder workerGroup(
EventLoopGroup workerGroup, boolean shutdownOnClose, GracefulShutdown gracefulShutdown) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional) What do you think of validating if shutdownOnClose == true since GracefulShutdown won't have meaning unless the workerGroup is actually closed?

e.g.

        if (!shutdownOnClose) {
            checkArgument(gracefulShutdown == GracefulShutdown.disabled(),
                          "GracefulShutdown is not supported when shutdownOnClose is false");
        }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, this makes sense.

option(ClientFactoryOptions.WORKER_GROUP, requireNonNull(workerGroup, "workerGroup"));
option(ClientFactoryOptions.SHUTDOWN_WORKER_GROUP_ON_CLOSE, shutdownOnClose);
option(ClientFactoryOptions.WORKER_GROUP_GRACEFUL_SHUTDOWN, gracefulShutdown);
return this;
}

/**
* Uses a newly created {@link EventLoopGroup} with the specified number of threads for
* performing socket I/O and running {@link Client#execute(ClientRequestContext, Request)}.
* The worker {@link EventLoopGroup} will be shut down when the {@link ClientFactory} is closed.
*
* @param numThreads the number of event loop threads
*/
public ClientFactoryBuilder workerGroup(int numThreads, GracefulShutdown gracefulShutdown) {
return workerGroup(EventLoopGroups.newEventLoopGroup(numThreads), true, gracefulShutdown);
}

/**
* Sets the factory that creates an {@link EventLoopScheduler} which is responsible for assigning an
* {@link EventLoop} to handle a connection to the specified {@link Endpoint}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.client.proxy.ProxyConfigSelector;
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.GracefulShutdown;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.TlsKeyPair;
Expand Down Expand Up @@ -71,6 +72,13 @@ public final class ClientFactoryOptions
public static final ClientFactoryOption<Boolean> SHUTDOWN_WORKER_GROUP_ON_CLOSE =
ClientFactoryOption.define("SHUTDOWN_WORKER_GROUP_ON_CLOSE", false);

/**
* Graceful shutdown settings for the worker {@link EventLoopGroup} when the {@link ClientFactory} is
* closed.
*/
public static final ClientFactoryOption<GracefulShutdown> WORKER_GROUP_GRACEFUL_SHUTDOWN =
ClientFactoryOption.define("WORKER_GROUP_GRACEFUL_SHUTDOWN", Flags.workerGroupGracefulShutdown());

/**
* The factory that creates an {@link EventLoopScheduler} which is responsible for assigning an
* {@link EventLoop} to handle a connection to the specified {@link Endpoint}.
Expand Down Expand Up @@ -744,4 +752,14 @@ public ClientTlsConfig tlsConfig() {
public Consumer<? super ChannelPipeline> channelPipelineCustomizer() {
return get(CHANNEL_PIPELINE_CUSTOMIZER);
}

/**
* Returns the settings for graceful shutdown of the worker {@link EventLoopGroup} when the
* {@link ClientFactory} is closed.
*/
@UnstableApi
public GracefulShutdown workerGroupGracefulShutdown() {
return get(WORKER_GROUP_GRACEFUL_SHUTDOWN);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand All @@ -40,6 +41,7 @@
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.proxy.ProxyConfigSelector;
import com.linecorp.armeria.client.redirect.RedirectConfig;
import com.linecorp.armeria.common.GracefulShutdown;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.RequestContext;
Expand Down Expand Up @@ -462,7 +464,12 @@ private void closeAsync(CompletableFuture<?> future) {
}
bootstrapSslContexts.release(sslContextFactory);
if (shutdownWorkerGroupOnClose) {
workerGroup.shutdownGracefully().addListener((FutureListener<Object>) f -> {
final GracefulShutdown gracefulShutdown = options.workerGroupGracefulShutdown();
workerGroup.shutdownGracefully(
gracefulShutdown.quietPeriod().toMillis(),
gracefulShutdown.timeout().toMillis(),
TimeUnit.MILLISECONDS
).addListener((FutureListener<Object>) f -> {
if (f.cause() != null) {
logger.warn("Failed to shut down a worker group:", f.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,21 @@
* under the License.
*/

package com.linecorp.armeria.server;
package com.linecorp.armeria.common;

import java.time.Duration;
import java.util.Objects;
import java.util.function.BiFunction;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.HttpRequest;

final class DefaultGracefulShutdown implements GracefulShutdown {

private final Duration quietPeriod;
private final Duration timeout;
private final BiFunction<ServiceRequestContext, HttpRequest, Throwable> toExceptionFunction;

DefaultGracefulShutdown(Duration quietPeriod, Duration timeout,
BiFunction<ServiceRequestContext, HttpRequest, Throwable> toExceptionFunction) {
DefaultGracefulShutdown(Duration quietPeriod, Duration timeout) {
this.quietPeriod = quietPeriod;
this.timeout = timeout;
this.toExceptionFunction = toExceptionFunction;
}

@Override
Expand All @@ -47,11 +41,6 @@ public Duration timeout() {
return timeout;
}

@Override
public Throwable toException(ServiceRequestContext ctx, HttpRequest request) {
return toExceptionFunction.apply(ctx, request);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -62,21 +51,19 @@ public boolean equals(Object o) {
}
final DefaultGracefulShutdown that = (DefaultGracefulShutdown) o;
return quietPeriod.equals(that.quietPeriod) &&
timeout.equals(that.timeout) &&
toExceptionFunction.equals(that.toExceptionFunction);
timeout.equals(that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(quietPeriod, timeout, toExceptionFunction);
return Objects.hash(quietPeriod, timeout);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("quietPeriod", quietPeriod)
.add("timeout", timeout)
.add("toExceptionFunction", toExceptionFunction)
.toString();
}
}
14 changes: 14 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ private static boolean validateTransportType(TransportType transportType, String
private static final boolean ANNOTATED_SERVICE_CONTENT_LOGGING =
getValue(FlagsProvider::annotatedServiceContentLogging, "annotatedServiceContentLogging");

private static final GracefulShutdown WORKER_GROUP_GRACEFUL_SHUTDOWN =
getValue(FlagsProvider::workerGroupGracefulShutdown, "workerGroupGracefulShutdown");

/**
* Returns the specification of the {@link Sampler} that determines whether to retain the stack
* trace of the exceptions that are thrown frequently by Armeria. A sampled exception will have the stack
Expand Down Expand Up @@ -1730,6 +1733,17 @@ public static boolean annotatedServiceContentLogging() {
return ANNOTATED_SERVICE_CONTENT_LOGGING;
}

/**
* Provides a configuration for a graceful shutdown of a worker group.
* The method returns an instance of {@link GracefulShutdown}, which specifies
* the shutdown strategy for the worker group. By default, this method
* disables the graceful shutdown process.
*/
@UnstableApi
public static GracefulShutdown workerGroupGracefulShutdown() {
return WORKER_GROUP_GRACEFUL_SHUTDOWN;
}

@Nullable
private static String nullableCaffeineSpec(Function<FlagsProvider, String> method, String flagName) {
return caffeineSpec(method, flagName, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,4 +1292,15 @@ default ResponseTimeoutMode responseTimeoutMode() {
default Boolean annotatedServiceContentLogging() {
return null;
}

/**
* Provides a configuration for a graceful shutdown of a worker group.
* The method returns an instance of {@link GracefulShutdown}, which specifies
* the shutdown strategy for the worker group. By default, this method
* disables the graceful shutdown process.
*/
@UnstableApi
default GracefulShutdown workerGroupGracefulShutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers: I think it's fine to skip the SystemPropertyFlagsProvider implementation since we will need to define a schema for parsing GracefulShutdown which I think is out of scope of this PR.

return GracefulShutdown.disabled();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To allow users to override FlagsProvider#workerGroupGracefulShutdown using FlagsProvider#priority, what do you think of returning null here and returning the default GracefulShutdown at DefaultFlagsProvider?

Suggested change
return GracefulShutdown.disabled();
return null;

Also, to keep backwards compatibility, what do you think of returning netty's default as the default value?

https://github.com/netty/netty/blob/11938b53522a578b7e113e4fe85baa560e7aa841/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java#L39-L40

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will update it like this, but I wonder if this check would be still valid then:

if (!shutdownOnClose) {
            checkArgument(gracefulShutdown == GracefulShutdown.disabled(),
                          "GracefulShutdown is not supported when shutdownOnClose is false");
        }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@
* under the License.
*/

package com.linecorp.armeria.server;
package com.linecorp.armeria.common;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this class is annotated with @UnstableApi, it cannot be moved to a different package. This is to ensure we maintain backward compatibility for as many users as possible.

How about adding a new one and deprecating the old one?
We also need to convert the old GracefulShutdown to the new one in the ServerBuilder:

@UnstableApi
public ServerBuilder gracefulShutdown(com.linecorp.armeria.server.GracefulShutdown gracefulShutdown) {
    // convert to common.GracefulShutdown and GracefulShutdownExceptionFactory
    return this;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original proposal was to move the class and incur a breaking change:

  • I wasn't able to find many (if any) cases where the server.GracefulShutdown construct was actually used
  • My thought was there wouldn't be many servers per application/repository - since server.GracefulShutdown is usually configured once per server I didn't think the breaking change would affect multiple points within each codebase.

Since this is a balance of maintaining two sets of classes vs. incurring breaking changes, I guess this is somewhat subjective and could go either way.

I'm fine with either approach

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a team discussion, we have decided to introduce a breaking change for the following reasons:

  • The feature was recently introduced and, based on a GitHub search, has limited adoption.
  • Maintaining two separate GracefulShutdown implementations is confusing for users and adds unnecessary complexity.

@novoj Sorry for confusing you. 😉

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's okay. I didn't react because I anticipated there might be a discussion about it. Both approaches have their benefits and disadvantages.


import java.time.Duration;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ShuttingDownException;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.server.Server;

/**
* Configures the graceful shutdown behavior of a {@link Server}.
Expand Down Expand Up @@ -57,12 +54,4 @@ static GracefulShutdown disabled() {
*/
Duration timeout();

/**
* Returns an {@link Throwable} to terminate a pending request when the server is shutting down.
* The exception will be converted to an {@link HttpResponse} by {@link ServerErrorHandler}.
*
* <p>If null is returned, the request will be terminated with {@link ShuttingDownException} that will be
* converted to an {@link HttpStatus#SERVICE_UNAVAILABLE} response.
*/
Throwable toException(ServiceRequestContext ctx, HttpRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@
* under the License.
*/

package com.linecorp.armeria.server;
package com.linecorp.armeria.common;

import static com.linecorp.armeria.server.DefaultServerConfig.validateNonNegative;
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.function.BiFunction;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ShuttingDownException;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
Expand All @@ -36,20 +31,25 @@ public final class GracefulShutdownBuilder {
// Defaults to no graceful shutdown.
private static final Duration DEFAULT_GRACEFUL_SHUTDOWN_QUIET_PERIOD = Duration.ZERO;
private static final Duration DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ZERO;
private static final BiFunction<ServiceRequestContext, HttpRequest, Throwable> DEFAULT_ERROR_FUNCTION =
(ctx, req) -> ShuttingDownException.get();

static final GracefulShutdown DISABLED = GracefulShutdown.builder().build();

private Duration quietPeriod = DEFAULT_GRACEFUL_SHUTDOWN_QUIET_PERIOD;
private Duration timeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT;
private BiFunction<ServiceRequestContext, HttpRequest, Throwable> toException = DEFAULT_ERROR_FUNCTION;

GracefulShutdownBuilder() {}

static Duration validateNonNegative(Duration duration, String fieldName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; can be private

Suggested change
static Duration validateNonNegative(Duration duration, String fieldName) {
private static Duration validateNonNegative(Duration duration, String fieldName) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

if (duration.isNegative()) {
throw new IllegalArgumentException(fieldName + ": " + duration + " (expected: >= 0)");
}
return duration;
}

/**
* Sets the quiet period to wait for active requests to go end before shutting down.
* {@link Duration#ZERO} means the server will stop right away without waiting.
* {@link Duration#ZERO} means the client or server (depending on the place of usage) will stop right away
* without waiting.
*
* <p>The default is {@link Duration#ZERO}.
*/
Expand All @@ -61,7 +61,7 @@ public GracefulShutdownBuilder quietPeriod(Duration quietPeriod) {

/**
* Sets the quiet period millis to wait for active requests to go end before shutting down.
* 0 means the server will stop right away without waiting.
* 0 means the client or server (depending on the place of usage) will stop right away without waiting.
*
* <p>The default is 0.
*/
Expand All @@ -70,9 +70,9 @@ public GracefulShutdownBuilder quietPeriodMillis(long quietPeriodMillis) {
}

/**
* Sets the amount of time to wait before shutting down the server regardless of active requests.
* This should be set to a time greater than {@code quietPeriod} to ensure the server shuts down even
* if there is a stuck request.
* Sets the amount of time to wait before shutting down the client or server (depending on the place of
* usage) regardless of active requests. This should be set to a time greater than {@code quietPeriod} to
* ensure the client/server shuts down even if there is a stuck request.
*
* <p>The default is {@link Duration#ZERO}.
*/
Expand All @@ -83,29 +83,16 @@ public GracefulShutdownBuilder timeout(Duration timeout) {
}

/**
* Sets the amount of time to wait before shutting down the server regardless of active requests.
* This should be set to a time greater than {@code quietPeriod} to ensure the server shuts down even
* if there is a stuck request.
* Sets the amount of time to wait before shutting down the client or server (depending on the place of
* usage) regardless of active requests. This should be set to a time greater than {@code quietPeriod} to
* ensure the client/server shuts down even if there is a stuck request.
*
* <p>The default is {@link Duration#ZERO}.
*/
public GracefulShutdownBuilder timeoutMillis(long timeoutMillis) {
return timeout(Duration.ofMillis(timeoutMillis));
}

/**
* Sets the function that returns an {@link Throwable} to terminate a pending request when the server is
* shutting down. If unspecified, the request will be terminated with {@link ShuttingDownException} that
* will be converted to an {@link HttpStatus#SERVICE_UNAVAILABLE} response.
*/
public GracefulShutdownBuilder toExceptionFunction(
BiFunction<? super ServiceRequestContext, ? super HttpRequest, ? extends Throwable> toException) {
requireNonNull(toException, "toException");
//noinspection unchecked
this.toException = (BiFunction<ServiceRequestContext, HttpRequest, Throwable>) toException;
return this;
}

private static void validateGreaterThanOrEqual(Duration larger, String largerFieldName,
Duration smaller, String smallerFieldName) {
if (larger.compareTo(smaller) < 0) {
Expand All @@ -119,6 +106,6 @@ private static void validateGreaterThanOrEqual(Duration larger, String largerFie
*/
public GracefulShutdown build() {
validateGreaterThanOrEqual(timeout, "timeout", quietPeriod, "quietPeriod");
return new DefaultGracefulShutdown(quietPeriod, timeout, toException);
return new DefaultGracefulShutdown(quietPeriod, timeout);
}
}
Loading
Loading