diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java index afb112c95..73b343d73 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.jms.message; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -25,14 +23,16 @@ import javax.jms.JMSException; import javax.jms.MessageFormatException; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade; @SuppressWarnings("unchecked") public class JmsBytesMessage extends JmsMessage implements BytesMessage { - protected transient DataOutputStream dataOut; - protected transient DataInputStream dataIn; + protected transient ByteBufOutputStream dataOut; + protected transient ByteBufInputStream dataIn; private final JmsBytesMessageFacade facade; @@ -422,14 +422,14 @@ protected T doGetBody(Class asType) throws JMSException { private void initializeWriting() throws JMSException { checkReadOnlyBody(); if (this.dataOut == null) { - this.dataOut = new DataOutputStream(this.facade.getOutputStream()); + this.dataOut = this.facade.getOutputStream(); } } private void initializeReading() throws JMSException { checkWriteOnlyBody(); if (dataIn == null) { - dataIn = new DataInputStream(this.facade.getInputStream()); + dataIn = this.facade.getInputStream(); } } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java index 73117b963..92f34bc72 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java @@ -16,8 +16,8 @@ */ package org.apache.qpid.jms.message.facade; -import java.io.InputStream; -import java.io.OutputStream; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import javax.jms.IllegalStateException; import javax.jms.JMSException; @@ -58,7 +58,7 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade { * @throws JMSException if an error occurs creating the stream. * @throws IllegalStateException if there is a current OutputStream in use. */ - InputStream getInputStream() throws JMSException; + ByteBufInputStream getInputStream() throws JMSException; /** * Create and return a new OuputStream used to populate the body of the message. If an @@ -73,7 +73,7 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade { * @throws JMSException if an error occurs creating the stream. * @throws IllegalStateException if there is a current OutputStream in use. */ - OutputStream getOutputStream() throws JMSException; + ByteBufOutputStream getOutputStream() throws JMSException; /** * Reset the message state such that a call to getInputStream or getOutputStream diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java index b84d49271..b3f0281ca 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java @@ -20,8 +20,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import javax.jms.IllegalStateException; import javax.jms.JMSException; @@ -100,7 +98,7 @@ public void clearBody() { } @Override - public InputStream getInputStream() throws JMSException { + public ByteBufInputStream getInputStream() throws JMSException { if (bytesOut != null) { throw new IllegalStateException("Body is being written to, cannot perform a read."); } @@ -116,7 +114,7 @@ public InputStream getInputStream() throws JMSException { } @Override - public OutputStream getOutputStream() throws JMSException { + public ByteBufOutputStream getOutputStream() throws JMSException { if (bytesIn != null) { throw new IllegalStateException("Body is being read from, cannot perform a write."); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java index d56c533c7..c322d8790 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java @@ -24,8 +24,6 @@ import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.lang.reflect.Array; import java.util.Arrays; import java.util.List; @@ -38,6 +36,8 @@ import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade; import org.apache.qpid.jms.message.facade.test.JmsTestBytesMessageFacade; import org.apache.qpid.jms.message.facade.test.JmsTestMessageFactory; @@ -699,7 +699,7 @@ public void testWriteOnlyBody() throws JMSException { @Test public void testReadMethodsCaptureEOFExceptionThrowsMessageEOFEx() throws Exception { JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class); - InputStream bytesIn = Mockito.mock(InputStream.class); + ByteBufInputStream bytesIn = Mockito.mock(ByteBufInputStream.class); Mockito.when(facade.getInputStream()).thenReturn(bytesIn); Mockito.when(bytesIn.read()).thenThrow(new EOFException()); @@ -791,7 +791,7 @@ public void testReadMethodsCaptureEOFExceptionThrowsMessageEOFEx() throws Except @Test public void testReadMethodsCaptureIOExceptionThrowsJMSEx() throws Exception { JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class); - InputStream bytesIn = Mockito.mock(InputStream.class); + ByteBufInputStream bytesIn = Mockito.mock(ByteBufInputStream.class); Mockito.when(facade.getInputStream()).thenReturn(bytesIn); Mockito.when(bytesIn.read()).thenThrow(new IOException()); @@ -883,7 +883,7 @@ public void testReadMethodsCaptureIOExceptionThrowsJMSEx() throws Exception { @Test public void testWriteMethodsCaptureIOExceptionThrowsJMSEx() throws Exception { JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class); - OutputStream bytesOut = Mockito.mock(OutputStream.class); + ByteBufOutputStream bytesOut = Mockito.mock(ByteBufOutputStream.class); Mockito.when(facade.getOutputStream()).thenReturn(bytesOut); Mockito.doThrow(new IOException()).when(bytesOut).write(Mockito.anyByte()); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestBytesMessageFacade.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestBytesMessageFacade.java index 6cfda495a..edbecf0ba 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestBytesMessageFacade.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/test/JmsTestBytesMessageFacade.java @@ -84,7 +84,7 @@ public void clearBody() { } @Override - public InputStream getInputStream() throws JMSException { + public ByteBufInputStream getInputStream() throws JMSException { if (bytesOut != null) { throw new IllegalStateException("Body is being written to, cannot perform a read."); } @@ -98,7 +98,7 @@ public InputStream getInputStream() throws JMSException { } @Override - public OutputStream getOutputStream() throws JMSException { + public ByteBufOutputStream getOutputStream() throws JMSException { if (bytesIn != null) { throw new IllegalStateException("Body is being read from, cannot perform a write."); }