Skip to content

Commit

Permalink
save for temp use
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam committed Jan 27, 2025
1 parent a11401b commit a633593
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
import org.apache.iotdb.session.util.CheckedSupplier;
import org.apache.iotdb.session.util.SessionUtils;

import org.apache.iotdb.session.util.retry.Handler;
import org.apache.iotdb.session.util.retry.RetriableTask;
import org.apache.iotdb.session.util.retry.Retry;
import org.apache.iotdb.session.util.retry.RetryDecision;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
Expand Down Expand Up @@ -1479,6 +1483,40 @@ private <RETURN> RETURN doOperation(CheckedSupplier<RETURN, TException> supplier
return ret;
}

private <RETURN> RETURN doOperationWithUnifiedRetry(CheckedSupplier<RETURN, TException> supplier) throws IoTDBConnectionException, StatementExecutionException, Exception{
RetriableTask<RETURN> task = supplier::get;
Handler<RETURN> returnHandler = new Handler<RETURN>() {
Exception lastException = null;
@Override
public RetryDecision onExecutionResult(RETURN result) {
return Retry.NO_RETRY;
}

@Override
public RetryDecision onException(Exception exception) {
lastException = exception;
boolean tryReconnect = reconnect();
if (!tryReconnect) {
lastException = new IoTDBConnectionException(logForReconnectionFailure());
} else {
lastException = new IoTDBConnectionException(exception);
}
return Retry.NO_RETRY;
}

@Override
public RETURN onFinalResult(RETURN lastResult, Exception lastException) throws Exception {
if (lastResult != null) {
return lastResult;
} else {
throw lastException;
}
}
};

return Retry.execute(task, returnHandler);
}

public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
try {
return client.fetchAllConnectionsInfo();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.iotdb.session.util.retry;

public interface Handler<T> {
RetryDecision onExecutionResult(T result);
RetryDecision onException(Exception exception);
T onFinalResult(T lastResult, Exception lastException) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.iotdb.session.util.retry;

/**
* This interface defines a retriable task
* @param <T> return type of this task
*/
@FunctionalInterface
public interface RetriableTask<T> {
T execute() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.iotdb.session.util.retry;

import org.apache.tsfile.utils.TimeDuration;

public class Retry {
final static public RetryDecision NO_RETRY = new RetryDecision() {
@Override
public boolean shouldRetry() {
return false;
}

@Override
public long getSleepTimeIfRetry() {
return 0;
}
};
public static <T> T execute(RetriableTask<T> task, Handler<T> handler) throws Exception {
T result = null;
RetryDecision decision;
Exception lastException;

while (true){
try {
result = task.execute();
decision = handler.onExecutionResult(result);
lastException = null;
} catch (Exception e) {
decision = handler.onException(e);
lastException = e;
}

if (!decision.shouldRetry()) {
return handler.onFinalResult(result, lastException);
}
try {
Thread.sleep(decision.getSleepTimeIfRetry());
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
// TODO (william) what shall we do here?
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.iotdb.session.util.retry;

public interface RetryDecision {
boolean shouldRetry();

long getSleepTimeIfRetry();


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.iotdb.session.util.retry;

import java.util.Optional;

public class TaskResult <T, E extends Throwable> {
private final T result;
private final E exception;

private TaskResult(T result, E exception) {
this.result = result;
this.exception = exception;
}
static <T> TaskResult<T, Throwable> from(T result) {
return new TaskResult<>(result, null);
}

static <E extends Throwable> TaskResult<Object, E> fromException(E exception) {
return new TaskResult<>(null, exception);
}

public boolean isPresent() {
return Optional.ofNullable(result).isPresent();
}

public Optional<T> getResult() {
return Optional.ofNullable(result);
}

public Optional<E> getException() {
return Optional.ofNullable(exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.iotdb.commons.utils.retry;

import java.util.Optional;

public interface Handler<T> {
RetryDecision onExecutionResult(T result);
RetryDecision onException(Exception exception);
T onFinalResult(T lastResult, Exception lastException) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.iotdb.commons.utils.retry;

/**
* This interface defines a retriable task
* @param <T> return type of this task
*/
@FunctionalInterface
public interface RetriableTask<T> {
T execute() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.iotdb.commons.utils.retry;

import java.util.Optional;

public class Retry {
public static <T> T execute(RetriableTask<T> task, Handler<T> handler) throws Exception {
T result = null;
RetryDecision decision;
Exception lastException;

while (true){
try {
result = task.execute();
decision = handler.onExecutionResult(result);
lastException = null;
} catch (Exception e) {
decision = handler.onException(e);
lastException = e;
}

if (!decision.shouldRetry()) {
return handler.onFinalResult(result, lastException);
}
try {
Thread.sleep(decision.getSleepTimeIfRetry());
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
// TODO (william) what shall we do here?
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.iotdb.commons.utils.retry;

public interface RetryDecision {
boolean shouldRetry();

long getSleepTimeIfRetry();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.iotdb.commons.utils.retry;

import java.util.Optional;

public class TaskResult <T, E extends Throwable> {
private final T result;
private final E exception;

private TaskResult(T result, E exception) {
this.result = result;
this.exception = exception;
}
static <T> TaskResult<T, Throwable> from(T result) {
return new TaskResult<>(result, null);
}

static <E extends Throwable> TaskResult<Object, E> fromException(E exception) {
return new TaskResult<>(null, exception);
}

public boolean isPresent() {
return Optional.ofNullable(result).isPresent();
}

public Optional<T> getResult() {
return Optional.ofNullable(result);
}

public Optional<E> getException() {
return Optional.ofNullable(exception);
}
}

0 comments on commit a633593

Please sign in to comment.