Skip to content

Commit

Permalink
apacheGH-2821: WIP on passing timeouts from update exec to underlying…
Browse files Browse the repository at this point in the history
… query exec.
  • Loading branch information
Aklakan committed Nov 10, 2024
1 parent 1db78a5 commit c70164d
Show file tree
Hide file tree
Showing 26 changed files with 473 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.net.http.HttpClient;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.jena.graph.Node;
Expand Down Expand Up @@ -150,6 +151,8 @@ public String buildString() {
private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext());
// Uses query rewrite to replace variables by values.
protected Map<Var, Node> substitutionMap = new HashMap<>();
protected long timeout = -1;
protected TimeUnit timeoutUnit = null;

protected ExecUpdateHTTPBuilder() {}

Expand Down Expand Up @@ -214,6 +217,12 @@ public Y substitution(Var var, Node value) {
return thisBuilder();
}

/**
 * Records the timeout value and its unit on this builder.
 *
 * @param timeout the timeout duration, expressed in {@code timeoutUnit}
 * @param timeoutUnit the unit that {@code timeout} is measured in
 * @return the result of {@code thisBuilder()}, so calls can be chained
 */
public Y timeout(long timeout, TimeUnit timeoutUnit) {
    // The two assignments are independent; both are simply stored for later use.
    this.timeoutUnit = timeoutUnit;
    this.timeout = timeout;
    return thisBuilder();
}

public Y httpClient(HttpClient httpClient) {
this.httpClient = Objects.requireNonNull(httpClient);
return thisBuilder();
Expand Down
11 changes: 11 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ARQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ public static void enableBlankNodeResultLabels(boolean val) {
*/
public static final Symbol queryTimeout = SystemARQ.allocSymbol("queryTimeout");

/**
* Set update timeout. The value of this symbol gives the value of the timeout in milliseconds, as one of:
* <ul>
* <li>A Number; the long value is used</li>
* <li>A string, e.g. "1000", parsed as a number</li>
* <li>A string, as two numbers separated by a comma, e.g. "500,10000" parsed as two numbers</li>
* </ul>
* @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long, TimeUnit)
*/
public static final Symbol updateTimeout = SystemARQ.allocSymbol("updateTimeout");

// This can't be a context constant because NodeValues don't look in the context.
// /**
// * Context symbol controlling Roman Numerals in Filters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
Expand Down Expand Up @@ -78,18 +76,7 @@ public ExecutionContext(DatasetGraph dataset, OpExecutorFactory factory) {
}

public ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory) {
this(params, activeGraph, dataset, factory, cancellationSignal(params));
}

private static AtomicBoolean cancellationSignal(Context cxt) {
if ( cxt == null )
return null;
try {
return cxt.get(ARQConstants.symCancelQuery);
} catch (ClassCastException ex) {
Log.error(ExecutionContext.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage());
return null;
}
this(params, activeGraph, dataset, factory, Context.getCancelSignal(params));
}

private ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public QueryIterProcessBinding(QueryIterator qIter, ExecutionContext context) {
nextBinding = null ;
AtomicBoolean signal;
try {
signal = context.getContext().get(ARQConstants.symCancelQuery);
signal = context.getCancelSignal();
// FIXME Is above the same as this: context.getContext().get(ARQConstants.symCancelQuery);
} catch(Exception ex) {
signal = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.jena.sparql.exec;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIteratorWrapper;
import org.apache.jena.sparql.exec.TimeoutLib.Timeout;
import org.apache.jena.sparql.graph.GraphOps;
import org.apache.jena.sparql.modify.TemplateLib;
import org.apache.jena.sparql.syntax.ElementGroup;
Expand Down Expand Up @@ -89,22 +89,25 @@ public class QueryExecDataset implements QueryExec
private long timeout2 = TIMEOUT_UNSET;
private final AlarmClock alarmClock = AlarmClock.get();
private long queryStartTime = -1; // Unset
private AtomicBoolean cancelSignal = new AtomicBoolean(false);
private AtomicBoolean cancelSignal;

protected QueryExecDataset(Query query, String queryString, DatasetGraph datasetGraph, Context cxt,
QueryEngineFactory qeFactory,
long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2,
QueryEngineFactory qeFactory, Timeout timeout,
Binding initialToEngine) {
// Context cxt is already a safe copy.
this.query = query;
this.queryString = queryString;
this.dataset = datasetGraph;
this.qeFactory = qeFactory;
this.context = (cxt == null) ? Context.setupContextForDataset(cxt, datasetGraph) : cxt;
this.timeout1 = asMillis(timeout1, timeUnit1);
this.timeout2 = asMillis(timeout2, timeUnit2);
this.timeout1 = timeout.initialTimeoutMillis();
this.timeout2 = timeout.overallTimeoutMillis();
// See also query substitution handled in QueryExecBuilder
this.initialBinding = initialToEngine;

// Cancel signal may originate from elsewhere, e.g. an update execution.
this.cancelSignal = Context.getOrSetCancelSignal(context);

init();
}

Expand All @@ -114,10 +117,6 @@ private void init() {
context.put(ARQConstants.sysCurrentQuery, query);
}

/**
 * Converts a duration to milliseconds.
 * A negative duration is returned unchanged and the time unit is not consulted.
 *
 * @param duration the duration to convert
 * @param timeUnit the unit of {@code duration}; only dereferenced when {@code duration} is not negative
 * @return {@code duration} itself when negative, otherwise {@code timeUnit.toMillis(duration)}
 */
private static long asMillis(long duration, TimeUnit timeUnit) {
    if ( duration < 0 )
        return duration;
    return timeUnit.toMillis(duration);
}

@Override
public void close() {
closed = true;
Expand Down Expand Up @@ -457,23 +456,6 @@ private void startQueryIteratorActual() {

execInit();

// JENA-2821 - Unconditionally provide a cancel signal because manual abort via QueryExec.abort()
// may be triggered any time, even if no timeouts were configured.
// Prior to this issue, the cancel signal was only provided when timeouts were configured.

// The following note is older:
// JENA-2141 - the timeout can go off while building the query iterator structure.
// In this case, use a signal passed through the context.
// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
//
// This applies to the time to first result because to get the first result, the
// queryIterator must have been built. So it does not apply for the second
// stage of N,-1 or N,M.
context.set(ARQConstants.symCancelQuery, cancelSignal);


/* Timeouts:
* -1,-1 No timeouts
* N, same as -1,N Overall timeout only. No wrapper needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
Expand All @@ -34,8 +33,9 @@
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.QueryEngineFactory;
import org.apache.jena.sparql.engine.QueryEngineRegistry;
import org.apache.jena.sparql.engine.Timeouts;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.exec.TimeoutLib.Timeout;
import org.apache.jena.sparql.exec.TimeoutLib.TimeoutBuilderImpl;
import org.apache.jena.sparql.syntax.syntaxtransform.QueryTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
Expand Down Expand Up @@ -69,10 +69,7 @@ public static QueryExecDatasetBuilder create() {

// Uses initial binding to execution (old, original) feature
private Binding initialBinding = null;
private long initialTimeout = UNSET;
private TimeUnit initialTimeoutUnit = null;
private long overallTimeout = UNSET;
private TimeUnit overallTimeoutUnit = null;
private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl();

private QueryExecDatasetBuilder() { }

Expand Down Expand Up @@ -166,60 +163,22 @@ public QueryExecDatasetBuilder initialBinding(Binding binding) {

@Override
public QueryExecDatasetBuilder timeout(long value, TimeUnit timeUnit) {
this.initialTimeout = UNSET;
this.initialTimeoutUnit = null;
this.overallTimeout = value;
this.overallTimeoutUnit = timeUnit;
timeoutBuilder.timeout(value, timeUnit);
return this;
}

@Override
public QueryExecDatasetBuilder initialTimeout(long value, TimeUnit timeUnit) {
this.initialTimeout = value < 0 ? -1L : value ;
this.initialTimeoutUnit = timeUnit;
timeoutBuilder.initialTimeout(value, timeUnit);
return this;
}

@Override
public QueryExecDatasetBuilder overallTimeout(long value, TimeUnit timeUnit) {
this.overallTimeout = value;
this.overallTimeoutUnit = timeUnit;
timeoutBuilder.overallTimeout(value, timeUnit);
return this;
}

// Set timeouts from the context if not set directly, e.g. the context provides default values.
// Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter.
private static void defaultTimeoutsFromContext(QueryExecDatasetBuilder builder, Context cxt) {
    // Look up the raw setting stored under ARQ.queryTimeout, then hand it on for interpretation.
    Object contextTimeout = cxt.get(ARQ.queryTimeout);
    applyTimeouts(builder, contextTimeout);
}

/**
 * Interpret {@code obj} as a timeout setting and apply it to the builder.
 * <ul>
 * <li>{@code null}: nothing is done.</li>
 * <li>A {@code Number}: its long value is used as the overall timeout in milliseconds.</li>
 * <li>A {@code String}: parsed with {@code Timeouts.parseTimeoutStr}; the left value is used
 *     as the initial timeout and the right value as the overall timeout, in milliseconds.</li>
 * <li>Anything else: a warning is logged.</li>
 * </ul>
 * A timeout is only applied when the corresponding builder field is currently negative.
 * Any exception raised along the way is caught and reported as a warning.
 */
private static void applyTimeouts(QueryExecDatasetBuilder builder, Object obj) {
    if ( obj == null )
        return ;
    try {
        if ( obj instanceof Number ) {
            // A single number only carries the overall timeout.
            long millis = ((Number)obj).longValue();
            if ( builder.overallTimeout < 0 )
                builder.overallTimeout(millis, TimeUnit.MILLISECONDS);
            return ;
        }
        if ( !(obj instanceof String) ) {
            Log.warn(builder, "Can't interpret timeout: " + obj);
            return ;
        }
        String timeoutStr = obj.toString();
        Pair<Long, Long> parsed = Timeouts.parseTimeoutStr(timeoutStr, TimeUnit.MILLISECONDS);
        if ( parsed == null ) {
            Log.warn(builder, "Bad timeout string: "+timeoutStr);
            return ;
        }
        // Only fill in the values that are still unset on the builder.
        if ( builder.initialTimeout < 0 )
            builder.initialTimeout(parsed.getLeft(), TimeUnit.MILLISECONDS);
        if ( builder.overallTimeout < 0 )
            builder.overallTimeout(parsed.getRight(), TimeUnit.MILLISECONDS);
    } catch (Exception ex) {
        Log.warn(builder, "Exception setting timeouts (context) from: "+obj);
    }
}

@Override
public QueryExec build() {
Objects.requireNonNull(query, "No query for QueryExec");
Expand All @@ -243,17 +202,17 @@ public QueryExec build() {
queryStringActual = null;
}

defaultTimeoutsFromContext(this, cxt);
TimeoutLib.defaultTimeoutsFromContext(this.timeoutBuilder, cxt);

if ( dataset != null )
cxt.set(ARQConstants.sysCurrentDataset, DatasetFactory.wrap(dataset));
if ( queryActual != null )
cxt.set(ARQConstants.sysCurrentQuery, queryActual);

Timeout timeout = timeoutBuilder.build();

QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual, dataset, cxt, qeFactory,
initialTimeout, initialTimeoutUnit,
overallTimeout, overallTimeoutUnit,
initialBinding);
timeout, initialBinding);
return qExec;
}
}
Loading

0 comments on commit c70164d

Please sign in to comment.