diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000..2ab6bda649 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +# Helps IDEA users apply some of the formatting rules enforced by checkstyle + +root = true + +[*.java] +indent_size = 4 +max_line_length = 120 +ij_java_method_brace_style = next_line +ij_java_block_brace_style = next_line +ij_java_else_on_new_line = true +ij_java_class_brace_style = next_line +ij_java_space_after_type_cast = false +ij_any_catch_on_new_line = true +ij_any_spaces_around_equality_operators = true +ij_java_continuation_indent_size = 4 \ No newline at end of file diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java index aa952aba01..e3158aa3d3 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonDecoderImpl.java @@ -31,6 +31,7 @@ public abstract class CommonDecoderImpl protected int invalidTagId = Decoder.NO_ERROR; protected int rejectReason = Decoder.NO_ERROR; protected AsciiBuffer buffer; + protected UnknownTagVisitor unknownTagVisitor; public int invalidTagId() { @@ -42,6 +43,11 @@ public int rejectReason() return rejectReason; } + public void setUnknownTagVisitor(final UnknownTagVisitor unknownTagVisitor) + { + this.unknownTagVisitor = unknownTagVisitor; + } + public int getInt( final AsciiBuffer buffer, final int startInclusive, final int endExclusive, final int tag, final boolean validation) diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java new file mode 100644 index 0000000000..e1fbf674c0 --- /dev/null +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/CommonEncoderImpl.java @@ -0,0 +1,95 @@ +package uk.co.real_logic.artio.builder; + +import uk.co.real_logic.artio.util.MutableAsciiBuffer; + +/** + * Class provides common implementation methods used by encoders. + */ +public class CommonEncoderImpl +{ + protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[128]); + + public void setCustomTagsBuffer(final MutableAsciiBuffer customTagsBuffer) + { + this.customTagsBuffer = customTagsBuffer; + } + + protected int customTagsLength = 0; + + private int putTagHeader(final int tag) + { + int pos = customTagsLength; + pos += customTagsBuffer.putIntAscii(pos, tag); + customTagsBuffer.putByte(pos++, (byte)'='); + return pos; + } + + public CommonEncoderImpl customTag(final int tag, final boolean value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putCharAscii(pos, value ? 'Y' : 'N'); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final int value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putIntAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final char value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putCharAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final long value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putLongAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTag(final int tag, final double value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putAscii(pos, String.valueOf(value)); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTagAscii(final int tag, final CharSequence value) + { + int pos = putTagHeader(tag); + pos += customTagsBuffer.putStringWithoutLengthAscii(pos, value); + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public CommonEncoderImpl customTagAscii(final int tag, final byte[] value) + { + int pos = putTagHeader(tag); + customTagsBuffer.putBytes(pos, value); + pos += value.length; + customTagsBuffer.putSeparator(pos++); + customTagsLength = pos; + return this; + } + + public void resetCustomTags() + { + customTagsLength = 0; + } +} diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java index fa5111661b..f822c3a38f 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/Decoder.java @@ -91,4 +91,11 @@ public interface Decoder extends CharAppender * @return the encoder passed as an argument. */ Encoder toEncoder(Encoder encoder); + + /** + * Sets a visitor to be called when an unknown tag is encountered while decoding a message. + * + * @param visitor the visitor to invoke + */ + void setUnknownTagVisitor(UnknownTagVisitor visitor); } diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java new file mode 100644 index 0000000000..5503d50b37 --- /dev/null +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/builder/UnknownTagVisitor.java @@ -0,0 +1,26 @@ +package uk.co.real_logic.artio.builder; + +import uk.co.real_logic.artio.util.AsciiBuffer; + +/** + * Interface for visiting unknown tags. + * + * @see CommonDecoderImpl#setUnknownTagVisitor(UnknownTagVisitor) + */ +public interface UnknownTagVisitor +{ + /** + * Called when an unknown tag is encountered while decoding a message. + * + * @param tag The tag number of the unknown tag + * @param buffer The buffer containing the unknown tag + * @param offset The offset at which the unknown tag starts + * @param length The length of the unknown tag + */ + void onUnknownTag( + int tag, + AsciiBuffer buffer, + int offset, + int length + ); +} diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java index 54fc32a812..f88d54db61 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/DecoderGenerator.java @@ -1691,24 +1691,28 @@ private String decodeMethod(final List entries, final Aggregate aggregate final String suffix = " default:\n" + + " boolean isTrailer = " + unknownFieldPredicate(type) + ";\n" + " if (!" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED + ")\n" + " {\n" + (isGroup ? " seenFields.remove(tag);\n" : " alreadyVisitedFields.remove(tag);\n") + + " if (unknownTagVisitor != null && !isTrailer)\n" + + " {\n" + + " unknownTagVisitor.onUnknownTag(tag, buffer, valueOffset, valueLength);\n" + + " }\n" + " }\n" + (isGroup ? "" : " else\n" + " {\n" + - " if (!" + unknownFieldPredicate(type) + ")\n" + + " if (!isTrailer)\n" + " {\n" + " unknownFields.add(tag);\n" + " }\n" + " }\n") + - // Skip the thing if it's a completely unknown field and you aren't validating messages " if (" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED + - " || " + unknownFieldPredicate(type) + ")\n" + + " || isTrailer)\n" + " {\n" + decodeTrailerOrReturn(hasCommonCompounds, 5) + " }\n" + diff --git a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java index e4cbefa608..1d88926e0c 100644 --- a/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java +++ b/artio-codecs/src/main/java/uk/co/real_logic/artio/dictionary/generation/EncoderGenerator.java @@ -20,6 +20,7 @@ import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.generation.OutputManager; +import uk.co.real_logic.artio.builder.CommonEncoderImpl; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.builder.FieldBagEncoder; import uk.co.real_logic.artio.builder.SessionHeaderEncoder; @@ -210,6 +211,7 @@ protected void generateAggregateFile(final Aggregate aggregate, final AggregateT MutableDirectBuffer.class, UnsafeBuffer.class, AsciiSequenceView.class, + CommonEncoderImpl.class, FieldBagEncoder.class); generateAggregateClass(aggregate, aggregateType, className, out); }); @@ -342,7 +344,7 @@ private String classDeclaration( } else { - extendsClause = ""; + extendsClause = " extends CommonEncoderImpl"; } return String.format( "\n" + @@ -368,6 +370,9 @@ private String completeResetMethod( case HEADER: additionalReset = " beginStringAsCopy(DEFAULT_BEGIN_STRING, 0, DEFAULT_BEGIN_STRING.length);\n"; break; + case MESSAGE: + additionalReset = " resetCustomTags();\n"; + break; default: additionalReset = ""; } @@ -994,6 +999,13 @@ private String encodeMethod(final List entries, final AggregateType aggre if (aggregateType == AggregateType.MESSAGE) { suffix = + "\n" + + " if (customTagsLength > 0)\n" + + " {\n" + + " buffer.putBytes(position, customTagsBuffer, 0, customTagsLength);\n" + + " position += customTagsLength;\n" + + " }\n" + + "\n" + " position += trailer.startTrailer(buffer, position);\n" + "\n" + " final int messageStart = header.finishHeader(buffer, bodyStart, position - bodyStart);\n" + diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java index 87d592f6c3..fb5e26c9ae 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/ExampleDictionary.java @@ -244,6 +244,10 @@ public final class ExampleDictionary "8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=A\001127=19700101-00:00:00.001" + "\00110=043\001"; + public static final String UNKNOWN_TAG_MESSAGE = + "8=FIX.4.4\0019=53\00135=0\001115=abc\00110100=FOO\001116=2\001117=A\00110101=BAR" + + "\001127=19700101-00:00:00.001\00110=043\001"; + public static final String OUT_OF_RANGE_FLOAT_VALUE_MESSAGE = "8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=10000000000000000000000\001" + "127=19700101-00:00:00.001\00110=043\001"; @@ -423,6 +427,13 @@ public final class ExampleDictionary "8=FIX.4.4\0019=91\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\00110=176\001"; + public static final String WITH_CUSTOM_TAGS = + "8=FIX.4.4\0019=110\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20" + + "\00110100=42" + + "\00110101=foo" + + "\00110=227\001"; + public static final String NESTED_COMPONENT_MESSAGE = "8=FIX.4.4\0019=120\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" + "\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\001141=180\001142=2\001143=99\001143=100\001" + diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java index 703064071f..396a0f66c3 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/DecoderGeneratorFlyweightTest.java @@ -20,6 +20,8 @@ import org.junit.Test; import uk.co.real_logic.artio.builder.Decoder; +import java.util.HashMap; + import static org.junit.Assert.assertEquals; import static uk.co.real_logic.artio.dictionary.ExampleDictionary.*; import static uk.co.real_logic.artio.util.CustomMatchers.assertTargetThrows; @@ -65,4 +67,15 @@ public void shouldNotThrowWhenAccessingUnsetString() throws Exception assertTargetThrows(() -> getAsciiSequenceView(decoder, "testReqID"), IllegalArgumentException.class, "No value for optional field: TestReqID"); } + + @Test + public void shouldVisitUnknownTags() throws Exception + { + final Decoder decoder = newHeartbeat(); + final HashMap map = new HashMap<>(); + decoder.setUnknownTagVisitor((tag, buffer, offset, length) -> map.put(tag, buffer.getAscii(offset, length))); + decode(UNKNOWN_TAG_MESSAGE, decoder); + assertEquals("FOO", map.get(10100)); + assertEquals("BAR", map.get(10101)); + } } diff --git a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java index 7312cac69e..0e4ea88e1b 100644 --- a/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java +++ b/artio-codecs/src/test/java/uk/co/real_logic/artio/dictionary/generation/EncoderGeneratorTest.java @@ -23,6 +23,7 @@ import org.junit.BeforeClass; import org.junit.Test; import uk.co.real_logic.artio.EncodingException; +import uk.co.real_logic.artio.builder.CommonEncoderImpl; import uk.co.real_logic.artio.builder.Encoder; import uk.co.real_logic.artio.builder.FieldBagEncoder; import uk.co.real_logic.artio.fields.DecimalFloat; @@ -737,6 +738,19 @@ public void shouldValidateMissingRequiredFloatFields() throws Exception encoder.encode(buffer, 1); } + @Test + public void shouldSetCustomTags() throws Exception + { + final Encoder encoder = newHeartbeat(); + setRequiredFields(encoder); + setupComponent(encoder); + + ((CommonEncoderImpl)encoder).customTag(10100, 42); + ((CommonEncoderImpl)encoder).customTagAscii(10101, "foo"); + + assertEncodesTo(encoder, WITH_CUSTOM_TAGS); + } + @Test(expected = EncodingException.class) public void shouldValidateMissingRequiredIntFields() throws Exception { diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java index be3b383cab..99897327ca 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestController.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.session; import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; +import uk.co.real_logic.artio.util.AsciiBuffer; /** * Customer interface to control whether resend requests are responded to or not. @@ -33,11 +34,16 @@ public interface ResendRequestController * (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number) * then this callback won't be invoked. * + * SessionProxy is now also notified immediately after this method is called, with additional parameters that + * allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by + * ResendRequestController. + * * @param session the session that has received the resend request. * @param resendRequest the decoded resend request in question. * @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the * resend request uses 0 for its endSeqNo parameter. * @param response respond to the resend request by calling methods on this object. + * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int) */ void onResend( Session session, diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java index d715af54f2..5f05e62287 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/ResendRequestResponse.java @@ -16,10 +16,12 @@ package uk.co.real_logic.artio.session; import uk.co.real_logic.artio.builder.AbstractRejectEncoder; +import uk.co.real_logic.artio.util.AsciiBuffer; public class ResendRequestResponse { - private boolean result; + private boolean resendNow; + private boolean delayProcessing; private int refTagId; private AbstractRejectEncoder rejectEncoder; @@ -29,7 +31,8 @@ public class ResendRequestResponse */ public void resend() { - result = true; + resendNow = true; + delayProcessing = false; } /** @@ -41,14 +44,16 @@ public void reject(final int refTagId) { this.refTagId = refTagId; - result = false; + resendNow = false; + delayProcessing = false; } public void reject(final AbstractRejectEncoder rejectEncoder) { this.rejectEncoder = rejectEncoder; - result = false; + resendNow = false; + delayProcessing = false; } AbstractRejectEncoder rejectEncoder() @@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder() boolean result() { - return result; + return resendNow; } int refTagId() { return refTagId; } + + /** + * Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates + * that the request must not be processed nor rejected. It is the responsibility of the caller to call + * Session.executeResendRequest() when ready. + * + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + * @return true if response to the request must not be done immediately + */ + public boolean shouldDelay() + { + return delayProcessing; + } + + /** + * This method indicates that the request must not be processed nor rejected. It is the responsibility of + * the caller to call Session.executeResendRequest() when ready. + * + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + */ + public void delay() + { + resendNow = false; + delayProcessing = true; + } } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java index 4378fd117b..106d57bdef 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/Session.java @@ -2097,50 +2097,99 @@ Action onResendRequest( final ResendRequestResponse resendRequestResponse = this.resendRequestResponse; if (!backpressuredResendRequestResponse) { + // historic behavior resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse); + + // also invoke the proxy + if (Pressure.isBackPressured(proxy.onResend(this, resendRequest, + correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength))) + { + return ABORT; + } } if (resendRequestResponse.result()) { - final long correlationId = generateReplayCorrelationId(); - - // Notify the sender end point that a replay is going to happen. - if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest) + return executeResendRequest( + beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength + ); + } + else if (!resendRequestResponse.shouldDelay()) + { + final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder(); + if (rejectEncoder != null) { - if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, - correlationId, outboundPublication)) - { - lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); - backpressuredResendRequestResponse = true; - backpressuredOutboundValidResendRequest = true; - return ABORT; - } - - backpressuredOutboundValidResendRequest = false; + return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder); } + return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum); + } + else + { + return CONTINUE; + } + } + + private Action executeResendRequest( + final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + final long correlationId = generateReplayCorrelationId(); + + // Notify the sender end point that a replay is going to happen. + if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest) + { if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, - correlationId, inboundPublication)) + correlationId, outboundPublication)) { - lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); + if (lastReceivedMsgSeqNum >= 0) + { + lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); + } backpressuredResendRequestResponse = true; + backpressuredOutboundValidResendRequest = true; return ABORT; } - backpressuredResendRequestResponse = false; - replaysInFlight++; - return CONTINUE; + backpressuredOutboundValidResendRequest = false; } - else + + if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, + correlationId, inboundPublication)) { - final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder(); - if (rejectEncoder != null) + if (lastReceivedMsgSeqNum >= 0) { - return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder); + lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum); } - - return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum); + backpressuredResendRequestResponse = true; + return ABORT; } + + backpressuredResendRequestResponse = false; + replaysInFlight++; + return CONTINUE; + } + + + /** + * Executes a resend request. Used to be done immediately when receiving such a request, but + * it is now possible to delay the execution, so this method must be called when ready. + * + * @param beginSeqNum begin sequence number found in received ResendRequest + * @param correctedEndSeqNo corrected end sequence number + * @param messageBuffer buffer containing the ResendRequest message + * @param messageOffset offset of message in buffer + * @param messageLength length of message in buffer + * @return an Action: be sure to handle back pressure! + * @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int) + */ + public Action executeResendRequest( + final int beginSeqNum, final int correctedEndSeqNo, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength); } private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java index 66ac31e3f1..f629712ef0 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/session/SessionProxy.java @@ -15,10 +15,12 @@ */ package uk.co.real_logic.artio.session; +import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; import uk.co.real_logic.artio.dictionary.FixDictionary; import uk.co.real_logic.artio.fields.RejectReason; import uk.co.real_logic.artio.messages.CancelOnDisconnectOption; import uk.co.real_logic.artio.messages.DisconnectReason; +import uk.co.real_logic.artio.util.AsciiBuffer; /** * A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can @@ -116,4 +118,34 @@ long sendSequenceReset( * @return true if asynchronous, false otherwise. */ boolean isAsync(); + + /** + * Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer + * containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the + * Resend request. + * + * @param session the session that has received the resend request. + * @param resendRequest the decoded resend request in question. + * @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the + * resend request uses 0 for its endSeqNo parameter. + * @param response respond to the resend request by calling methods on this object. + * @param messageBuffer buffer containing the ResendRequest message + * @param messageOffset offset of message in buffer + * @param messageLength length of message in buffer + * @return a null or negative number if back pressured + * @see Session#executeResendRequest(int, int, AsciiBuffer, int, int) + * @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse) + */ + default long onResend( + Session session, + AbstractResendRequestDecoder resendRequest, + int correctedEndSeqNo, + ResendRequestResponse response, + AsciiBuffer messageBuffer, + int messageOffset, + int messageLength + ) + { + return 1; + } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java new file mode 100644 index 0000000000..c6faac9826 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java @@ -0,0 +1,332 @@ +package uk.co.real_logic.artio.system_tests; + +import org.agrona.ErrorHandler; +import org.agrona.concurrent.EpochNanoClock; +import org.junit.Ignore; +import org.junit.Test; +import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; +import uk.co.real_logic.artio.engine.EngineConfiguration; +import uk.co.real_logic.artio.engine.FixEngine; +import uk.co.real_logic.artio.fields.EpochFractionFormat; +import uk.co.real_logic.artio.library.LibraryConfiguration; +import uk.co.real_logic.artio.protocol.GatewayPublication; +import uk.co.real_logic.artio.session.DirectSessionProxy; +import uk.co.real_logic.artio.session.ResendRequestResponse; +import uk.co.real_logic.artio.session.Session; +import uk.co.real_logic.artio.session.SessionCustomisationStrategy; +import uk.co.real_logic.artio.session.SessionIdStrategy; +import uk.co.real_logic.artio.session.SessionProxy; +import uk.co.real_logic.artio.util.AsciiBuffer; +import uk.co.real_logic.artio.util.DebugFIXClient; +import uk.co.real_logic.artio.util.DebugServer; +import uk.co.real_logic.artio.util.MutableAsciiBuffer; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; +import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; + +/** + * Reproduce race (issue #503) while sending ResendRequest and ResetSequence when both + * parties detect a gap on Logon. + *

+ * Also reproduces the fact that SessionProxy is not invoked when a ResetSequence message is sent during replay. + */ +public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest +{ + private boolean useProxy; + private boolean sendResendRequestCalled; + private boolean sendSequenceResetCalled; + + /** + * When positive, simulate a SessionProxy that sends outbound FIX messages asynchronously, + * through an external cluster. + */ + private long sleepBeforeSendResendRequest; + + private final ArrayList autoClose = new ArrayList<>(); + private DebugServer initialAcceptor; + + private void launch() throws IOException + { + mediaDriver = launchMediaDriver(); + launchInitialAcceptor(); + launchInitiating(); + testSystem = new TestSystem(initiatingLibrary); + } + + private void launchInitiating() + { + final EngineConfiguration initiatingConfig = initiatingConfig(libraryAeronPort, nanoClock) + .deleteLogFileDirOnStart(true) + .initialAcceptedSessionOwner(SOLE_LIBRARY); + initiatingEngine = FixEngine.launch(initiatingConfig); + final LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock); + if (useProxy) + { + lib.sessionProxyFactory(this::sessionProxyFactory); + } + initiatingLibrary = connect(lib); + } + + static class PendingResendRequest + { + final Session session; + final MutableAsciiBuffer message; + final int beginSeqNo; + final int endSeqNo; + + PendingResendRequest( + final Session session, final int beginSeqNo, final int endSeqNo, final MutableAsciiBuffer message + ) + { + this.session = session; + this.beginSeqNo = beginSeqNo; + this.endSeqNo = endSeqNo; + this.message = message; + } + + public void execute() + { + System.err.println("Execute resend request"); + session.executeResendRequest(beginSeqNo, endSeqNo, message, 0, message.capacity()); + } + } + + private void launchInitialAcceptor() throws IOException + { + initialAcceptor = new DebugServer(port); + initialAcceptor.setWaitForData(true); + initialAcceptor.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|", + "8=FIX.4.4|9=94|35=1|49=acceptor|56=initiator|34=2|52=***|112=hello|98=0|108=10|141=N|10=024|" + ); + initialAcceptor.start(); + } + + /** + * Sanity check that we can connect Artio to a debug server with canned messages. + */ + @Test + public void testDebugServer() throws IOException + { + final DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|" + ); + srv.start(); + + mediaDriver = launchMediaDriver(); + launchInitiating(); + testSystem = new TestSystem(initiatingLibrary); + connectAndAcquire(); + } + + class Proxy extends DirectSessionProxy + { + /** + * Stores details of received ResendRequest while we wait for ours to be sent. + */ + private PendingResendRequest pendingResendRequest; + + Proxy( + final int sessionBufferSize, final GatewayPublication gatewayPublication, + final SessionIdStrategy sessionIdStrategy, final SessionCustomisationStrategy customisationStrategy, + final EpochNanoClock clock, final long connectionId, final int libraryId, + final ErrorHandler errorHandler, final EpochFractionFormat epochFractionPrecision + ) + { + super(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, clock, connectionId, + libraryId, errorHandler, epochFractionPrecision); + } + + @Override + public long onResend( + final Session session, final AbstractResendRequestDecoder resendRequest, + final int endSeqNo, final ResendRequestResponse response, + final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength + ) + { + System.err.println("onResend() called"); + if (useProxy && sleepBeforeSendResendRequest != 0) + { + response.delay(); + final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]); + buf.putBytes(0, messageBuffer, messageOffset, messageLength); + pendingResendRequest = new PendingResendRequest(session, resendRequest.beginSeqNo(), endSeqNo, buf); + } + return 1; + } + + @Override + public long sendResendRequest( + final int msgSeqNo, + final int beginSeqNo, + final int endSeqNo, + final int sequenceIndex, + final int lastMsgSeqNumProcessed) + { + System.err.println("sendResendRequest called with msgSeqNo = " + msgSeqNo); + sendResendRequestCalled = true; + if (sleepBeforeSendResendRequest > 0) + { + new Thread(() -> + { + try + { + Thread.sleep(sleepBeforeSendResendRequest); + } + catch (final InterruptedException ignored) + { + } + System.err.println("Executing super.sendResendRequest() after delay: msgSeqNo = " + msgSeqNo); + super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + if (pendingResendRequest != null) + { + pendingResendRequest.execute(); + } + else + { + System.err.println("onResend not called (async)"); + } + }).start(); + } + else + { + System.err.println("Directly executing sendResendRequest msgSeqNo = " + msgSeqNo); + super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + if (pendingResendRequest != null) + { + pendingResendRequest.execute(); + } + } + return 1; + } + + @Override + public long sendSequenceReset( + final int msgSeqNo, + final int newSeqNo, + final int sequenceIndex, + final int lastMsgSeqNumProcessed) + { + sendSequenceResetCalled = true; + return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + } + + @Override + public boolean isAsync() + { + return true; + } + } + + private SessionProxy sessionProxyFactory( + final int sessionBufferSize, + final GatewayPublication gatewayPublication, + final SessionIdStrategy sessionIdStrategy, + final SessionCustomisationStrategy customisationStrategy, + final EpochNanoClock clock, + final long connectionId, + final int libraryId, + final ErrorHandler errorHandler, + final EpochFractionFormat epochFractionPrecision) + { + return new Proxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, + clock, connectionId, libraryId, errorHandler, epochFractionPrecision); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldNotInvertResendAndResetNoProxy() throws Exception + { + useProxy = false; + reconnectTest(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSendResendBeforeResetSyncProxy() throws Exception + { + useProxy = true; + sleepBeforeSendResendRequest = 0; + reconnectTest(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSendResendBeforeResetAsyncProxy() throws Exception + { + useProxy = true; + sleepBeforeSendResendRequest = 100; + reconnectTest(); + } + + @Ignore // SequenceReset is directly sent by replayer, does not go through SessionProxy + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldCallProxySendSequenceReset() throws Exception + { + useProxy = true; + reconnectTest(); + assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled); + assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled); + } + + private void reconnectTest() throws Exception + { + launch(); + + connectAndAcquire(); + final DebugFIXClient acc1 = new DebugFIXClient(initialAcceptor.popClient(5000)); + acc1.start(); + + acc1.popAndAssert("35=A 34=1"); + acc1.popAndAssert("35=0 34=2 112=hello"); + acc1.close(); + initialAcceptor.stop(); + assertEquals(2, initiatingSession.lastReceivedMsgSeqNum()); + + final DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|", + "8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|" + ); + srv.start(); + autoClose.add(srv::stop); + + connectPersistentSessions(4, 4, false); + + final DebugFIXClient acc2 = new DebugFIXClient(srv.popClient(5000)); + acc2.start(); + autoClose.add(acc2::close); + acc2.popAndAssert("35=A 34=4"); + acc2.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first + acc2.popAndAssert("35=4 34=4 36=6"); + } + + @Override + public void close() + { + for (final AutoCloseable autoCloseable : autoClose) + { + try + { + autoCloseable.close(); + } + catch (final Exception ignored) + { + } + } + super.close(); + } + + private void connectAndAcquire() + { + connectSessions(); + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java new file mode 100644 index 0000000000..81bca804bf --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java @@ -0,0 +1,113 @@ +package uk.co.real_logic.artio.util; + +import org.junit.Assert; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Scanner; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Helper to pop FIX messages received on a socket. + * + * @see DebugServer + */ +public class DebugFIXClient +{ + private final DebugServer.HasIOStream io; + private Thread thread; + + private final BlockingQueue> messages = new LinkedBlockingQueue<>(); + private volatile boolean disposed; + private String prefix = " <<< "; + + public DebugFIXClient(final DebugServer.HasIOStream io) + { + this.io = Objects.requireNonNull(io); + } + + public void start() + { + assert thread == null; + thread = new Thread(this::run, "DebugFIXClient"); + thread.start(); + } + + public void close() throws Exception + { + disposed = true; + io.in.close(); + io.in.close(); + io.socket.close(); + thread.interrupt(); + thread.join(); + } + + private void run() + { + final StringBuilder s = new StringBuilder(128); + while (!disposed) + { + final Scanner scanner = new Scanner(io.in).useDelimiter("\u0001"); + Map msg = new HashMap<>(); + while (scanner.hasNext()) + { + final String fld = scanner.next(); + s.append(fld).append('|'); + final int eq = fld.indexOf('='); + final String tag = fld.substring(0, eq); + msg.put(tag, fld.substring(eq + 1)); + if (tag.equals("10")) + { + messages.add(msg); + msg = new HashMap<>(); + System.err.println(prefix + s); + s.setLength(0); + } + } + } + } + + public Map popMessage() throws InterruptedException + { + return messages.poll(5, TimeUnit.SECONDS); + } + + /** + * Pop a message and check that it contains some field/value pairs. + * + * @param tagValues a string of the form "35=5 58=Bye" + */ + public void popAndAssert(final String tagValues) throws InterruptedException + { + final Map map = popMessage(); + System.err.println(map); + if (map == null) + { + throw new AssertionError("No message received"); + } + + for (final String rule : tagValues.split(" ")) + { + final String tag = rule.substring(0, rule.indexOf('=')); + final String value = map.get(tag); + try + { + Assert.assertEquals(rule, tag + "=" + value); + } + catch (final Throwable e) + { + e.printStackTrace(); + throw e; + } + } + } + + public void setPrefix(final String prefix) + { + this.prefix = prefix; + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java new file mode 100644 index 0000000000..78dd8e856f --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java @@ -0,0 +1,168 @@ +package uk.co.real_logic.artio.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A server that accepts TCP connections and is able to reply automatically with canned + * data. It can be used to simulate a FIX server in order to quickly sent specific messages. + */ +public class DebugServer +{ + private final int port; + private final Queue connectResponses; + private final BlockingQueue clients; + private final ServerSocket serverSocket; + + /** + * If true, wait until some data is received before sending prepared messages. + */ + private boolean waitForData; + + /** + * Creates a debug server listening on specified port. + * + * @param port TCP listening port + */ + public DebugServer(final int port) throws IOException + { + this.port = port; + this.connectResponses = new ConcurrentLinkedQueue<>(); + this.clients = new LinkedBlockingQueue<>(); + this.serverSocket = new ServerSocket(port); + } + + /** + * Adds a message that must be directly sent to connecting clients. Messages + * are sent in the same order they were added. + * + * @param message binary message to send to new clients + */ + public void addConnectResponse(final byte[] message) + { + connectResponses.add(message); + } + + /** + * Warning: causes problems because SendingTime and checksum needs to be regenerated + * and they are not. + * + * @param message FIX message to automatically send to new clients + */ + public void addFIXResponse(final String... message) + { + for (final String msg : message) + { + addConnectResponse(FixMessageTweak.recycle(msg)); + } + } + + /** + * Starts the debug server, accepting incoming connections and sending + * prepared data. + */ + public void start() throws IOException + { + new Thread("DebugServer-" + port) + { + @Override + public void run() + { + try + { + while (!serverSocket.isClosed()) + { + final Socket socket = serverSocket.accept(); + System.out.println("Connection accepted from " + socket.getInetAddress()); + try + { + final BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + final BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream()); + + if (!connectResponses.isEmpty() && waitForData) + { + in.mark(0); + in.read(); + in.reset(); + } + + final HasIOStream client = new HasIOStream(socket, in, out); + sendResponses(client.out); + clients.add(client); + } + catch (final IOException e) + { + e.printStackTrace(); + } + } + } + catch (final IOException e) + { + if (!serverSocket.isClosed()) + { + e.printStackTrace(); + } + } + } + }.start(); + } + + public void stop() throws IOException + { + serverSocket.close(); + } + + /** + * Sends prepared data to the client. + * + * @param outputStream output stream for client + */ + private void sendResponses(final OutputStream outputStream) throws IOException + { + for (final byte[] response : connectResponses) + { + outputStream.write(response); + outputStream.flush(); + } + } + + public HasIOStream popClient(final long timeoutMs) throws InterruptedException + { + return clients.poll(timeoutMs, TimeUnit.MILLISECONDS); + } + + public int getPort() + { + return port; + } + + public void setWaitForData(final boolean waitForData) + { + this.waitForData = waitForData; + } + + public static class HasIOStream + { + + public final Socket socket; + public final InputStream in; + public final OutputStream out; + + public HasIOStream(final Socket socket, final InputStream in, final OutputStream out) + { + this.socket = socket; + this.in = in; + this.out = out; + } + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java new file mode 100644 index 0000000000..457106909d --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java @@ -0,0 +1,51 @@ +package uk.co.real_logic.artio.util; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +/** + * Easily generate valid FIX messages from captured data. + */ +public class FixMessageTweak +{ + + /** + * Replaces sending time with current time and recompute BodyLength / Checksum. + * + * @param asciiMsg a FIX message where SOH character may have been replaced with '|' + * @return a FIX message ready to send on a socket + */ + public static byte[] recycle(final String asciiMsg) + { + String msg = asciiMsg; + msg = msg.replace('|', '\001'); + + // Replace Sending Time (52) with current time + final String time = LocalDateTime + .now(ZoneOffset.UTC) + .format(DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSS")); + msg = msg.replaceAll("52=[^\u0001]*\u0001", "52=" + time + '\001'); + + // recompute body length + final int body = msg.indexOf('\001', 10) + 1; + final int trailer = msg.indexOf("10="); + msg = msg.replaceAll("9=\\d+", "9=" + (trailer - body)); + + // recompute checksum + msg = msg.replaceAll("10=[0-9]{3}", "10=" + computeChecksum(msg)); + + return msg.getBytes(StandardCharsets.UTF_8); + } + + private static String computeChecksum(final String fixMessage) + { + int checksum = 0; + for (int i = fixMessage.indexOf("10=") - 1; i >= 0; i--) + { + checksum += fixMessage.charAt(i); + } + return String.format("%03d", checksum % 256); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e0cd7acdfa..8e056409bb 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,8 +1,8 @@ [versions] aeron = "1.46.7" agrona = "1.23.1" -byteBuddy = "1.15.7" -checkstyle = "10.19.0" +byteBuddy = "1.15.10" +checkstyle = "10.20.1" junit = "5.11.3" gradle = "8.10.2" jmh = "1.37"