Skip to content

Commit

Permalink
[C] Align send channel validation with Java.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Nov 7, 2024
1 parent 7ef55bb commit 058b1d0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 91 deletions.
145 changes: 60 additions & 85 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,55 @@ static int aeron_driver_conductor_find_existing_receive_channel_endpoint(
*result_endpoint = endpoint;
return 0;
}
static int aeron_driver_conductor_find_existing_send_channel_endpoint(
aeron_driver_conductor_t *conductor,
aeron_udp_channel_t *channel,
aeron_send_channel_endpoint_t **result_endpoint)
{
if (AERON_URI_INVALID_TAG != channel->tag_id)
{
for (size_t i = 0, size = conductor->send_channel_endpoints.length; i < size; i++)
{
aeron_send_channel_endpoint_t *endpoint = conductor->send_channel_endpoints.array[i].endpoint;
if (channel->tag_id == endpoint->conductor_fields.udp_channel->tag_id)
{
if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

*result_endpoint = endpoint;
return 0;
}
}

if (!channel->has_explicit_control &&
AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL != channel->control_mode &&
NULL == channel->uri.params.udp.endpoint)
{
AERON_SET_ERR(
EINVAL,
"URI must have explicit control, endpoint, or be manual control-mode when original: channel=%.*s",
(int)channel->uri_length,
channel->original_uri);
return -1;
}
}

aeron_send_channel_endpoint_t *endpoint = aeron_str_to_ptr_hash_map_get(
&conductor->send_channel_endpoint_by_channel_map, channel->canonical_form, channel->canonical_length);
if (NULL != endpoint &&
AERON_URI_INVALID_TAG != endpoint->conductor_fields.udp_channel->tag_id &&
AERON_URI_INVALID_TAG != channel->tag_id &&
channel->tag_id != endpoint->conductor_fields.udp_channel->tag_id)
{
endpoint = NULL;
}

*result_endpoint = endpoint;
return 0;
}

static bool aeron_driver_conductor_has_clashing_subscription(
aeron_driver_conductor_t *conductor,
Expand Down Expand Up @@ -2352,44 +2401,6 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
return ensure_capacity_result >= 0 ? publication : NULL;
}

aeron_send_channel_endpoint_t *aeron_driver_conductor_find_send_channel_endpoint_by_tag(
aeron_driver_conductor_t *conductor, int64_t channel_tag_id)
{
if (AERON_URI_INVALID_TAG != channel_tag_id)
{
for (size_t i = 0, size = conductor->send_channel_endpoints.length; i < size; i++)
{
aeron_send_channel_endpoint_t *endpoint = conductor->send_channel_endpoints.array[i].endpoint;

if (channel_tag_id == endpoint->conductor_fields.udp_channel->tag_id)
{
return endpoint;
}
}
}

return NULL;
}

aeron_receive_channel_endpoint_t *aeron_driver_conductor_find_receive_channel_endpoint_by_tag(
aeron_driver_conductor_t *conductor, int64_t channel_tag_id)
{
if (AERON_URI_INVALID_TAG != channel_tag_id)
{
for (size_t i = 0, size = conductor->receive_channel_endpoints.length; i < size; i++)
{
aeron_receive_channel_endpoint_t *endpoint = conductor->receive_channel_endpoints.array[i].endpoint;

if (channel_tag_id == endpoint->conductor_fields.udp_channel->tag_id)
{
return endpoint;
}
}
}

return NULL;
}

/* This should be re-usable if/when we decide to reuse transports with MDS */
int aeron_driver_conductor_update_and_check_ats_status(
aeron_driver_context_t *context, aeron_udp_channel_t *channel, const aeron_udp_channel_t *existing_channel)
Expand Down Expand Up @@ -2432,41 +2443,10 @@ aeron_send_channel_endpoint_t *aeron_driver_conductor_get_or_add_send_channel_en
aeron_driver_uri_publication_params_t *params,
int64_t registration_id)
{
aeron_send_channel_endpoint_t *endpoint = aeron_driver_conductor_find_send_channel_endpoint_by_tag(
conductor, channel->tag_id);

if (NULL != endpoint)
{
if (aeron_driver_conductor_tagged_channels_match(endpoint->conductor_fields.udp_channel, channel) < 0)
{
AERON_APPEND_ERR("%s", "");
goto error_cleanup;
}
}
else
aeron_send_channel_endpoint_t *endpoint;
if (aeron_driver_conductor_find_existing_send_channel_endpoint(conductor, channel, &endpoint) < 0)
{
if (AERON_URI_INVALID_TAG != channel->tag_id &&
!channel->has_explicit_control &&
AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL != channel->control_mode &&
NULL == channel->uri.params.udp.endpoint)
{
AERON_SET_ERR(
EINVAL,
"URI must have explicit control, endpoint, or be manual control-mode when original: channel=%.*s",
(int)channel->uri_length,
channel->original_uri);
goto error_cleanup;
}

endpoint = aeron_str_to_ptr_hash_map_get(
&conductor->send_channel_endpoint_by_channel_map, channel->canonical_form, channel->canonical_length);
if (NULL != endpoint &&
AERON_URI_INVALID_TAG != endpoint->conductor_fields.udp_channel->tag_id &&
AERON_URI_INVALID_TAG != channel->tag_id &&
channel->tag_id != endpoint->conductor_fields.udp_channel->tag_id)
{
endpoint = NULL;
}
goto error_cleanup;
}

if (aeron_driver_conductor_update_and_check_ats_status(
Expand Down Expand Up @@ -2522,6 +2502,12 @@ aeron_send_channel_endpoint_t *aeron_driver_conductor_get_or_add_send_channel_en
}
else
{
if (aeron_driver_conductor_send_endpoint_has_clashing_timestamp_offsets(conductor, endpoint, channel))
{
AERON_APPEND_ERR("%s", "");
goto error_cleanup;
}

if (aeron_publication_params_validate_mtu_for_sndbuf(
params,
endpoint->conductor_fields.socket_sndbuf,
Expand Down Expand Up @@ -4116,11 +4102,7 @@ int aeron_driver_conductor_on_add_network_publication_complete(

aeron_driver_uri_publication_params_t params;

if (aeron_diver_uri_publication_params(
&udp_channel->uri,
&params,
conductor,
is_exclusive) < 0 ||
if (aeron_diver_uri_publication_params(&udp_channel->uri, &params, conductor, is_exclusive) < 0 ||
aeron_driver_conductor_validate_experimental_features(
conductor->context->enable_experimental_features, udp_channel) < 0 ||
aeron_driver_conductor_validate_endpoint_for_publication(udp_channel) < 0 ||
Expand Down Expand Up @@ -4151,12 +4133,7 @@ int aeron_driver_conductor_on_add_network_publication_complete(
if (NULL == endpoint)
{
AERON_APPEND_ERR("%s", "");
goto error_cleanup_skip_channel_delete;
}

if (aeron_driver_conductor_send_endpoint_has_clashing_timestamp_offsets(conductor, endpoint, udp_channel))
{
AERON_APPEND_ERR("%s", "");
udp_channel = NULL; // deleted by the previous method
goto error_cleanup;
}

Expand Down Expand Up @@ -4236,8 +4213,6 @@ int aeron_driver_conductor_on_add_network_publication_complete(

error_cleanup:
aeron_udp_channel_delete(udp_channel);

error_cleanup_skip_channel_delete:
return -1;
}

Expand Down
6 changes: 0 additions & 6 deletions aeron-driver/src/main/c/aeron_driver_conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,6 @@ void aeron_driver_conductor_on_publication_error(void *clientd, void *item);

void aeron_driver_conductor_on_release_resource(void *clientd, void *item);

aeron_send_channel_endpoint_t *aeron_driver_conductor_find_send_channel_endpoint_by_tag(
aeron_driver_conductor_t *conductor, int64_t channel_tag_id);

aeron_receive_channel_endpoint_t *aeron_driver_conductor_find_receive_channel_endpoint_by_tag(
aeron_driver_conductor_t *conductor, int64_t channel_tag_id);

inline bool aeron_driver_conductor_is_subscribable_linked(
aeron_subscription_link_t *link, aeron_subscribable_t *subscribable)
{
Expand Down

0 comments on commit 058b1d0

Please sign in to comment.