diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java b/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java new file mode 100644 index 0000000000..fb96b5fb53 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Collection utilities + */ +public final class CollectionUtils +{ + /** Utility class shouldn't be instantiated directly */ + private CollectionUtils() + { + + } + + /** + * Returns a fixed-size list backed by the specified array. When array is null, returns an empty list + * @param array Array of elements + * @return List of elements + * @param Element type + */ + public static List nullSafeList(final T[] array) + { + if (array == null) + { + return new ArrayList<>(); + } + return Arrays.asList(array); + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java index 307201023b..c87b00a389 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.protocol.v1_0; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,6 +49,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.CollectionUtils; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -121,7 +121,7 @@ else if (command instanceof Discharge) { outcome = new Accepted(); } - else if (Arrays.asList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL)) + else if (CollectionUtils.nullSafeList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL)) { final Rejected rejected = new Rejected(); rejected.setError(error); diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java new file mode 100644 index 0000000000..b305824a39 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.model.Session; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; +import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare; +import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared; +import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; + +class TxnCoordinatorReceivingLinkEndpointTest +{ + private final AMQPConnection_1_0 _connection = mock(AMQPConnection_1_0.class); + private final Session_1_0 _session = mock(Session_1_0.class); + private final ServerTransaction serverTransaction = mock(ServerTransaction.class); + private final IdentifiedTransaction identifiedTransaction = mock(IdentifiedTransaction.class); + + private final AMQPDescribedTypeRegistry _amqpDescribedTypeRegistry = AMQPDescribedTypeRegistry.newInstance() + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer() + .registerExtensionSoleconnLayer(); + + @BeforeEach + void beforeEach() + { + when(identifiedTransaction.getId()).thenReturn(1); + when(identifiedTransaction.getServerTransaction()).thenReturn(serverTransaction); + + when(_connection.getDescribedTypeRegistry()).thenReturn(_amqpDescribedTypeRegistry); + when(_connection.createIdentifiedTransaction()).thenReturn(identifiedTransaction); + + doReturn(_connection).when(_session).getConnection(); + when(_session.getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD)) + .thenReturn(30_000L); + } + + @Test + void receiveDeliveryDischargeOutcomeAccepted() + { + final Binary declareDeliveryTag = new Binary(new byte[] { (byte) 0 }); + final QpidByteBuffer declareQpidByteBuffer = declareMessage(); + final Delivery declareDelivery = mock(Delivery.class); + when(declareDelivery.getDeliveryTag()).thenReturn(declareDeliveryTag); + when(declareDelivery.getPayload()).thenReturn(declareQpidByteBuffer); + + final Binary dischargeDeliveryTag = new Binary(new byte[] { (byte) 1 }); + final QpidByteBuffer dischargeQpidByteBuffer = dischargeMessage(); + final Delivery dischargeDelivery = mock(Delivery.class); + when(dischargeDelivery.getDeliveryTag()).thenReturn(dischargeDeliveryTag); + when(dischargeDelivery.getPayload()).thenReturn(dischargeQpidByteBuffer); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(null); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link)); + txnCoordinatorReceivingLinkEndpoint.start(); + + final Error declareError = txnCoordinatorReceivingLinkEndpoint.receiveDelivery(declareDelivery); + final Error dischargeError = txnCoordinatorReceivingLinkEndpoint.receiveDelivery(dischargeDelivery); + + verify(txnCoordinatorReceivingLinkEndpoint, times(1)) + .updateDispositions(anySet(), any(Declared.class), anyBoolean()); + verify(txnCoordinatorReceivingLinkEndpoint, times(1)) + .updateDispositions(anySet(), any(Accepted.class), anyBoolean()); + assertNull(declareError); + assertNull(dischargeError); + } + + @Test + void receiveDeliveryDischargeOutcomeNull() + { + final QpidByteBuffer qpidByteBuffer = dischargeMessage(); + final Delivery delivery = mock(Delivery.class); + when(delivery.getPayload()).thenReturn(qpidByteBuffer); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(null); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link)); + txnCoordinatorReceivingLinkEndpoint.start(); + + final Error error = txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery); + + verify(txnCoordinatorReceivingLinkEndpoint, times(0)) + .updateDispositions(anySet(), any(), anyBoolean()); + assertEquals("unknown-id", error.getCondition().toString()); + } + + @Test + void receiveDeliveryDischargeOutcomeRejected() + { + final QpidByteBuffer qpidByteBuffer = dischargeMessage(); + final Delivery delivery = mock(Delivery.class); + when(delivery.getPayload()).thenReturn(qpidByteBuffer); + when(delivery.getDeliveryTag()).thenReturn(new Binary("1".getBytes(StandardCharsets.UTF_8))); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(new Symbol[] { Rejected.REJECTED_SYMBOL }); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link)); + txnCoordinatorReceivingLinkEndpoint.start(); + + final Error error = txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery); + + verify(txnCoordinatorReceivingLinkEndpoint, times(1)) + .updateDispositions(anySet(), any(Rejected.class), anyBoolean()); + verify(link, times(1)).getSource(); + verify(source, times(1)).getOutcomes(); + + assertNull(error); + } + + @Test + void amqpValueSectionNotFound() + { + final QpidByteBuffer qpidByteBuffer = emptyMessage(); + final Delivery delivery = mock(Delivery.class); + when(delivery.getPayload()).thenReturn(qpidByteBuffer); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(new Symbol[] { Rejected.REJECTED_SYMBOL }); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + new TxnCoordinatorReceivingLinkEndpoint(_session, link); + txnCoordinatorReceivingLinkEndpoint.start(); + + final ConnectionScopedRuntimeException exception = assertThrows(ConnectionScopedRuntimeException.class, + () -> txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery)); + + assertEquals("Received no AmqpValue section", exception.getMessage()); + } + + @Test + void invalidMessage() + { + final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocateDirect(1000); + final Delivery delivery = mock(Delivery.class); + when(delivery.getPayload()).thenReturn(qpidByteBuffer); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(new Symbol[] { Rejected.REJECTED_SYMBOL }); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + new TxnCoordinatorReceivingLinkEndpoint(_session, link); + txnCoordinatorReceivingLinkEndpoint.start(); + + final Error error = txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery); + + assertEquals("decode-error", error.getCondition().toString()); + } + + @Test + void unknownCommand() + { + final QpidByteBuffer qpidByteBuffer = coordinatorMessage(); + final Delivery delivery = mock(Delivery.class); + when(delivery.getPayload()).thenReturn(qpidByteBuffer); + + final Source source = mock(Source.class); + when(source.getOutcomes()).thenReturn(new Symbol[] { Rejected.REJECTED_SYMBOL }); + + final Link_1_0 link = mock(Link_1_0.class); + when(link.getSource()).thenReturn(source); + + final TxnCoordinatorReceivingLinkEndpoint txnCoordinatorReceivingLinkEndpoint = + new TxnCoordinatorReceivingLinkEndpoint(_session, link); + txnCoordinatorReceivingLinkEndpoint.start(); + + final ConnectionScopedRuntimeException exception = assertThrows(ConnectionScopedRuntimeException.class, + () -> txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery)); + + assertEquals("Received unknown command 'Coordinator'", exception.getMessage()); + } + + private HeaderSection header() + { + final Header header = new Header(); + header.setTtl(UnsignedInteger.valueOf(10000L)); + return header.createEncodingRetainingSection(); + } + + private DeliveryAnnotationsSection deliveryAnnotations() + { + final Map annotationMap = Map.of(Symbol.valueOf("foo"), "bar"); + final DeliveryAnnotations annotations = new DeliveryAnnotations(annotationMap); + return annotations.createEncodingRetainingSection(); + } + + private QpidByteBuffer declareMessage() + { + final List payloads = new ArrayList<>(); + try + { + add(payloads, header()); + add(payloads, deliveryAnnotations()); + add(payloads, new AmqpValue(new Declare()).createEncodingRetainingSection()); + return QpidByteBuffer.concatenate(payloads); + } + finally + { + payloads.forEach(QpidByteBuffer::dispose); + } + } + + private QpidByteBuffer dischargeMessage() + { + final List payloads = new ArrayList<>(); + try + { + add(payloads, header()); + add(payloads, deliveryAnnotations()); + final Discharge discharge = new Discharge(); + discharge.setTxnId(new Binary(new byte[] { (byte) 1 })); + add(payloads, new AmqpValue(discharge).createEncodingRetainingSection()); + return QpidByteBuffer.concatenate(payloads); + } + finally + { + payloads.forEach(QpidByteBuffer::dispose); + } + } + + private QpidByteBuffer emptyMessage() + { + final List payloads = new ArrayList<>(); + try + { + add(payloads, header()); + add(payloads, deliveryAnnotations()); + return QpidByteBuffer.concatenate(payloads); + } + finally + { + payloads.forEach(QpidByteBuffer::dispose); + } + } + + private QpidByteBuffer coordinatorMessage() + { + final List payloads = new ArrayList<>(); + try + { + add(payloads, header()); + add(payloads, deliveryAnnotations()); + add(payloads, new AmqpValue(new Coordinator()).createEncodingRetainingSection()); + return QpidByteBuffer.concatenate(payloads); + } + finally + { + payloads.forEach(QpidByteBuffer::dispose); + } + } + + private void add(final List payloads, final EncodingRetainingSection section) + { + try + { + payloads.add(section.getEncodedForm()); + } + finally + { + section.dispose(); + } + } +}