Skip to content

Commit

Permalink
Merge pull request #19 from Nuix/bulk-redactor-threading
Browse files Browse the repository at this point in the history
add parellelism to bulk redactor
  • Loading branch information
JuicyDragon authored Jun 24, 2021
2 parents ef90197 + 9b2dc28 commit 932b76f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import java.util.Collection;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;

import jxl.common.Logger;
import nuix.Case;
import nuix.Item;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.nuix.superutilities.misc.SQLiteBacked;
import com.nuix.superutilities.query.QueryHelper;

import jxl.common.Logger;
import org.apache.log4j.Logger;
import nuix.BulkAnnotater;
import nuix.Case;
import nuix.Item;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.TreeMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -69,7 +71,7 @@ public void whenProgressUpdated(Consumer<BulkRedactorProgressInfo> callback) {
progressUpdatedCallback = callback;
}

private void fireProgressUpdated(BulkRedactorProgressInfo info) {
private synchronized void fireProgressUpdated(BulkRedactorProgressInfo info) {
if(progressUpdatedCallback != null) {
progressUpdatedCallback.accept(info);
}
Expand Down Expand Up @@ -197,13 +199,17 @@ public List<NuixImageAnnotationRegion> findExpressionsInPdfFile(File file, Colle
* @param nuixCase The source Nuix case. Needed to obtain items (if none were given) and/or obtain the appropriate markup set.
* @param settings The settings used to find and generate the redactions.
* @param scopeItems Items to find and redact.
* @param concurrency How many threads to put in ForkJoinPool
* @throws Exception If something goes wrong
* @return Returns a list of all match region objects (so they can be reported, inspected, etc)
*/
public List<NuixImageAnnotationRegion> findAndMarkup(Case nuixCase, BulkRedactorSettings settings, Collection<Item> scopeItems) throws Exception {
public List<NuixImageAnnotationRegion> findAndMarkup(Case nuixCase, BulkRedactorSettings settings, Collection<Item> scopeItems, int concurrency) throws Exception {
Collection<Item> itemsToProcess;
if(scopeItems == null || scopeItems.size() < 1) {
logger.info("No scopeItems were provided, using all items in case");
scopeItems = nuixCase.search("");
itemsToProcess = nuixCase.search("");
} else {
itemsToProcess = scopeItems;
}

List<NuixImageAnnotationRegion> allFoundRegions = new ArrayList<NuixImageAnnotationRegion>();
Expand All @@ -213,12 +219,13 @@ public List<NuixImageAnnotationRegion> findAndMarkup(Case nuixCase, BulkRedactor
com.nuix.data.util.aspose.AsposePdf.ensureInitialised();

PdfWorkCache pdfCache = new PdfWorkCache(settings.getTempDirectory());
MarkupSet markupSet = null;
if (settings.getApplyRedactions() || settings.getApplyRedactions()) {
MarkupSet markupSet;
if (settings.getApplyRedactions() || settings.getApplyRedactions()) {
markupSet = settings.getMarkupSet(nuixCase);
} else {
markupSet = null;
}
int currentIteration = 0;
int matches = 0;


logMessage("Regular Expressions:");
for(String expression : settings.getExpressions()) {
Expand All @@ -230,53 +237,75 @@ public List<NuixImageAnnotationRegion> findAndMarkup(Case nuixCase, BulkRedactor
logMessage(namedEntity);
}

for(Item item : scopeItems) {
currentIteration += 1;
File tempPdf = pdfCache.getPdfPath(item);

List<NuixImageAnnotationRegion> regions = findExpressionsInPdfFile(tempPdf, settings.getExpressions());
if(regions.size() > 0) {
for(NuixImageAnnotationRegion region : regions) {
region.setItem(item);
}
allFoundRegions.addAll(regions);
logMessage("Item with GUID %s had %s matches",item.getGuid(),regions.size());
for(NuixImageAnnotationRegion region : regions) {
matches++;
if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); }
if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); }
}
}

//Named entities require that we get matched values, convert those to expressions and then do another pass
if (settings.getNamedEntityTypes().size() > 0) {
Set<String> entityValues = new HashSet<String>();
for(String entityType : settings.getNamedEntityTypes()) {
entityValues.addAll(item.getEntities(entityType));
}

Set<String> entityExpressions = entityValues.stream().map(v -> BulkRedactorSettings.phraseToExpression(v)).collect(Collectors.toSet());
List<NuixImageAnnotationRegion> entityRegions = findExpressionsInPdfFile(tempPdf, entityExpressions);
if(entityRegions.size() > 0) {
for(NuixImageAnnotationRegion region : entityRegions) {
region.setItem(item);
int scopeItemsSize = scopeItems.size();
AtomicInteger currentIteration = new AtomicInteger(0);
AtomicInteger matches = new AtomicInteger(0);

Consumer<Item> workHorse = new Consumer<Item>() {
@Override
public void accept(Item item) {
try {
currentIteration.addAndGet(1);
File tempPdf = pdfCache.getPdfPath(item);

Set<String> allExpressions = new HashSet<String>();
allExpressions.addAll(settings.getExpressions());
if (settings.getNamedEntityTypes().size() > 0) {
Set<String> entityValues = new HashSet<String>();
for(String entityType : settings.getNamedEntityTypes()) {
entityValues.addAll(item.getEntities(entityType));
}
entityValues.stream().map(v -> BulkRedactorSettings.phraseToExpression(v)).forEach(new Consumer<String>() {
@Override
public void accept(String exp) {
allExpressions.add(exp);
}
});
}
allFoundRegions.addAll(regions);
logMessage("Item with GUID %s had %s named entity matches",item.getGuid(),entityRegions.size());
for(NuixImageAnnotationRegion region : entityRegions) {
matches++;
if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); }
if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); }
}

List<NuixImageAnnotationRegion> regions = findExpressionsInPdfFile(tempPdf, allExpressions);
if(regions.size() > 0) {
for(NuixImageAnnotationRegion region : regions) {
region.setItem(item);
}
allFoundRegions.addAll(regions);
logMessage("Item with GUID %s had %s matches",item.getGuid(),regions.size());
for(NuixImageAnnotationRegion region : regions) {
if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); }
if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); }
}
matches.addAndGet(regions.size());
}

pdfCache.forgetItem(item);

// Report progress
synchronized(this) {
BulkRedactorProgressInfo progressInfo = new BulkRedactorProgressInfo();
progressInfo.setCurrent(currentIteration.get());
progressInfo.setTotal(scopeItemsSize);
progressInfo.setMatches(matches.get());
fireProgressUpdated(progressInfo);
}
} catch (Exception e) {
logMessage("Exception processing item with GUID %s, %s (See Nuix logs for more detail)", item.getGuid(), e.getMessage());
logger.error(String.format("Error while processing item with GUID %s", item.getGuid()),e);
}
}

// Report progress
BulkRedactorProgressInfo progressInfo = new BulkRedactorProgressInfo();
progressInfo.setCurrent(currentIteration);
progressInfo.setTotal(scopeItems.size());
progressInfo.setMatches(matches);
fireProgressUpdated(progressInfo);
};

ForkJoinPool pool = null;
try {
pool = new ForkJoinPool(concurrency);
pool.submit(()->{
itemsToProcess.parallelStream().forEach(workHorse);
}).get();
} catch (Exception e) {
logger.error("Error while scanning",e);
throw e;
} finally {
if(pool != null)
pool.shutdown();
}

logMessage("Cleaning up temp directory %s",settings.getTempDirectory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public synchronized File getPdfPath(Item item) throws Exception {
tempPdf = new File(tempPdf,guid.substring(3, 6));
tempPdf.mkdirs();
tempPdf = new File(tempPdf,item.getGuid()+".pdf");
item.getPrintedImage().generate(printSettings); // Make sure PDF is generated or export can have issues
if(!item.getPrintedImage().isStored()) {
item.getPrintedImage().generate(printSettings); // Make sure PDF is generated or export can have issues
}
pdfExporter.exportItem(item, tempPdf);
pdfCache.put(item.getGuid(), tempPdf);
}
Expand Down

0 comments on commit 932b76f

Please sign in to comment.