Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to reply asynchronously to a ResendRequest #505

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Helps IDEA users apply some of the formatting rules enforced by checkstyle

root = true

[*.java]
indent_size = 4
max_line_length = 120
ij_java_method_brace_style = next_line
ij_java_block_brace_style = next_line
ij_java_else_on_new_line = true
ij_java_class_brace_style = next_line
ij_java_space_after_type_cast = false
ij_any_catch_on_new_line = true
ij_any_spaces_around_equality_operators = true
ij_java_continuation_indent_size = 4
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public abstract class CommonDecoderImpl
protected int invalidTagId = Decoder.NO_ERROR;
protected int rejectReason = Decoder.NO_ERROR;
protected AsciiBuffer buffer;
protected UnknownTagVisitor unknownTagVisitor;

public int invalidTagId()
{
Expand All @@ -42,6 +43,11 @@ public int rejectReason()
return rejectReason;
}

public void setUnknownTagVisitor(final UnknownTagVisitor unknownTagVisitor)
{
this.unknownTagVisitor = unknownTagVisitor;
}

public int getInt(
final AsciiBuffer buffer,
final int startInclusive, final int endExclusive, final int tag, final boolean validation)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package uk.co.real_logic.artio.builder;

import uk.co.real_logic.artio.util.MutableAsciiBuffer;

/**
* Class provides common implementation methods used by encoders.
*/
public class CommonEncoderImpl
{
protected MutableAsciiBuffer customTagsBuffer = new MutableAsciiBuffer(new byte[128]);

public void setCustomTagsBuffer(final MutableAsciiBuffer customTagsBuffer)
{
this.customTagsBuffer = customTagsBuffer;
}

protected int customTagsLength = 0;

private int putTagHeader(final int tag)
{
int pos = customTagsLength;
pos += customTagsBuffer.putIntAscii(pos, tag);
customTagsBuffer.putByte(pos++, (byte)'=');
return pos;
}

public CommonEncoderImpl customTag(final int tag, final boolean value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putCharAscii(pos, value ? 'Y' : 'N');
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTag(final int tag, final int value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putIntAscii(pos, value);
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTag(final int tag, final char value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putCharAscii(pos, value);
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTag(final int tag, final long value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putLongAscii(pos, value);
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTag(final int tag, final double value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putAscii(pos, String.valueOf(value));
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTagAscii(final int tag, final CharSequence value)
{
int pos = putTagHeader(tag);
pos += customTagsBuffer.putStringWithoutLengthAscii(pos, value);
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public CommonEncoderImpl customTagAscii(final int tag, final byte[] value)
{
int pos = putTagHeader(tag);
customTagsBuffer.putBytes(pos, value);
pos += value.length;
customTagsBuffer.putSeparator(pos++);
customTagsLength = pos;
return this;
}

public void resetCustomTags()
{
customTagsLength = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,11 @@ public interface Decoder extends CharAppender
* @return the encoder passed as an argument.
*/
Encoder toEncoder(Encoder encoder);

/**
* Sets a visitor to be called when an unknown tag is encountered while decoding a message.
*
* @param visitor the visitor to invoke
*/
void setUnknownTagVisitor(UnknownTagVisitor visitor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package uk.co.real_logic.artio.builder;

import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* Interface for visiting unknown tags.
*
* @see CommonDecoderImpl#setUnknownTagVisitor(UnknownTagVisitor)
*/
public interface UnknownTagVisitor
{
/**
* Called when an unknown tag is encountered while decoding a message.
*
* @param tag The tag number of the unknown tag
* @param buffer The buffer containing the unknown tag
* @param offset The offset at which the unknown tag starts
* @param length The length of the unknown tag
*/
void onUnknownTag(
int tag,
AsciiBuffer buffer,
int offset,
int length
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1691,24 +1691,28 @@ private String decodeMethod(final List<Entry> entries, final Aggregate aggregate

final String suffix =
" default:\n" +
" boolean isTrailer = " + unknownFieldPredicate(type) + ";\n" +
" if (!" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED + ")\n" +
" {\n" +
(isGroup ?
" seenFields.remove(tag);\n" :
" alreadyVisitedFields.remove(tag);\n") +
" if (unknownTagVisitor != null && !isTrailer)\n" +
" {\n" +
" unknownTagVisitor.onUnknownTag(tag, buffer, valueOffset, valueLength);\n" +
" }\n" +
" }\n" +
(isGroup ? "" :
" else\n" +
" {\n" +
" if (!" + unknownFieldPredicate(type) + ")\n" +
" if (!isTrailer)\n" +
" {\n" +
" unknownFields.add(tag);\n" +
" }\n" +
" }\n") +

// Skip the thing if it's a completely unknown field and you aren't validating messages
" if (" + CODEC_REJECT_UNKNOWN_FIELD_ENABLED +
" || " + unknownFieldPredicate(type) + ")\n" +
" || isTrailer)\n" +
" {\n" +
decodeTrailerOrReturn(hasCommonCompounds, 5) +
" }\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.generation.OutputManager;
import uk.co.real_logic.artio.builder.CommonEncoderImpl;
import uk.co.real_logic.artio.builder.Encoder;
import uk.co.real_logic.artio.builder.FieldBagEncoder;
import uk.co.real_logic.artio.builder.SessionHeaderEncoder;
Expand Down Expand Up @@ -210,6 +211,7 @@ protected void generateAggregateFile(final Aggregate aggregate, final AggregateT
MutableDirectBuffer.class,
UnsafeBuffer.class,
AsciiSequenceView.class,
CommonEncoderImpl.class,
FieldBagEncoder.class);
generateAggregateClass(aggregate, aggregateType, className, out);
});
Expand Down Expand Up @@ -342,7 +344,7 @@ private String classDeclaration(
}
else
{
extendsClause = "";
extendsClause = " extends CommonEncoderImpl";
}
return String.format(
"\n" +
Expand All @@ -368,6 +370,9 @@ private String completeResetMethod(
case HEADER:
additionalReset = " beginStringAsCopy(DEFAULT_BEGIN_STRING, 0, DEFAULT_BEGIN_STRING.length);\n";
break;
case MESSAGE:
additionalReset = " resetCustomTags();\n";
break;
default:
additionalReset = "";
}
Expand Down Expand Up @@ -994,6 +999,13 @@ private String encodeMethod(final List<Entry> entries, final AggregateType aggre
if (aggregateType == AggregateType.MESSAGE)
{
suffix =
"\n" +
" if (customTagsLength > 0)\n" +
" {\n" +
" buffer.putBytes(position, customTagsBuffer, 0, customTagsLength);\n" +
" position += customTagsLength;\n" +
" }\n" +
"\n" +
" position += trailer.startTrailer(buffer, position);\n" +
"\n" +
" final int messageStart = header.finishHeader(buffer, bodyStart, position - bodyStart);\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public final class ExampleDictionary
"8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=A\001127=19700101-00:00:00.001" +
"\00110=043\001";

public static final String UNKNOWN_TAG_MESSAGE =
"8=FIX.4.4\0019=53\00135=0\001115=abc\00110100=FOO\001116=2\001117=A\00110101=BAR" +
"\001127=19700101-00:00:00.001\00110=043\001";

public static final String OUT_OF_RANGE_FLOAT_VALUE_MESSAGE =
"8=FIX.4.4\0019=53\00135=0\001115=abc\001116=2\001117=10000000000000000000000\001" +
"127=19700101-00:00:00.001\00110=043\001";
Expand Down Expand Up @@ -423,6 +427,13 @@ public final class ExampleDictionary
"8=FIX.4.4\0019=91\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" +
"\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\00110=176\001";

public static final String WITH_CUSTOM_TAGS =
"8=FIX.4.4\0019=110\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" +
"\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20" +
"\00110100=42" +
"\00110101=foo" +
"\00110=227\001";

public static final String NESTED_COMPONENT_MESSAGE =
"8=FIX.4.4\0019=120\00135=0\001115=abc\001116=2\001117=1.1\001127=19700101-00:00:00.001" +
"\001124=2\001130=2\001131=1\001404=10\001131=2\001404=20\001141=180\001142=2\001143=99\001143=100\001" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.junit.Test;
import uk.co.real_logic.artio.builder.Decoder;

import java.util.HashMap;

import static org.junit.Assert.assertEquals;
import static uk.co.real_logic.artio.dictionary.ExampleDictionary.*;
import static uk.co.real_logic.artio.util.CustomMatchers.assertTargetThrows;
Expand Down Expand Up @@ -65,4 +67,15 @@ public void shouldNotThrowWhenAccessingUnsetString() throws Exception
assertTargetThrows(() -> getAsciiSequenceView(decoder, "testReqID"), IllegalArgumentException.class,
"No value for optional field: TestReqID");
}

@Test
public void shouldVisitUnknownTags() throws Exception
{
final Decoder decoder = newHeartbeat();
final HashMap<Integer, String> map = new HashMap<>();
decoder.setUnknownTagVisitor((tag, buffer, offset, length) -> map.put(tag, buffer.getAscii(offset, length)));
decode(UNKNOWN_TAG_MESSAGE, decoder);
assertEquals("FOO", map.get(10100));
assertEquals("BAR", map.get(10101));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import uk.co.real_logic.artio.EncodingException;
import uk.co.real_logic.artio.builder.CommonEncoderImpl;
import uk.co.real_logic.artio.builder.Encoder;
import uk.co.real_logic.artio.builder.FieldBagEncoder;
import uk.co.real_logic.artio.fields.DecimalFloat;
Expand Down Expand Up @@ -737,6 +738,19 @@ public void shouldValidateMissingRequiredFloatFields() throws Exception
encoder.encode(buffer, 1);
}

@Test
public void shouldSetCustomTags() throws Exception
{
final Encoder encoder = newHeartbeat();
setRequiredFields(encoder);
setupComponent(encoder);

((CommonEncoderImpl)encoder).customTag(10100, 42);
((CommonEncoderImpl)encoder).customTagAscii(10101, "foo");

assertEncodesTo(encoder, WITH_CUSTOM_TAGS);
}

@Test(expected = EncodingException.class)
public void shouldValidateMissingRequiredIntFields() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* Customer interface to control whether resend requests are responded to or not.
Expand All @@ -33,11 +34,16 @@ public interface ResendRequestController
* (eg: begin sequence number &gt; end sequence number or begin sequence number &gt; last sent sequence number)
* then this callback won't be invoked.
*
* SessionProxy is now also notified immediately after this method is called, with additional parameters that
* allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by
* ResendRequestController.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
void onResend(
Session session,
Expand Down
Loading
Loading