Avoid reading all sample data into memory at start of layer crawling #1158

Open
wants to merge 1 commit into base: dev
@@ -18,11 +18,10 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.gbif.pipelines.common.PipelinesException;
@@ -31,6 +30,7 @@
import org.gbif.pipelines.core.utils.FsUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.MDC;
import org.spark_project.guava.collect.Iterables;
Collaborator
I don't have this dep, instead:

import com.google.common.collect.Iterables;
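
A minimal, self-contained sketch of the lazy partitioning with the plain Guava import (the file path and batch size below are invented for illustration, not taken from this PR):

import com.google.common.collect.Iterables;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LazyPartitionSketch {
  public static void main(String[] args) throws IOException {
    int batchSize = 25_000; // hypothetical batch size
    try (BufferedReader reader = Files.newBufferedReader(Paths.get("/tmp/coordinates.csv"))) {
      // reader.lines() stays lazy; Iterables.partition hands out one window at a time,
      // so only a single batch of coordinates is held in memory at once.
      Iterables.partition(reader.lines()::iterator, batchSize)
          .forEach(batch -> System.out.println("Batch of " + batch.size() + " coordinates"));
    }
  }
}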

Author
Super cool test, I've never done something like that before. Although the fix was somewhat trivial 😅

Not sure if the other places with issues will be as straightforward 🙏

Collaborator
Sorry, I missed that this is needed by the test:

diff --git a/livingatlas/pipelines/pom.xml b/livingatlas/pipelines/pom.xml
index e1e94eb98..244788601 100644
--- a/livingatlas/pipelines/pom.xml
+++ b/livingatlas/pipelines/pom.xml
@@ -49,11 +49,13 @@
     <download-maven-plugin.version>1.6.1</download-maven-plugin.version>
     <jcabi-log.version>0.18.1</jcabi-log.version>
 
+
     <!-- use same version as okhttp -->
     <!-- avoids this issue https://stackoverflow.com/questions/56854548/mockwebserver-java-lang-nosuchmethoderror -->
     <mockwebserver.version>4.2.2</mockwebserver.version>
     <checker-qual.version>3.9.1</checker-qual.version>
     <snappy-java.version>1.1.8.4</snappy-java.version>
+    <junit-jupiter.version>5.7.2</junit-jupiter.version>
 
     <hadoop.version>3.4.0</hadoop.version>
     <maven-scala-plugin.version>4.8.0</maven-scala-plugin.version>
@@ -763,6 +765,12 @@
       </exclusions>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>${junit-jupiter.version}</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- ALA's layers store -->
     <dependency>
@@ -851,6 +859,12 @@
       <version>2.23.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.7.2</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

In general, I like to add tests for many reasons, but in this case it is useful to show that your PR is working (if you increase the number of coordinates in the test, the improvement is more obvious, but the test gets slower).

I'm trying to find similar issues in other parts of the code.
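
For illustration only, a minimal JUnit 5 sketch of how such a laziness check could look (the class name, coordinate values, and batch size are invented here, not the actual test in this PR):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;

class LazyPartitionTest {

  @Test
  void firstPartitionDoesNotConsumeWholeStream() {
    AtomicInteger generated = new AtomicInteger(0);

    // Simulate a large coordinate file as a lazily generated stream.
    Stream<String> coordinates =
        Stream.generate(() -> "1.0,-" + generated.incrementAndGet()).limit(1_000_000);

    int batchSize = 25_000;
    Iterator<List<String>> partitions =
        Iterables.partition(coordinates::iterator, batchSize).iterator();

    List<String> first = partitions.next();

    assertEquals(batchSize, first.size());
    // Only the first batch should have been pulled from the stream, not the full million.
    assertTrue(generated.get() <= batchSize, "stream was materialized eagerly");
  }
}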

import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;
@@ -206,73 +206,81 @@ public void crawl(FileSystem fs, String layers, String inputFilePath, String out
InputStream inputStream = ALAFsUtils.openInputStream(fs, inputFilePath);
Collection<List<String>> partitioned;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
partitioned = partition(reader.lines(), config.getSamplingService().getBatchSize());
Iterables.partition(reader.lines()::iterator, config.getSamplingService().getBatchSize())
.forEach(
(partition) -> {
processPartition(fs, layers, inputFilePath, outputDirectoryPath, partition);
});
}
}

for (List<String> partition : partitioned) {

log.info("Partition size (no of coordinates) : {}", partition.size());
String coords = String.join(",", partition);

Instant batchStart = Instant.now();
@SneakyThrows
private void processPartition(
FileSystem fs,
String layers,
String inputFilePath,
String outputDirectoryPath,
List<String> partition) {
log.info("Partition size (no of coordinates) : {}", partition.size());
String coords = String.join(",", partition);

// Submit a job to generate a join
Response<SamplingService.Batch> submit =
service.submitIntersectBatch(layers, coords).execute();
String batchId = submit.body().getBatchId();
Instant batchStart = Instant.now();

String state = UNKNOWN_STATUS;
while (!state.equalsIgnoreCase(FINISHED_STATUS) && !state.equalsIgnoreCase(ERROR_STATUS)) {
Response<SamplingService.BatchStatus> status = service.getBatchStatus(batchId).execute();
SamplingService.BatchStatus batchStatus = status.body();
state = batchStatus.getStatus();
// Submit a job to generate a join
Response<SamplingService.Batch> submit = service.submitIntersectBatch(layers, coords).execute();
String batchId = submit.body().getBatchId();

Instant batchCurrentTime = Instant.now();
String state = UNKNOWN_STATUS;
while (!state.equalsIgnoreCase(FINISHED_STATUS) && !state.equalsIgnoreCase(ERROR_STATUS)) {
Response<SamplingService.BatchStatus> status = service.getBatchStatus(batchId).execute();
SamplingService.BatchStatus batchStatus = status.body();
state = batchStatus.getStatus();

log.info(
"batch ID {} - status: {} - time elapses {} seconds",
batchId,
state,
Duration.between(batchStart, batchCurrentTime).getSeconds());
Instant batchCurrentTime = Instant.now();

if (!state.equals(FINISHED_STATUS)) {
TimeUnit.MILLISECONDS.sleep(config.getSamplingService().getBatchStatusSleepTime());
} else {
log.info("Downloading sampling batch {}", batchId);
log.info(
"batch ID {} - status: {} - time elapses {} seconds",
batchId,
state,
Duration.between(batchStart, batchCurrentTime).getSeconds());

downloadFile(fs, outputDirectoryPath, batchId, batchStatus);
if (!state.equals(FINISHED_STATUS)) {
TimeUnit.MILLISECONDS.sleep(config.getSamplingService().getBatchStatusSleepTime());
} else {
log.info("Downloading sampling batch {}", batchId);

String zipFilePath = outputDirectoryPath + "/" + batchId + ".zip";
ReadableByteChannel readableByteChannel = ALAFsUtils.openByteChannel(fs, zipFilePath);
InputStream zipInput = Channels.newInputStream(readableByteChannel);
downloadFile(fs, outputDirectoryPath, batchId, batchStatus);

try (ZipInputStream zipInputStream = new ZipInputStream(zipInput)) {
ZipEntry entry = zipInputStream.getNextEntry();
while (entry != null) {
log.info("Unzipping {}", entry.getName());
String zipFilePath = outputDirectoryPath + "/" + batchId + ".zip";
ReadableByteChannel readableByteChannel = ALAFsUtils.openByteChannel(fs, zipFilePath);
InputStream zipInput = Channels.newInputStream(readableByteChannel);

String unzippedOutputFilePath = outputDirectoryPath + "/" + batchId + ".csv";
if (!entry.isDirectory()) {
unzipFiles(fs, zipInputStream, unzippedOutputFilePath);
}
try (ZipInputStream zipInputStream = new ZipInputStream(zipInput)) {
ZipEntry entry = zipInputStream.getNextEntry();
while (entry != null) {
log.info("Unzipping {}", entry.getName());

zipInputStream.closeEntry();
entry = zipInputStream.getNextEntry();
String unzippedOutputFilePath = outputDirectoryPath + "/" + batchId + ".csv";
if (!entry.isDirectory()) {
unzipFiles(fs, zipInputStream, unzippedOutputFilePath);
}

zipInputStream.closeEntry();
entry = zipInputStream.getNextEntry();
}
}

// delete zip file
ALAFsUtils.deleteIfExist(fs, zipFilePath);
// delete zip file
ALAFsUtils.deleteIfExist(fs, zipFilePath);

log.info("Sampling done for file {}", inputFilePath);
}
log.info("Sampling done for file {}", inputFilePath);
}
}

if (state.equals(ERROR_STATUS)) {
log.error("Unable to download batch ID {}", batchId);
throw new PipelinesException(
"Unable to complete sampling for dataset. Check the status of sampling service for more details");
}
if (state.equals(ERROR_STATUS)) {
log.error("Unable to download batch ID {}", batchId);
throw new PipelinesException(
"Unable to complete sampling for dataset. Check the status of sampling service for more details");
}
}

@@ -306,15 +314,6 @@ private boolean downloadFile(
return false;
}

/**
* Util to partition a stream into fixed size windows. See
* https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/
*/
private static <T> Collection<List<T>> partition(Stream<T> stream, int size) {
final AtomicInteger counter = new AtomicInteger(0);
return stream.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size)).values();
}

/** Unzip the file to the path. */
public static void unzipFiles(
final FileSystem fs, final ZipInputStream zipInputStream, final String unzippedFilePath)