From 8f1cd3c1189a290a072e645ebd3cac27eb6385b5 Mon Sep 17 00:00:00 2001
From: Stefan Van Dyck
Date: Fri, 9 May 2025 10:42:34 +0200
Subject: [PATCH] Avoid reading all sample data into memory at start of layer
 crawling. Caused OOM errors in our pipeline

---
 .../au/org/ala/sampling/LayerCrawler.java | 119 +++++++++---
 1 file changed, 59 insertions(+), 60 deletions(-)

diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/sampling/LayerCrawler.java b/livingatlas/pipelines/src/main/java/au/org/ala/sampling/LayerCrawler.java
index e8ad2ea189..7a04e52967 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/sampling/LayerCrawler.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/sampling/LayerCrawler.java
@@ -18,11 +18,10 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.gbif.pipelines.common.PipelinesException;
@@ -31,6 +30,7 @@
 import org.gbif.pipelines.core.utils.FsUtils;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
+import org.spark_project.guava.collect.Iterables;
 import retrofit2.Response;
 import retrofit2.Retrofit;
 import retrofit2.converter.jackson.JacksonConverterFactory;
@@ -206,73 +206,81 @@ public void crawl(FileSystem fs, String layers, String inputFilePath, String out
     InputStream inputStream = ALAFsUtils.openInputStream(fs, inputFilePath);
-    Collection<List<String>> partitioned;
     try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
-      partitioned = partition(reader.lines(), config.getSamplingService().getBatchSize());
+      Iterables.partition(reader.lines()::iterator, config.getSamplingService().getBatchSize())
+          .forEach(
+              (partition) -> {
+                processPartition(fs, layers, inputFilePath, outputDirectoryPath, partition);
+              });
     }
+  }
 
-    for (List<String> partition : partitioned) {
-
-      log.info("Partition size (no of coordinates) : {}", partition.size());
-      String coords = String.join(",", partition);
-
-      Instant batchStart = Instant.now();
+  @SneakyThrows
+  private void processPartition(
+      FileSystem fs,
+      String layers,
+      String inputFilePath,
+      String outputDirectoryPath,
+      List<String> partition) {
+    log.info("Partition size (no of coordinates) : {}", partition.size());
+    String coords = String.join(",", partition);
 
-      // Submit a job to generate a join
-      Response<SamplingService.Batch> submit =
-          service.submitIntersectBatch(layers, coords).execute();
-      String batchId = submit.body().getBatchId();
+    Instant batchStart = Instant.now();
 
-      String state = UNKNOWN_STATUS;
-      while (!state.equalsIgnoreCase(FINISHED_STATUS) && !state.equalsIgnoreCase(ERROR_STATUS)) {
-        Response<SamplingService.BatchStatus> status = service.getBatchStatus(batchId).execute();
-        SamplingService.BatchStatus batchStatus = status.body();
-        state = batchStatus.getStatus();
+    // Submit a job to generate a join
+    Response<SamplingService.Batch> submit = service.submitIntersectBatch(layers, coords).execute();
+    String batchId = submit.body().getBatchId();
 
-        Instant batchCurrentTime = Instant.now();
+    String state = UNKNOWN_STATUS;
+    while (!state.equalsIgnoreCase(FINISHED_STATUS) && !state.equalsIgnoreCase(ERROR_STATUS)) {
+      Response<SamplingService.BatchStatus> status = service.getBatchStatus(batchId).execute();
+      SamplingService.BatchStatus batchStatus = status.body();
+      state = batchStatus.getStatus();
 
-        log.info(
-            "batch ID {} - status: {} - time elapses {} seconds",
-            batchId,
-            state,
-            Duration.between(batchStart, batchCurrentTime).getSeconds());
+      Instant batchCurrentTime = Instant.now();
 
-        if (!state.equals(FINISHED_STATUS)) {
-          TimeUnit.MILLISECONDS.sleep(config.getSamplingService().getBatchStatusSleepTime());
-        } else {
-          log.info("Downloading sampling batch {}", batchId);
+      log.info(
+          "batch ID {} - status: {} - time elapses {} seconds",
+          batchId,
+          state,
+          Duration.between(batchStart, batchCurrentTime).getSeconds());
 
-          downloadFile(fs, outputDirectoryPath, batchId, batchStatus);
+      if (!state.equals(FINISHED_STATUS)) {
+        TimeUnit.MILLISECONDS.sleep(config.getSamplingService().getBatchStatusSleepTime());
+      } else {
+        log.info("Downloading sampling batch {}", batchId);
 
-          String zipFilePath = outputDirectoryPath + "/" + batchId + ".zip";
-          ReadableByteChannel readableByteChannel = ALAFsUtils.openByteChannel(fs, zipFilePath);
-          InputStream zipInput = Channels.newInputStream(readableByteChannel);
+        downloadFile(fs, outputDirectoryPath, batchId, batchStatus);
 
-          try (ZipInputStream zipInputStream = new ZipInputStream(zipInput)) {
-            ZipEntry entry = zipInputStream.getNextEntry();
-            while (entry != null) {
-              log.info("Unzipping {}", entry.getName());
+        String zipFilePath = outputDirectoryPath + "/" + batchId + ".zip";
+        ReadableByteChannel readableByteChannel = ALAFsUtils.openByteChannel(fs, zipFilePath);
+        InputStream zipInput = Channels.newInputStream(readableByteChannel);
 
-              String unzippedOutputFilePath = outputDirectoryPath + "/" + batchId + ".csv";
-              if (!entry.isDirectory()) {
-                unzipFiles(fs, zipInputStream, unzippedOutputFilePath);
-              }
+        try (ZipInputStream zipInputStream = new ZipInputStream(zipInput)) {
+          ZipEntry entry = zipInputStream.getNextEntry();
+          while (entry != null) {
+            log.info("Unzipping {}", entry.getName());
 
-              zipInputStream.closeEntry();
-              entry = zipInputStream.getNextEntry();
+            String unzippedOutputFilePath = outputDirectoryPath + "/" + batchId + ".csv";
+            if (!entry.isDirectory()) {
+              unzipFiles(fs, zipInputStream, unzippedOutputFilePath);
             }
+
+            zipInputStream.closeEntry();
+            entry = zipInputStream.getNextEntry();
           }
+        }
 
-          // delete zip file
-          ALAFsUtils.deleteIfExist(fs, zipFilePath);
+        // delete zip file
+        ALAFsUtils.deleteIfExist(fs, zipFilePath);
 
-          log.info("Sampling done for file {}", inputFilePath);
-        }
+        log.info("Sampling done for file {}", inputFilePath);
       }
+    }
 
-      if (state.equals(ERROR_STATUS)) {
-        log.error("Unable to download batch ID {}", batchId);
-        throw new PipelinesException(
-            "Unable to complete sampling for dataset. Check the status of sampling service for more details");
-      }
+    if (state.equals(ERROR_STATUS)) {
+      log.error("Unable to download batch ID {}", batchId);
+      throw new PipelinesException(
+          "Unable to complete sampling for dataset. Check the status of sampling service for more details");
     }
   }
 
@@ -306,15 +314,6 @@ private boolean downloadFile(
     return false;
   }
 
-  /**
-   * Util to partition a stream into fixed size windows. See
-   * https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/
-   */
-  private static Collection<List<String>> partition(Stream<String> stream, int size) {
-    final AtomicInteger counter = new AtomicInteger(0);
-    return stream.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size)).values();
-  }
-
   /** Unzip the file to the path. */
   public static void unzipFiles(
       final FileSystem fs, final ZipInputStream zipInputStream, final String unzippedFilePath)
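
Note for reviewers: the core of the change is swapping an eager partitioning step for a
lazy one. The removed partition() helper collected the entire coordinate stream into a
Map via Collectors.groupingBy before the first batch could be submitted, so the whole
input file was resident in memory at once. Guava's Iterables.partition instead wraps the
stream's iterator and yields one fixed-size batch at a time while the reader stays open.
The following minimal sketch contrasts the two approaches in isolation; the class and
method names (PartitionSketch, process) and the sample input are illustrative only, and
it uses stock Guava (com.google.common.collect.Iterables) rather than the shaded
org.spark_project.guava copy the patch imports, which exposes the same method.

    import com.google.common.collect.Iterables;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;

    public class PartitionSketch {

      // Old approach: groupingBy materialises every batch in one Map before
      // the first batch can be processed - the whole input is held in memory.
      static void eagerPartition(BufferedReader reader, int batchSize) {
        AtomicInteger counter = new AtomicInteger(0);
        reader
            .lines()
            .collect(Collectors.groupingBy(line -> counter.getAndIncrement() / batchSize))
            .values()
            .forEach(PartitionSketch::process);
      }

      // New approach: Iterables.partition wraps the stream's iterator lazily,
      // so only the current batch of batchSize lines is resident at a time.
      static void lazyPartition(BufferedReader reader, int batchSize) {
        Iterables.partition(reader.lines()::iterator, batchSize)
            .forEach(PartitionSketch::process);
      }

      static void process(List<String> batch) {
        System.out.println("processing batch of " + batch.size());
      }

      public static void main(String[] args) throws IOException {
        try (BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne"))) {
          lazyPartition(reader, 2); // prints batches of sizes 2, 2, 1
        }
      }
    }

One caveat baked into the patch: reader.lines()::iterator produces a one-shot Iterable,
since a Stream can only be traversed once. That is fine here because the Iterable is
consumed exactly once, inside the try-with-resources block that keeps the reader open.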