Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelIterable: Queue Size w/ O(1) #11895

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -85,14 +86,16 @@ static class ParallelIterator<T> implements CloseableIterator<T> {
private final ExecutorService workerPool;
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final AtomicInteger queueSize = new AtomicInteger(0);
private final AtomicBoolean closed = new AtomicBoolean(false);

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
Preconditions.checkArgument(maxQueueSize > 0, "Max queue size must be greater than 0");
this.tasks =
Iterables.transform(
iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize))
iterables,
iterable -> new Task<>(iterable, queue, queueSize, closed, maxQueueSize))
.iterator();
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
Expand Down Expand Up @@ -130,6 +133,7 @@ public void close() {

// clean queue
this.queue.clear();
this.queueSize.set(0);
} catch (IOException e) {
throw new UncheckedIOException("Close failed", e);
}
Expand Down Expand Up @@ -226,18 +230,23 @@ public synchronized T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return queue.poll();
T result = queue.poll();
if (result != null) {
queueSize.decrementAndGet();
}
return result;
}

@VisibleForTesting
int queueSize() {
return queue.size();
return queueSize.get();
}
}

private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
private final Iterable<T> input;
private final ConcurrentLinkedQueue<T> queue;
private final AtomicInteger queueSize;
private final AtomicBoolean closed;
private final int approximateMaxQueueSize;

Expand All @@ -246,10 +255,12 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
Task(
Iterable<T> input,
ConcurrentLinkedQueue<T> queue,
AtomicInteger queueSize,
AtomicBoolean closed,
int approximateMaxQueueSize) {
this.input = Preconditions.checkNotNull(input, "input cannot be null");
this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
this.queueSize = Preconditions.checkNotNull(queueSize, "queue cannot be null");
this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
this.approximateMaxQueueSize = approximateMaxQueueSize;
}
Expand All @@ -262,7 +273,7 @@ public Optional<Task<T>> get() {
}

while (iterator.hasNext()) {
if (queue.size() >= approximateMaxQueueSize) {
if (queueSize.get() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
Expand All @@ -274,6 +285,7 @@ public Optional<Task<T>> get() {
}

queue.add(next);
queueSize.incrementAndGet();
}
} catch (Throwable e) {
try {
Expand Down
Loading