
[#1818] fix(spark3): Avoid calling RssShuffleDataIterator.cleanup multiple times #1819


Merged
merged 3 commits into from
Jun 25, 2024

Conversation

wForget
Member

@wForget wForget commented Jun 21, 2024

What changes were proposed in this pull request?

Avoid calling RssShuffleDataIterator.cleanup multiple times.

Why are the changes needed?

Fix: #1818

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@rickyma
Contributor

rickyma commented Jun 21, 2024

Have you tested it? When will we encounter this issue? Could you add some UTs for this?


github-actions bot commented Jun 21, 2024

Test Results

 2 649 files  +8   2 649 suites  +8   5h 26m 53s ⏱️ -45s
   945 tests +1     944 ✅ +1   1 💤 ±0  0 ❌ ±0 
11 781 runs  +8  11 766 ✅ +8  15 💤 ±0  0 ❌ ±0 

Results for commit 26c7030. ± Comparison against base commit ceae615.

♻️ This comment has been updated with latest results.

@wForget
Member Author

wForget commented Jun 21, 2024

> Have you tested it? When will we encounter this issue? Could you add some UTs for this?

It happened by chance when I was running the integration tests for spark3 locally.

Comment on lines 287 to 298
new Function0<BoxedUnit>() {
  private boolean cleanupCalled = false;

  @Override
  public BoxedUnit apply() {
    if (!cleanupCalled) {
      cleanupCalled = true;
      context.taskMetrics().mergeShuffleReadMetrics();
      iterator.cleanup();
    }
    return BoxedUnit.UNIT;
  }
Contributor

Thanks for your contribution. It's possible for the completion function to be called twice:

  1. once after the iterator has been fully consumed
  2. again from the TaskCompletionListener in L284 - L290

So this code looks correct to me.

However, adding an anonymous class here directly is quite ugly and hard to read. Would you mind extracting it into a utility method, something like:

import scala.Function0;

public class Once {
  private Once() {
  }

  public static <R> Function0<R> once(Function0<R> f) {
    return new Function0<R>() {
      private R value = null;
      private volatile boolean computed = false;

      @Override
      public R apply() {
        if (!computed) {
          computed = true;
          value = f.apply();
        }
        return value;
      }
    };
  }
}

And call Once.once here?
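
To make the two call paths described above concrete, here is a minimal pure-Java sketch. It does not use Spark or Scala: `CountingIterator`, `RunOnce`, and `DoubleCleanupDemo` are hypothetical names invented for illustration, and `Runnable` stands in for the `Function0<BoxedUnit>` completion function.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the real iterator; it only counts cleanup() calls.
class CountingIterator {
    final AtomicInteger cleanups = new AtomicInteger();

    void cleanup() {
        cleanups.incrementAndGet();
    }
}

// Hypothetical helper, analogous in spirit to the suggested Once.once.
class RunOnce {
    private RunOnce() {}

    // Wraps an action so that only the first run() has any effect.
    static Runnable once(Runnable action) {
        AtomicBoolean called = new AtomicBoolean(false);
        return () -> {
            if (called.compareAndSet(false, true)) {
                action.run();
            }
        };
    }
}

class DoubleCleanupDemo {
    // Invokes the completion callback along both paths and returns
    // how many times cleanup() actually ran.
    static int simulate(boolean guarded) {
        CountingIterator iterator = new CountingIterator();
        Runnable completion = iterator::cleanup;
        if (guarded) {
            completion = RunOnce.once(completion);
        }
        completion.run(); // path 1: the iterator was fully consumed
        completion.run(); // path 2: the task completion listener fires
        return iterator.cleanups.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded cleanups: " + simulate(false));
        System.out.println("guarded cleanups:   " + simulate(true));
    }
}
```

Without the guard the sketch runs `cleanup()` on both paths; with it, only the first invocation reaches the iterator.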

@rickyma
Contributor

rickyma commented Jun 24, 2024

Maybe we can place FunctionUtils under the package org.apache.uniffle.common.util, since it looks more like a common utility?

Also, there are test failures.

@codecov-commenter

codecov-commenter commented Jun 24, 2024

Codecov Report

Attention: Patch coverage is 85.71429% with 1 line in your changes missing coverage. Please review.

Project coverage is 53.52%. Comparing base (dddcced) to head (26c7030).
Report is 20 commits behind head on master.

Files Patch % Lines
...n/java/org/apache/spark/shuffle/FunctionUtils.java 85.71% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1819      +/-   ##
============================================
- Coverage     53.53%   53.52%   -0.02%     
- Complexity     2356     2979     +623     
============================================
  Files           368      439      +71     
  Lines         16852    24158    +7306     
  Branches       1540     2258     +718     
============================================
+ Hits           9022    12930    +3908     
- Misses         7303    10432    +3129     
- Partials        527      796     +269     


@wForget
Member Author

wForget commented Jun 24, 2024

> Maybe we can place FunctionUtils under the package org.apache.uniffle.common.util, since it looks more like a common utility?
>
> Also, there are test failures.

It looks like there is a Scala compatibility issue and we can't put it in common, so I've moved it to the spark3 client.

@rickyma rickyma requested a review from advancedxy June 24, 2024 07:15
@rickyma
Contributor

rickyma commented Jun 24, 2024

Maybe we can just use pure Java code, something like:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class FunctionUtils {

    public static <T> Supplier<T> once(Supplier<T> supplier) {
        return new Supplier<T>() {
            private final AtomicBoolean computed = new AtomicBoolean(false);
            private T value;

            @Override
            public T get() {
                if (computed.compareAndSet(false, true)) {
                    value = supplier.get();
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<String> supplier = () -> {
            System.out.println("Called only once");
            return "Hello, World!";
        };

        Supplier<String> memoizedSupplier = once(supplier);

        System.out.println(memoizedSupplier.get());
        System.out.println(memoizedSupplier.get());
        System.out.println(memoizedSupplier.get());
    }
}

Then we can avoid the Scala compatibility issue. WDYT?
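
One caveat with the compareAndSet version: a caller that arrives while the first call is still inside `supplier.get()` skips the computation and returns `value` before it has been assigned. For a cleanup callback that may not matter. If it does, a minimal sketch of a blocking variant could look like this (`BlockingOnce` is a hypothetical name, not part of this PR):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical variant of once(): later callers wait for the first
// computation to finish instead of returning an unassigned value.
class BlockingOnce {
    private BlockingOnce() {}

    static <T> Supplier<T> once(Supplier<T> supplier) {
        return new Supplier<T>() {
            private boolean computed = false;
            private T value;

            @Override
            public synchronized T get() {
                if (!computed) {
                    // If supplier.get() throws, computed stays false,
                    // so the next call tries again.
                    value = supplier.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> memoized = once(() -> {
            calls.incrementAndGet();
            return "Hello, World!";
        });
        System.out.println(memoized.get());
        System.out.println(memoized.get());
        System.out.println("supplier calls: " + calls.get());
    }
}
```

The trade-off is that every call takes the monitor lock, which is likely negligible for a function that runs once or twice per task.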

@wForget
Member Author

wForget commented Jun 24, 2024

> Maybe we can just use pure Java code, something like: (the FunctionUtils.once(Supplier) snippet quoted from the comment above)
>
> Then we can avoid the Scala compatibility issue. WDYT?

CompletionIterator$.MODULE$.apply accepts a Function0, not a Supplier.
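
As a rough illustration of why a pure-Java `once` does not remove the Scala dependency at this call site, the sketch below adapts a `Supplier` to a Function0-shaped interface. `Fn0` is a hypothetical stand-in for `scala.Function0` so that the example compiles without scala-library; `SupplierAdapter` and `asFn0` are likewise invented names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for scala.Function0 (assumption: only apply() matters here).
interface Fn0<R> {
    R apply();
}

class SupplierAdapter {
    private SupplierAdapter() {}

    // Adapts a java.util.function.Supplier to the Function0-shaped interface.
    static <R> Fn0<R> asFn0(Supplier<R> supplier) {
        return new Fn0<R>() {
            @Override
            public R apply() {
                return supplier.get();
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Fn0<Integer> f = asFn0(calls::incrementAndGet);
        System.out.println(f.apply());
    }
}
```

In the real code the adapter itself would still have to implement `scala.Function0`, so it would be compiled against whichever Scala version the module uses.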

@rickyma
Contributor

rickyma commented Jun 24, 2024

Then I have no more comments. Let's have @advancedxy take another look.

Contributor

@advancedxy advancedxy left a comment

LGTM. Let's wait a bit longer so that @zuston can take a look, since the corresponding code was added by him.

We can merge it late tonight if there are no objections.

public class FunctionUtilsTests {

  @Test
  public void testOnceFunction0() {
Contributor

👍

@advancedxy
Contributor

> It looks like there is a Scala compatibility issue and we can't put it in common,

Thanks for your contribution. Out of curiosity, what kind of compatibility issue arises when putting it in common?

@wForget
Member Author

wForget commented Jun 24, 2024

> what kind of compatibility issue arises when putting it in common?

You can see more details in this failed GitHub Actions run: https://github.com/apache/incubator-uniffle/actions/runs/9640163249/job/26583457745

/home/runner/work/incubator-uniffle/incubator-uniffle/client-spark/common/src/main/java/org/apache/spark/shuffle/FunctionUtils.java:[25,30] error: <anonymous org.apache.spark.shuffle.FunctionUtils$1> is not abstract and does not override abstract method apply$mcV$sp() in Function0

@advancedxy
Contributor

> https://github.com/apache/incubator-uniffle/actions/runs/9640163249/job/26583457745

Hmm, I see, thanks for the info. I think the problem is caused by the mixed usage of Scala 2.11 (from Spark 2.x) and Scala 2.12.

In my opinion, we should drop Spark 2.x support at some point; it keeps adding maintenance cost.
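
For readers unfamiliar with the `apply$mcV$sp` error, here is a small pure-Java model of it. It rests on an assumption about how the two Scala versions expose `Function0` to Java: in 2.11 the specialized variants are abstract interface methods, while in 2.12 they have default bodies. `Fn0Abstract`, `Fn0Default`, and `AbstractFn0` are hypothetical stand-ins, and the real interface has more specialized variants than the one shown.

```java
// Hypothetical stand-in for the Scala 2.11 shape: the specialized
// variant is abstract, so a Java implementer must define it as well.
interface Fn0Abstract<R> {
    R apply();

    void apply$mcV$sp();
}

// Hypothetical stand-in for the Scala 2.12 shape: the specialized
// variant has a default body, so implementing apply() is enough.
interface Fn0Default<R> {
    R apply();

    default void apply$mcV$sp() {
        apply();
    }
}

// Plays the role that scala.runtime.AbstractFunction0 plays for Java
// callers: it fills in the specialized variant once, for all subclasses.
abstract class AbstractFn0<R> implements Fn0Abstract<R> {
    @Override
    public void apply$mcV$sp() {
        apply();
    }
}

class SpecializationDemo {
    static String viaDefault() {
        Fn0Default<String> f = new Fn0Default<String>() {
            @Override
            public String apply() {
                return "default";
            }
        };
        return f.apply();
    }

    static String viaAbstractBase() {
        // Implementing Fn0Abstract directly with only apply() would be
        // rejected by javac: "is not abstract and does not override
        // abstract method apply$mcV$sp()".
        Fn0Abstract<String> f = new AbstractFn0<String>() {
            @Override
            public String apply() {
                return "base";
            }
        };
        return f.apply();
    }

    public static void main(String[] args) {
        System.out.println(viaDefault());
        System.out.println(viaAbstractBase());
    }
}
```

Extending an abstract base class such as `scala.runtime.AbstractFunction0` is one common way for Java code to target both shapes; this PR instead moved the utility into the spark3 client.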

Member

@zuston zuston left a comment

LGTM.

@zuston zuston changed the title [#1818] Avoid calling RssShuffleDataIterator.cleanup multiple times [#1818] fix(spark3): Avoid calling RssShuffleDataIterator.cleanup multiple times Jun 25, 2024
@zuston zuston merged commit 0971148 into apache:master Jun 25, 2024
43 checks passed
@zuston
Member

zuston commented Jun 25, 2024

Merged. Thanks @wForget for your contribution and thanks @rickyma @advancedxy for your review

zhengchenyu pushed a commit that referenced this pull request Aug 2, 2024
…tiple times (#1819)
Development

Successfully merging this pull request may close these issues.

[Bug] RssShuffleDataIterator.cleanup may be called multiple times
5 participants