diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index c0dc516ba114..d6e4d8a72b2b 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -66,6 +66,10 @@ import org.apache.iotdb.session.util.CheckedSupplier; import org.apache.iotdb.session.util.SessionUtils; +import org.apache.iotdb.session.util.retry.Handler; +import org.apache.iotdb.session.util.retry.RetriableTask; +import org.apache.iotdb.session.util.retry.Retry; +import org.apache.iotdb.session.util.retry.RetryDecision; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; @@ -1479,6 +1483,40 @@ private RETURN doOperation(CheckedSupplier supplier return ret; } + private RETURN doOperationWithUnifiedRetry(CheckedSupplier supplier) throws IoTDBConnectionException, StatementExecutionException, Exception{ + RetriableTask task = supplier::get; + Handler returnHandler = new Handler() { + Exception lastException = null; + @Override + public RetryDecision onExecutionResult(RETURN result) { + return Retry.NO_RETRY; + } + + @Override + public RetryDecision onException(Exception exception) { + lastException = exception; + boolean tryReconnect = reconnect(); + if (!tryReconnect) { + lastException = new IoTDBConnectionException(logForReconnectionFailure()); + } else { + lastException = new IoTDBConnectionException(exception); + } + return Retry.NO_RETRY; + } + + @Override + public RETURN onFinalResult(RETURN lastResult, Exception lastException) throws Exception { + if (lastResult != null) { + return lastResult; + } else { + throw lastException; + } + } + }; + + return Retry.execute(task, returnHandler); + } + public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException { try { return client.fetchAllConnectionsInfo(); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Handler.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Handler.java new file mode 100644 index 000000000000..3aca32f8b4a0 --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Handler.java @@ -0,0 +1,7 @@ +package org.apache.iotdb.session.util.retry; + +public interface Handler { + RetryDecision onExecutionResult(T result); + RetryDecision onException(Exception exception); + T onFinalResult(T lastResult, Exception lastException) throws Exception; +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetriableTask.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetriableTask.java new file mode 100644 index 000000000000..804a3bb4ec71 --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetriableTask.java @@ -0,0 +1,10 @@ +package org.apache.iotdb.session.util.retry; + +/** +* This interface defines a retriable task +* @param return type of this task +*/ +@FunctionalInterface +public interface RetriableTask { + T execute() throws Exception; +} \ No newline at end of file diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Retry.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Retry.java new file mode 100644 index 000000000000..92d709ee653f --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/Retry.java @@ -0,0 +1,43 @@ +package org.apache.iotdb.session.util.retry; + +import org.apache.tsfile.utils.TimeDuration; + +public class Retry { + final static public RetryDecision NO_RETRY = new RetryDecision() { + @Override + public boolean shouldRetry() { + return false; + } + + @Override + public long getSleepTimeIfRetry() { + return 0; + } + }; + public static T execute(RetriableTask task, Handler handler) throws Exception { + T result = null; + RetryDecision decision; + Exception lastException; + + while (true){ + try { + result = task.execute(); + decision = handler.onExecutionResult(result); + lastException = null; + } catch (Exception e) { + decision = handler.onException(e); + lastException = e; + } + + if (!decision.shouldRetry()) { + return handler.onFinalResult(result, lastException); + } + try { + Thread.sleep(decision.getSleepTimeIfRetry()); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + // TODO (william) what shall we do here? + } + } + } +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetryDecision.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetryDecision.java new file mode 100644 index 000000000000..50ebd2e6b56b --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/RetryDecision.java @@ -0,0 +1,9 @@ +package org.apache.iotdb.session.util.retry; + +public interface RetryDecision { + boolean shouldRetry(); + + long getSleepTimeIfRetry(); + + +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/TaskResult.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/TaskResult.java new file mode 100644 index 000000000000..a033a8881674 --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/retry/TaskResult.java @@ -0,0 +1,32 @@ +package org.apache.iotdb.session.util.retry; + +import java.util.Optional; + +public class TaskResult { + private final T result; + private final E exception; + + private TaskResult(T result, E exception) { + this.result = result; + this.exception = exception; + } + static TaskResult from(T result) { + return new TaskResult<>(result, null); + } + + static TaskResult fromException(E exception) { + return new TaskResult<>(null, exception); + } + + public boolean isPresent() { + return Optional.ofNullable(result).isPresent(); + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + public Optional getException() { + return Optional.ofNullable(exception); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Handler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Handler.java new file mode 100644 index 000000000000..6e0798a1a211 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Handler.java @@ -0,0 +1,9 @@ +package org.apache.iotdb.commons.utils.retry; + +import java.util.Optional; + +public interface Handler { + RetryDecision onExecutionResult(T result); + RetryDecision onException(Exception exception); + T onFinalResult(T lastResult, Exception lastException) throws Exception; +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetriableTask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetriableTask.java new file mode 100644 index 000000000000..81702edc529e --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetriableTask.java @@ -0,0 +1,10 @@ +package org.apache.iotdb.commons.utils.retry; + +/** +* This interface defines a retriable task +* @param return type of this task +*/ +@FunctionalInterface +public interface RetriableTask { + T execute() throws Exception; +} \ No newline at end of file diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Retry.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Retry.java new file mode 100644 index 000000000000..c58c3c2b45cb --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/Retry.java @@ -0,0 +1,32 @@ +package org.apache.iotdb.commons.utils.retry; + +import java.util.Optional; + +public class Retry { + public static T execute(RetriableTask task, Handler handler) throws Exception { + T result = null; + RetryDecision decision; + Exception lastException; + + while (true){ + try { + result = task.execute(); + decision = handler.onExecutionResult(result); + lastException = null; + } catch (Exception e) { + decision = handler.onException(e); + lastException = e; + } + + if (!decision.shouldRetry()) { + return handler.onFinalResult(result, lastException); + } + try { + Thread.sleep(decision.getSleepTimeIfRetry()); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + // TODO (william) what shall we do here? + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetryDecision.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetryDecision.java new file mode 100644 index 000000000000..57797f5c4c9c --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/RetryDecision.java @@ -0,0 +1,7 @@ +package org.apache.iotdb.commons.utils.retry; + +public interface RetryDecision { + boolean shouldRetry(); + + long getSleepTimeIfRetry(); +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/TaskResult.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/TaskResult.java new file mode 100644 index 000000000000..818d7b56f18f --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/retry/TaskResult.java @@ -0,0 +1,32 @@ +package org.apache.iotdb.commons.utils.retry; + +import java.util.Optional; + +public class TaskResult { + private final T result; + private final E exception; + + private TaskResult(T result, E exception) { + this.result = result; + this.exception = exception; + } + static TaskResult from(T result) { + return new TaskResult<>(result, null); + } + + static TaskResult fromException(E exception) { + return new TaskResult<>(null, exception); + } + + public boolean isPresent() { + return Optional.ofNullable(result).isPresent(); + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + public Optional getException() { + return Optional.ofNullable(exception); + } +}