Skip to content

Commit

Permalink
apacheGH-2821: Cancel Signal unavailable without timeouts - breaks ma…
Browse files Browse the repository at this point in the history
…nual abort()
  • Loading branch information
Aklakan committed Nov 11, 2024
1 parent d11c2a9 commit 375244b
Show file tree
Hide file tree
Showing 32 changed files with 821 additions and 192 deletions.
14 changes: 14 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,21 @@ public static void handleResponseNoBody(HttpResponse<InputStream> response) {
* @return String
*/
public static String handleResponseRtnString(HttpResponse<InputStream> response) {
return handleResponseRtnString(response, null);
}

/**
* Handle the HTTP response and read the body to produce a string if a 200.
* Otherwise, throw an {@link HttpException}.
* @param response
* @param callback A callback that receives the opened input stream.
* @return String
*/
public static String handleResponseRtnString(HttpResponse<InputStream> response, Consumer<InputStream> callback) {
InputStream input = handleResponseInputStream(response);
if (callback != null) {
callback.accept(input);
}
try {
return IO.readWholeFileAsUTF8(input);
} catch (RuntimeIOException e) { throw new HttpException(e); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,31 +280,20 @@ public Y httpHeaders(Map<String, String> headers) {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
ensureContext();
contextAcc.context(context);
//this.context.putAll(context);
return thisBuilder();
}

public Y set(Symbol symbol, Object value) {
ensureContext();
contextAcc.set(symbol, value);
//context.set(symbol, value);
return thisBuilder();
}

public Y set(Symbol symbol, boolean value) {
ensureContext();
contextAcc.set(symbol, value);
//context.set(symbol, value);
return thisBuilder();
}

private void ensureContext() {
// if ( context == null )
// context = new Context();
}

/**
* Set a timeout of the overall operation.
* Time-to-connect can be set with a custom {@link HttpClient} - see {@link java.net.http.HttpClient.Builder#connectTimeout(java.time.Duration)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.net.http.HttpClient;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.jena.graph.Node;
Expand All @@ -31,6 +32,7 @@
import org.apache.jena.sparql.exec.http.UpdateSendMode;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.sys.JenaSystem;
import org.apache.jena.update.Update;
Expand All @@ -56,8 +58,9 @@ public String toString() {

/** Accumulator for update elements. Can build an overall string or UpdateRequest from the elements. */
private class UpdateEltAcc implements Iterable<UpdateElt> {
/** Delimiter for joining multiple SPARQL update strings into a single one. */
public static final String DELIMITER = ";\n";
/** Delimiter for joining multiple SPARQL update strings into a single one.
* The delimiter takes into account that the last line of a statement may be a single-line-comment. */
public static final String DELIMITER = "\n;\n";

private List<UpdateElt> updateOperations = new ArrayList<>();
private List<UpdateElt> updateOperationsView = Collections.unmodifiableList(updateOperations);
Expand All @@ -76,6 +79,7 @@ public void add(Update update) {
add(new UpdateElt(update));
}

/** Add a string by parsing it. */
public void add(String updateRequestString) {
UpdateRequest updateRequest = UpdateFactory.create(updateRequestString);
add(updateRequest);
Expand Down Expand Up @@ -146,9 +150,11 @@ public String buildString() {
protected UpdateSendMode sendMode = UpdateSendMode.systemDefault;
protected List<String> usingGraphURIs = null;
protected List<String> usingNamedGraphURIs = null;
protected Context context = null;
private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext());
// Uses query rewrite to replace variables by values.
protected Map<Var, Node> substitutionMap = new HashMap<>();
protected long timeout = -1;
protected TimeUnit timeoutUnit = null;

protected ExecUpdateHTTPBuilder() {}

Expand Down Expand Up @@ -213,6 +219,12 @@ public Y substitution(Var var, Node value) {
return thisBuilder();
}

public Y timeout(long timeout, TimeUnit timeoutUnit) {
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
return thisBuilder();
}

public Y httpClient(HttpClient httpClient) {
this.httpClient = Objects.requireNonNull(httpClient);
return thisBuilder();
Expand Down Expand Up @@ -274,28 +286,20 @@ public Y httpHeaders(Map<String, String> headers) {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
ensureContext();
this.context.setAll(context);
this.contextAcc.context(context);
return thisBuilder();
}

public Y set(Symbol symbol, Object value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

public Y set(Symbol symbol, boolean value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

private void ensureContext() {
if ( context == null )
context = Context.create();
}

public X build() {
Objects.requireNonNull(serviceURL, "No service URL");
if ( updateEltAcc.isEmpty() )
Expand All @@ -322,7 +326,7 @@ public X build() {
// If the UpdateRequest object wasn't built until now then build the string instead.
String updateStringActual = updateActual == null ? updateEltAcc.buildString() : null;

Context cxt = (context!=null) ? context : ARQ.getContext().copy();
Context cxt = contextAcc.context();
return buildX(hClient, updateActual, updateStringActual, cxt);
}

Expand Down
11 changes: 11 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ARQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ public static void enableBlankNodeResultLabels(boolean val) {
*/
public static final Symbol queryTimeout = SystemARQ.allocSymbol("queryTimeout");

/**
* Set timeout. The value of this symbol gives the value of the timeout in milliseconds
* <ul>
* <li>A Number; the long value is used</li>
* <li>A string, e.g. "1000", parsed as a number</li>
* <li>A string, as two numbers separated by a comma, e.g. "500,10000" parsed as two numbers</li>
* </ul>
* @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long, TimeUnit)
*/
public static final Symbol updateTimeout = SystemARQ.allocSymbol("updateTimeout");

// This can't be a context constant because NodeValues don't look in the context.
// /**
// * Context symbol controlling Roman Numerals in Filters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
Expand Down Expand Up @@ -78,18 +76,7 @@ public ExecutionContext(DatasetGraph dataset, OpExecutorFactory factory) {
}

public ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory) {
this(params, activeGraph, dataset, factory, cancellationSignal(params));
}

private static AtomicBoolean cancellationSignal(Context cxt) {
if ( cxt == null )
return null;
try {
return cxt.get(ARQConstants.symCancelQuery);
} catch (ClassCastException ex) {
Log.error(ExecutionContext.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage());
return null;
}
this(params, activeGraph, dataset, factory, Context.getCancelSignal(params));
}

private ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) {
Expand Down
181 changes: 181 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.Symbol;

/** Processing timeout strings. */
public class Timeouts {
Expand Down Expand Up @@ -49,4 +52,182 @@ public static Pair<Long, Long> parseTimeoutStr(String str, TimeUnit unit) {
return null;
}
}

public static record DurationWithUnit(long amount, TimeUnit unit) {
public DurationWithUnit() {
this(-1, TimeUnit.MILLISECONDS);
}

public boolean isSet() {
return amount >= 0;
}

public long asMillis() {
return (amount < 0) ? amount : unit.toMillis(amount);
}
}

public static record Timeout(DurationWithUnit initialTimeout, DurationWithUnit overallTimeout) {
public static Timeout UNSET = new Timeout(-1, -1);

public Timeout(long initialTimeout, TimeUnit initialTimeoutUnit, long overallTimeout, TimeUnit overallTimeoutUnit) {
this(new DurationWithUnit(initialTimeout, initialTimeoutUnit), new DurationWithUnit(overallTimeout, overallTimeoutUnit));
}

public Timeout(long initialTimeout, long overallTimeout) {
this(initialTimeout, TimeUnit.MILLISECONDS, overallTimeout, TimeUnit.MILLISECONDS);
}

public boolean hasInitialTimeout() {
return initialTimeout().isSet();
}

public long initialTimeoutMillis() {
return initialTimeout().asMillis();
}

public boolean hasOverallTimeout() {
return overallTimeout().isSet();
}

public long overallTimeoutMillis() {
return overallTimeout().asMillis();
}

public boolean hasTimeout() {
return hasInitialTimeout() || hasOverallTimeout();
}
}

// TimeoutBuilder reserved as a possible super-interface for {Query, Update}Exec(ution)Builder.
public static class TimeoutBuilderImpl {
private static final long UNSET = -1;

protected long initialTimeout = UNSET;
protected TimeUnit initialTimeoutUnit = null;
protected long overallTimeout = UNSET;
protected TimeUnit overallTimeoutUnit = null;

public TimeoutBuilderImpl timeout(long value, TimeUnit timeUnit) {
this.initialTimeout = UNSET;
this.initialTimeoutUnit = null;
this.overallTimeout = value;
this.overallTimeoutUnit = timeUnit;
return this;
}

public TimeoutBuilderImpl initialTimeout(long value, TimeUnit timeUnit) {
this.initialTimeout = value < 0 ? -1L : value ;
this.initialTimeoutUnit = timeUnit;
return this;
}

public boolean hasInitialTimeout() {
return initialTimeout >= 0;
}

public TimeoutBuilderImpl overallTimeout(long value, TimeUnit timeUnit) {
this.overallTimeout = value;
this.overallTimeoutUnit = timeUnit;
return this;
}

public boolean hasOverallTimeout() {
return overallTimeout >= 0;
}

public Timeout build() {
return new Timeout(initialTimeout, initialTimeoutUnit, overallTimeout, overallTimeoutUnit);
}
}

/** Update any unset timeout in the builder from the specification object. */
public static void applyDefaultTimeout(TimeoutBuilderImpl builder, Timeout timeout) {
if (timeout != null) {
if ( !builder.hasInitialTimeout() )
builder.initialTimeout(timeout.initialTimeout().amount(), timeout.initialTimeout().unit());
if ( !builder.hasOverallTimeout() )
builder.overallTimeout(timeout.overallTimeout().amount(), timeout.overallTimeout().unit());
}
}

public static Timeout extractQueryTimeout(Context cxt) {
return extractTimeout(cxt, ARQ.queryTimeout);
}

public static Timeout extractUpdateTimeout(Context cxt) {
return extractTimeout(cxt, ARQ.updateTimeout);
}

public static Timeout extractTimeout(Context cxt, Symbol symbol) {
Object obj = cxt.get(symbol);
return parseTimeout(obj);
}

public static Timeout parseTimeout(Object obj) {
Timeout result = Timeout.UNSET;
if ( obj != null ) {
try {
if ( obj instanceof Timeout to ) {
result = to;
} else if ( obj instanceof Number n ) {
long x = n.longValue();
result = new Timeout(new DurationWithUnit(), new DurationWithUnit(x, TimeUnit.MILLISECONDS));
} else if ( obj instanceof String str ) {
Pair<Long, Long> pair = Timeouts.parseTimeoutStr(str, TimeUnit.MILLISECONDS);
if ( pair == null ) {
Log.warn(Timeouts.class, "Bad timeout string: "+str);
return result;
}
result = new Timeout(pair.getLeft(), pair.getRight());
} else
Log.warn(Timeouts.class, "Can't interpret timeout: " + obj);
} catch (Exception ex) {
Log.warn(Timeouts.class, "Exception setting timeouts (context) from: "+obj, ex);
}
}
return result;
}

public static void setQueryTimeout(Context cxt, Timeout timeout) {
setTimeout(cxt, ARQ.queryTimeout, timeout);
}

public static void setUpdateTimeout(Context cxt, Timeout timeout) {
setTimeout(cxt, ARQ.updateTimeout, timeout);
}

public static void setTimeout(Context cxt, Symbol symbol, Timeout timeout) {
Object obj = toContextValue(timeout);
cxt.set(symbol, obj);
}

/** Inverse function of {@link #parseTimeout(Object)}. */
public static Object toContextValue(Timeout timeout) {
Object result = timeout == null
? null
: timeout.hasInitialTimeout()
? toString(timeout)
: timeout.hasOverallTimeout()
? timeout.overallTimeoutMillis()
: null;
return result;
}

/** Inverse function of {@link #parseTimeout(Object)}. */
public static String toString(Timeout timeout) {
String result = timeout.hasInitialTimeout()
? timeout.initialTimeoutMillis() + "," + timeout.overallTimeoutMillis()
: timeout.hasOverallTimeout()
? Long.toString(timeout.overallTimeoutMillis())
: null;
return result;
}

// Set times from context if not set directly. e..g Context provides default values.
// Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter.
public static void applyDefaultQueryTimeoutFromContext(TimeoutBuilderImpl builder, Context cxt) {
Timeout queryTimeout = extractQueryTimeout(cxt);
applyDefaultTimeout(builder, queryTimeout);
}
}
Loading

0 comments on commit 375244b

Please sign in to comment.