Skip to content

Commit

Permalink
Refactor: Use InputStream.transferTo and readAllBytes (#2669)
Browse files Browse the repository at this point in the history
Instead of custom code that accomplishes the same, sometimes more slowly.
Delete unused BytesOutputStream.
  • Loading branch information
dsmiley authored Sep 3, 2024
1 parent 18ec785 commit c6651a7
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 229 deletions.
15 changes: 6 additions & 9 deletions solr/core/src/java/org/apache/solr/cli/PostTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DeprecatedAttributes;
import org.apache.commons.cli.Option;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.solr.client.api.util.SolrVersion;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
Expand Down Expand Up @@ -1064,16 +1065,12 @@ public static InputStream stringToStream(String s) {
* source and thrown away.
*/
private static void pipe(InputStream source, OutputStream dest) throws IOException {
byte[] buf = new byte[1024];
int read = 0;
while ((read = source.read(buf)) >= 0) {
if (null != dest) {
dest.write(buf, 0, read);
}
}
if (null != dest) {
dest.flush();
if (dest == null) {
dest = NullOutputStream.INSTANCE;
}
// copy source to dest
source.transferTo(dest);
dest.flush();
}

public FileFilter getFileFilterFromFileTypes(String fileTypes) {
Expand Down
16 changes: 14 additions & 2 deletions solr/core/src/java/org/apache/solr/handler/BlobHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
Expand All @@ -34,6 +35,7 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
Expand Down Expand Up @@ -111,8 +113,8 @@ public void handleRequestBody(final SolrQueryRequest req, SolrQueryResponse rsp)

for (ContentStream stream : req.getContentStreams()) {
ByteBuffer payload;
try (InputStream is = stream.getStream()) {
payload = Utils.toByteArray(is, maxSize);
try (InputStream is = boundedInputStream(stream.getStream(), maxSize)) {
payload = Utils.toByteArray(is);
}
MessageDigest m = MessageDigest.getInstance("MD5");
m.update(payload.array(), payload.arrayOffset() + payload.position(), payload.limit());
Expand Down Expand Up @@ -261,6 +263,16 @@ public void write(OutputStream os) throws IOException {
}
}

/**
 * Wraps {@code is} so that reading beyond {@code maxLength} bytes fails fast with a
 * {@link BufferOverflowException} instead of silently truncating the payload.
 */
private static InputStream boundedInputStream(final InputStream is, final long maxLength)
    throws IOException {
  // Commons IO calls onMaxLength exactly when the bound is hit; override the hook so the
  // overflow surfaces as an exception rather than a premature end-of-stream.
  return new BoundedInputStream(is, maxLength) {
    @Override
    protected void onMaxLength(long max, long bytesRead) {
      throw new BufferOverflowException();
    }
  };
}

private void verifyWithRealtimeGet(
String blobName, long version, SolrQueryRequest req, Map<String, Object> doc) {
for (; ; ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import static org.apache.solr.handler.loader.CSVLoaderBase.SEPARATOR;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -65,14 +63,11 @@ public class DefaultSampleDocumentsLoader implements SampleDocumentsLoader {
private static final int MAX_STREAM_SIZE = (5 * 1024 * 1024);
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static byte[] streamAsBytes(final InputStream in) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int r;
try (in) {
while ((r = in.read(buf)) != -1) baos.write(buf, 0, r);
/** Reads all bytes from the stream. */
private static byte[] readAllBytes(ContentStream cs) throws IOException {
try (var is = cs.getStream()) {
return is.readAllBytes();
}
return baos.toByteArray();
}

@Override
Expand Down Expand Up @@ -101,7 +96,7 @@ public SampleDocuments parseDocsFromStream(
fileSource = stream.getSourceInfo() != null ? stream.getSourceInfo() : "file";
}

byte[] uploadedBytes = streamAsBytes(stream.getStream());
byte[] uploadedBytes = readAllBytes(stream);
// recheck the upload size in case the stream returned null for getSize
if (uploadedBytes.length > MAX_STREAM_SIZE) {
throw new SolrException(
Expand Down Expand Up @@ -208,8 +203,7 @@ protected List<SolrInputDocument> loadJsonDocs(
String charset = ContentStreamBase.getCharsetFromContentType(stream.getContentType());
String jsonStr =
new String(
streamAsBytes(stream.getStream()),
charset != null ? charset : ContentStreamBase.DEFAULT_CHARSET);
readAllBytes(stream), charset != null ? charset : ContentStreamBase.DEFAULT_CHARSET);
String[] lines = jsonStr.split("\n");
if (lines.length > 1) {
for (String line : lines) {
Expand Down Expand Up @@ -239,7 +233,7 @@ protected List<SolrInputDocument> loadJsonDocs(
protected List<SolrInputDocument> loadXmlDocs(
SolrParams params, ContentStreamBase.ByteArrayStream stream, final int maxDocsToLoad)
throws IOException {
String xmlString = readInputAsString(stream.getStream()).trim();
String xmlString = new String(readAllBytes(stream), StandardCharsets.UTF_8).trim();
List<SolrInputDocument> docs;
if (xmlString.contains("<add>") && xmlString.contains("<doc>")) {
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
Expand Down Expand Up @@ -320,10 +314,6 @@ protected List<Map<String, Object>> loadJsonLines(String[] lines) throws IOExcep
return docs;
}

protected String readInputAsString(InputStream in) throws IOException {
return new String(streamAsBytes(in), StandardCharsets.UTF_8);
}

protected char detectTSV(String csvStr) {
char sep = ',';
int endOfFirstLine = csvStr.indexOf('\n');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ public void updateFileContents(SolrQueryRequest req, SolrQueryResponse rsp)
"File '" + file + "' not found in configSet: " + configSet);
}

byte[] data =
DefaultSampleDocumentsLoader.streamAsBytes(
extractSingleContentStream(req, true).getStream());
byte[] data;
try (InputStream in = extractSingleContentStream(req, true).getStream()) {
data = in.readAllBytes();
}
Exception updateFileError = null;
boolean requestIsTrusted =
ConfigSetAPIBase.isTrusted(req.getUserPrincipal(), coreContainer.getAuthenticationPlugin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.IOSupplier;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
Expand Down Expand Up @@ -542,12 +543,12 @@ List<SolrInputDocument> getStoredSampleDocs(final String configSet) throws IOExc
((CloudLegacySolrClient) cloudClient()).getHttpClient().execute(httpGet);
int statusCode = entity.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
byte[] bytes = DefaultSampleDocumentsLoader.streamAsBytes(entity.getEntity().getContent());
byte[] bytes = readAllBytes(() -> entity.getEntity().getContent());
if (bytes.length > 0) {
docs = (List<SolrInputDocument>) Utils.fromJavabin(bytes);
}
} else if (statusCode != HttpStatus.SC_NOT_FOUND) {
byte[] bytes = DefaultSampleDocumentsLoader.streamAsBytes(entity.getEntity().getContent());
byte[] bytes = readAllBytes(() -> entity.getEntity().getContent());
throw new IOException(
"Failed to lookup stored docs for "
+ configSet
Expand All @@ -562,10 +563,14 @@ List<SolrInputDocument> getStoredSampleDocs(final String configSet) throws IOExc

void storeSampleDocs(final String configSet, List<SolrInputDocument> docs) throws IOException {
docs.forEach(d -> d.removeField(VERSION_FIELD)); // remove _version_ field before storing ...
postDataToBlobStore(
cloudClient(),
configSet + "_sample",
DefaultSampleDocumentsLoader.streamAsBytes(toJavabin(docs)));
postDataToBlobStore(cloudClient(), configSet + "_sample", readAllBytes(() -> toJavabin(docs)));
}

/**
 * Obtains an {@link InputStream} from the supplier, drains it fully into a byte array, and
 * guarantees the stream is closed even if reading fails.
 */
static byte[] readAllBytes(IOSupplier<InputStream> hasStream) throws IOException {
  try (InputStream stream = hasStream.get()) {
    return stream.readAllBytes();
  }
}

protected void postDataToBlobStore(CloudSolrClient cloudClient, String blobName, byte[] bytes)
Expand Down
28 changes: 4 additions & 24 deletions solr/core/src/test/org/apache/solr/cloud/TestConfigSetsAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -1759,14 +1759,7 @@ private static void zipWithForbiddenEndings(File fileOrDirectory, File zipfile)
zout.putNextEntry(new ZipEntry("test." + fileType));

try (InputStream in = new FileInputStream(fileOrDirectory)) {
byte[] buffer = new byte[1024];
while (true) {
int readCount = in.read(buffer);
if (readCount < 0) {
break;
}
zout.write(buffer, 0, readCount);
}
in.transferTo(zout);
}

zout.closeEntry();
Expand All @@ -1788,8 +1781,7 @@ private static void zip(File directory, File zipfile) throws IOException {
Deque<File> queue = new ArrayDeque<>();
queue.push(directory);
OutputStream out = new FileOutputStream(zipfile);
ZipOutputStream zout = new ZipOutputStream(out);
try {
try (ZipOutputStream zout = new ZipOutputStream(out)) {
while (!queue.isEmpty()) {
directory = queue.pop();
for (File kid : directory.listFiles()) {
Expand All @@ -1801,26 +1793,14 @@ private static void zip(File directory, File zipfile) throws IOException {
} else {
zout.putNextEntry(new ZipEntry(name));

InputStream in = new FileInputStream(kid);
try {
byte[] buffer = new byte[1024];
while (true) {
int readCount = in.read(buffer);
if (readCount < 0) {
break;
}
zout.write(buffer, 0, readCount);
}
} finally {
in.close();
try (InputStream in = new FileInputStream(kid)) {
in.transferTo(zout);
}

zout.closeEntry();
}
}
}
} finally {
zout.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void testPersistSampleDocs() throws Exception {
helper.postDataToBlobStore(
cluster.getSolrClient(),
configSet + "_sample",
DefaultSampleDocumentsLoader.streamAsBytes(toJavabin(Collections.singletonList(doc))));
SchemaDesignerConfigSetHelper.readAllBytes(() -> toJavabin(List.of(doc))));

List<SolrInputDocument> docs = helper.getStoredSampleDocs(configSet);
assertTrue(docs != null && docs.size() == 1);
Expand Down
13 changes: 2 additions & 11 deletions solr/core/src/test/org/apache/solr/util/TestCborDataFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,13 @@ private byte[] runQuery(String testCollection, CloudSolrClient client, String wt
request.setResponseParser(new InputStreamResponseParser(wt));
}
result = client.request(request, testCollection);
byte[] b = copyStream((InputStream) result.get("stream"));
InputStream inputStream = (InputStream) result.get("stream");
byte[] b = inputStream.readAllBytes();
System.out.println(wt + "_time : " + timer.getTime());
System.out.println(wt + "_size : " + b.length);
return b;
}

/**
 * Reads the remaining bytes of {@code inputStream} into a byte array.
 *
 * <p>Uses {@link InputStream#readAllBytes()} (JDK 9+) instead of a hand-rolled 4&nbsp;KB
 * buffer loop feeding a {@code ByteArrayOutputStream} — same result, less code, and the JDK
 * implementation avoids the extra intermediate copy. The stream is deliberately not closed
 * here; the caller owns its lifecycle.
 */
private static byte[] copyStream(InputStream inputStream) throws IOException {
  return inputStream.readAllBytes();
}

private void modifySchema(String testCollection, CloudSolrClient client)
throws SolrServerException, IOException {
GenericSolrRequest req =
Expand Down
Loading

0 comments on commit c6651a7

Please sign in to comment.