Avoid reading all sample data into memory at start of layer crawling #1158

Open · StefanVanDyck wants to merge 1 commit into dev
Conversation

@StefanVanDyck commented May 14, 2025

Fix for the Living Atlas pipeline.

The layer crawling step reads all the data into memory up front, while only processing it in batches. This caused the step to use a large amount of memory and our pipeline runs to be killed due to OOM.

This change fixes that by reading the data in only as the batch processing requires it.
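
For context, a minimal sketch of the lazy-batching idea (hypothetical names, not the actual LayerCrawler code): stream the file's lines and partition the iterator into fixed-size batches with Guava's Iterables.partition, so only the current batch is ever held in memory.

import com.google.common.collect.Iterables;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

class LazyBatchingSketch {

  static void processInBatches(Path csv, int batchSize) throws IOException {
    // Files.lines streams the file lazily; nothing beyond the current batch
    // is retained, unlike reading every line into a List first.
    try (Stream<String> lines = Files.lines(csv, StandardCharsets.UTF_8)) {
      Iterable<String> asIterable = lines::iterator;
      for (List<String> batch : Iterables.partition(asIterable, batchSize)) {
        submitBatch(batch); // placeholder for the real per-batch work
      }
    }
  }

  private static void submitBatch(List<String> batch) {
    // e.g. submit this batch of coordinates to the sampling service
  }
}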

@vjrj (Collaborator) left a comment:

Thanks @StefanVanDyck for this PR.

I created this to test your PR (feel free to add it to the PR):

package au.org.ala.sampling;

import au.org.ala.kvs.ALAPipelinesConfig;
import okhttp3.Request;
import okio.Timeout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.gbif.pipelines.core.config.model.SamplingConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import retrofit2.Callback;
import retrofit2.Response;

import java.io.BufferedWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Memory regression test for LayerCrawler. */
class LayerCrawlerMemoryIT {

  @TempDir
  Path tmpDir;

  @Test
  void extraHeapMustStayBelow110MiB() throws Exception {

    /* ---------- synthetic CSV with 2 M coords ---------- */
    Path csv = Files.createFile(tmpDir.resolve("coords.csv"));
    try (BufferedWriter w = Files.newBufferedWriter(csv, StandardCharsets.UTF_8)) {
      for (int i = 0; i < 2_000_000; i++) w.write("10.0,20.0\n");
    }

    /* ---------- output folder & local FS ---------- */
    Path outDir = Files.createDirectory(tmpDir.resolve("out"));
    FileSystem fs = FileSystem.getLocal(new Configuration(false));

    /* ---------- dummy zip for download() ---------- */
    Path zip = createDummyZip(tmpDir);
    String zipUrl = zip.toUri().toURL().toString();

    /* ---------- minimal config ---------- */
    SamplingConfig sc = new SamplingConfig();
    sc.setBatchSize(1_000);
    sc.setBatchStatusSleepTime(10);
    sc.setDownloadRetries(1);

    ALAPipelinesConfig cfg = new ALAPipelinesConfig();
    cfg.setSamplingService(sc);

    /* ---------- stub SamplingService ---------- */
    SamplingService svc = new SamplingService() {
      @Override public retrofit2.Call<Batch> submitIntersectBatch(String l,String c){
        Batch b = new Batch(); b.setBatchId("b1");
        return CallSync.of(Response.success(b));
      }
      @Override public retrofit2.Call<BatchStatus> getBatchStatus(String id){
        BatchStatus s = new BatchStatus();
        s.setStatus(LayerCrawler.FINISHED_STATUS);
        s.setDownloadUrl(zipUrl);
        return CallSync.of(Response.success(s));
      }
      @Override public retrofit2.Call<List<Field>> getFields(){
        return CallSync.of(Response.success(Collections.emptyList()));
      }
      @Override public retrofit2.Call<List<Layer>> getLayers(){
        return CallSync.of(Response.success(Collections.emptyList()));
      }
    };

    /* ---------- crawler with injected deps ---------- */
    LayerCrawler crawler = new LayerCrawler();
    inject(crawler,"config",cfg);
    inject(crawler,"service",svc);

    /* ---------- baseline after full GC ---------- */
    System.gc();
    Thread.sleep(200);
    long baseline = usedHeap();
    AtomicLong peak = new AtomicLong(baseline);

    /* ---------- sampler thread ---------- */
    Thread sampler = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          peak.updateAndGet(p -> Math.max(p, usedHeap()));
          Thread.sleep(20);
        }
      } catch (InterruptedException ignored) {}
    });
    sampler.start();

    /* ---------- run crawler ---------- */
    crawler.crawl(fs, "1", csv.toString(), outDir.toString());

    sampler.interrupt();
    sampler.join(); // wait for the sampler to stop before reading the peak
    long growth = peak.get() - baseline;

    // JUnit assertion rather than a plain `assert`, which is skipped unless -ea is set
    org.junit.jupiter.api.Assertions.assertTrue(
        growth < 110L * 1024 * 1024,
        "extra heap " + growth / (1024 * 1024) + " MiB");
  }

  /* ---------- helpers -------------------------------------------------- */

  private static long usedHeap() {
    Runtime r = Runtime.getRuntime();
    return r.totalMemory() - r.freeMemory();
  }

  private static void inject(Object target,String field,Object value)throws Exception{
    var f = target.getClass().getDeclaredField(field);
    f.setAccessible(true); f.set(target,value);
  }

  private static Path createDummyZip(Path dir)throws Exception{
    Path z = dir.resolve("dummy.zip");
    try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(z))) {
      zos.putNextEntry(new ZipEntry("sample.csv"));
      zos.write("id,layer\n".getBytes(StandardCharsets.UTF_8));
      zos.closeEntry();
    }
    return z;
  }

  /** Minimal synchronous Retrofit Call (no extra deps). */
  private static final class CallSync<T> implements retrofit2.Call<T> {
    private final Response<T> resp;
    private CallSync(Response<T> r){ this.resp=r; }
    static <T> CallSync<T> of(Response<T> r){ return new CallSync<>(r); }
    @Override public Response<T> execute(){ return resp; }
    @Override public void enqueue(Callback<T> cb){ throw new UnsupportedOperationException(); }
    @Override public boolean isExecuted(){ return true; }
    @Override public void cancel(){}
    @Override public boolean isCanceled(){ return false; }
    @Override public retrofit2.Call<T> clone(){ return new CallSync<>(resp); }
    @Override public Request request(){ return new Request.Builder().url("http://localhost/").build(); }
    @Override public Timeout timeout(){ return Timeout.NONE; }
  }
}

This test fails with the previous version of LayerCrawler and passes with yours.
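
A rough back-of-the-envelope (my estimate, not from the PR) for why the 110 MiB budget discriminates: 2,000,000 lines of "10.0,20.0" held as distinct String objects cost roughly 55–65 bytes each (object header, fields, and the backing byte array) plus a list slot, so an eager read retains on the order of 2,000,000 × ~60 B ≈ 120 MB (~115 MiB), just over the budget, while the batched version only ever holds batchSize = 1,000 lines at a time.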

Heap-usage graphs, before: [image]

After: [image]

Apart from the missing import, LGTM. Thanks!

PS: I think this kind of PR (adding a similar test) can be a good strategy for fixing other LA pipeline steps with similar issues; a reusable helper along these lines is sketched below.
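
One way to reuse the pattern across steps could be a small helper, extracted from the test above (hypothetical, not in the codebase):

import java.util.concurrent.atomic.AtomicLong;

final class HeapBudget {

  /** Runs the action while sampling used heap; returns the peak growth in bytes. */
  static long peakGrowth(Runnable action) throws InterruptedException {
    System.gc();
    Thread.sleep(200); // let the GC settle before taking the baseline
    long baseline = used();
    AtomicLong peak = new AtomicLong(baseline);
    Thread sampler = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          peak.updateAndGet(p -> Math.max(p, used()));
          Thread.sleep(20);
        }
      } catch (InterruptedException ignored) {
      }
    });
    sampler.start();
    try {
      action.run();
    } finally {
      sampler.interrupt();
      sampler.join();
    }
    return peak.get() - baseline;
  }

  private static long used() {
    Runtime r = Runtime.getRuntime();
    return r.totalMemory() - r.freeMemory();
  }
}

A step's memory test then reduces to asserting that HeapBudget.peakGrowth(() -> step.run()) stays under its budget.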

@@ -31,6 +30,7 @@
import org.gbif.pipelines.core.utils.FsUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.MDC;
import org.spark_project.guava.collect.Iterables;
@vjrj (Collaborator):

I don't have this dependency (org.spark_project.guava is Spark's shaded copy of Guava); instead:

import com.google.common.collect.Iterables;

@StefanVanDyck (Author):

Super cool test, I've never done something like that before.
Although the fix was somewhat trivial 😅

Not sure whether the other places with issues will be as straightforward 🙏

@vjrj (Collaborator):

Sorry, I missed this dependency needed by the test:

diff --git a/livingatlas/pipelines/pom.xml b/livingatlas/pipelines/pom.xml
index e1e94eb98..244788601 100644
--- a/livingatlas/pipelines/pom.xml
+++ b/livingatlas/pipelines/pom.xml
@@ -49,11 +49,13 @@
     <download-maven-plugin.version>1.6.1</download-maven-plugin.version>
     <jcabi-log.version>0.18.1</jcabi-log.version>
 
+
     <!-- use same version as okhttp -->
     <!-- avoids this issue https://stackoverflow.com/questions/56854548/mockwebserver-java-lang-nosuchmethoderror -->
     <mockwebserver.version>4.2.2</mockwebserver.version>
     <checker-qual.version>3.9.1</checker-qual.version>
     <snappy-java.version>1.1.8.4</snappy-java.version>
+    <junit-jupiter.version>5.7.2</junit-jupiter.version>
 
     <hadoop.version>3.4.0</hadoop.version>
     <maven-scala-plugin.version>4.8.0</maven-scala-plugin.version>
@@ -763,6 +765,12 @@
       </exclusions>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>${junit-jupiter.version}</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- ALA's layers store -->
     <dependency>
@@ -851,6 +859,12 @@
       <version>2.23.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.7.2</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

In general I like to add tests for many reasons, but in this case it is useful to demonstrate that your PR works (if you increase the number of coordinates in the test, the improvement is more obvious, but the test is slower).

I'm trying to find similar issues in other parts of the code.

@vjrj (Collaborator) commented May 29, 2025

cc @h-m-jones
