Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
*/
package org.apache.qpid.jms.message;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;

import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;

@SuppressWarnings("unchecked")
public class JmsBytesMessage extends JmsMessage implements BytesMessage {

protected transient DataOutputStream dataOut;
protected transient DataInputStream dataIn;
protected transient ByteBufOutputStream dataOut;
protected transient ByteBufInputStream dataIn;

private final JmsBytesMessageFacade facade;

Expand Down Expand Up @@ -422,14 +422,14 @@ protected <T> T doGetBody(Class<T> asType) throws JMSException {
private void initializeWriting() throws JMSException {
checkReadOnlyBody();
if (this.dataOut == null) {
this.dataOut = new DataOutputStream(this.facade.getOutputStream());
this.dataOut = this.facade.getOutputStream();
}
}

private void initializeReading() throws JMSException {
checkWriteOnlyBody();
if (dataIn == null) {
dataIn = new DataInputStream(this.facade.getInputStream());
dataIn = this.facade.getInputStream();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.qpid.jms.message.facade;

import java.io.InputStream;
import java.io.OutputStream;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;

import javax.jms.IllegalStateException;
import javax.jms.JMSException;
Expand Down Expand Up @@ -58,7 +58,7 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade {
* @throws JMSException if an error occurs creating the stream.
* @throws IllegalStateException if there is a current OutputStream in use.
*/
InputStream getInputStream() throws JMSException;
ByteBufInputStream getInputStream() throws JMSException;

/**
* Create and return a new OuputStream used to populate the body of the message. If an
Expand All @@ -73,7 +73,7 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade {
* @throws JMSException if an error occurs creating the stream.
* @throws IllegalStateException if there is a current OutputStream in use.
*/
OutputStream getOutputStream() throws JMSException;
ByteBufOutputStream getOutputStream() throws JMSException;

/**
* Reset the message state such that a call to getInputStream or getOutputStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.jms.IllegalStateException;
import javax.jms.JMSException;
Expand Down Expand Up @@ -100,7 +98,7 @@ public void clearBody() {
}

@Override
public InputStream getInputStream() throws JMSException {
public ByteBufInputStream getInputStream() throws JMSException {
if (bytesOut != null) {
throw new IllegalStateException("Body is being written to, cannot perform a read.");
}
Expand All @@ -116,7 +114,7 @@ public InputStream getInputStream() throws JMSException {
}

@Override
public OutputStream getOutputStream() throws JMSException {
public ByteBufOutputStream getOutputStream() throws JMSException {
if (bytesIn != null) {
throw new IllegalStateException("Body is being read from, cannot perform a write.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
Expand All @@ -38,6 +36,8 @@
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;

import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
import org.apache.qpid.jms.message.facade.test.JmsTestBytesMessageFacade;
import org.apache.qpid.jms.message.facade.test.JmsTestMessageFactory;
Expand Down Expand Up @@ -699,7 +699,7 @@ public void testWriteOnlyBody() throws JMSException {
@Test
public void testReadMethodsCaptureEOFExceptionThrowsMessageEOFEx() throws Exception {
JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class);
InputStream bytesIn = Mockito.mock(InputStream.class);
ByteBufInputStream bytesIn = Mockito.mock(ByteBufInputStream.class);
Mockito.when(facade.getInputStream()).thenReturn(bytesIn);

Mockito.when(bytesIn.read()).thenThrow(new EOFException());
Expand Down Expand Up @@ -791,7 +791,7 @@ public void testReadMethodsCaptureEOFExceptionThrowsMessageEOFEx() throws Except
@Test
public void testReadMethodsCaptureIOExceptionThrowsJMSEx() throws Exception {
JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class);
InputStream bytesIn = Mockito.mock(InputStream.class);
ByteBufInputStream bytesIn = Mockito.mock(ByteBufInputStream.class);
Mockito.when(facade.getInputStream()).thenReturn(bytesIn);

Mockito.when(bytesIn.read()).thenThrow(new IOException());
Expand Down Expand Up @@ -883,7 +883,7 @@ public void testReadMethodsCaptureIOExceptionThrowsJMSEx() throws Exception {
@Test
public void testWriteMethodsCaptureIOExceptionThrowsJMSEx() throws Exception {
JmsBytesMessageFacade facade = Mockito.mock(JmsBytesMessageFacade.class);
OutputStream bytesOut = Mockito.mock(OutputStream.class);
ByteBufOutputStream bytesOut = Mockito.mock(ByteBufOutputStream.class);
Mockito.when(facade.getOutputStream()).thenReturn(bytesOut);

Mockito.doThrow(new IOException()).when(bytesOut).write(Mockito.anyByte());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void clearBody() {
}

@Override
public InputStream getInputStream() throws JMSException {
public ByteBufInputStream getInputStream() throws JMSException {
if (bytesOut != null) {
throw new IllegalStateException("Body is being written to, cannot perform a read.");
}
Expand All @@ -98,7 +98,7 @@ public InputStream getInputStream() throws JMSException {
}

@Override
public OutputStream getOutputStream() throws JMSException {
public ByteBufOutputStream getOutputStream() throws JMSException {
if (bytesIn != null) {
throw new IllegalStateException("Body is being read from, cannot perform a write.");
}
Expand Down