Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"profiles": {
"ConsoleReferenceServer": {
"commandName": "Project",
"commandLineArgs": "-l -c"
}
}
}
10 changes: 10 additions & 0 deletions Libraries/Opc.Ua.Client/Session/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ public interface ISession : ISessionClient
/// </summary>
bool DeleteSubscriptionsOnClose { get; set; }

/// <summary>
/// Gets or sets the time in milliseconds to wait for outstanding publish requests to complete before canceling them during session close.
/// </summary>
/// <remarks>
/// A value of 0 means no waiting - outstanding requests are canceled immediately.
/// A negative value means wait indefinitely for all outstanding requests to complete.
/// The default value is 5000 milliseconds (5 seconds).
/// </remarks>
int PublishRequestCancelDelayOnCloseSession { get; set; }

/// <summary>
/// Gets or Sets the default subscription for the session.
/// </summary>
Expand Down
257 changes: 209 additions & 48 deletions Libraries/Opc.Ua.Client/Session/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public Session(ITransportChannel channel, Session template, bool copyEventHandle
m_defaultSubscription = template.m_defaultSubscription;
DeleteSubscriptionsOnClose = template.DeleteSubscriptionsOnClose;
TransferSubscriptionsOnReconnect = template.TransferSubscriptionsOnReconnect;
PublishRequestCancelDelayOnCloseSession = template.PublishRequestCancelDelayOnCloseSession;
m_sessionTimeout = template.m_sessionTimeout;
m_maxRequestMessageSize = template.m_maxRequestMessageSize;
m_minPublishRequestCount = template.m_minPublishRequestCount;
Expand Down Expand Up @@ -244,8 +245,10 @@ private void Initialize()
m_maxPublishRequestCount = kMaxPublishRequestCountMax;
m_sessionName = string.Empty;
DeleteSubscriptionsOnClose = true;
PublishRequestCancelDelayOnCloseSession = 5000; // 5 seconds default
TransferSubscriptionsOnReconnect = false;
Reconnecting = false;
Closing = false;
m_reconnectLock = new SemaphoreSlim(1, 1);
ServerMaxContinuationPointsPerBrowse = 0;
}
Expand Down Expand Up @@ -488,6 +491,11 @@ public event EventHandler SessionConfigurationChanged
/// </summary>
public bool Reconnecting { get; private set; }

/// <summary>
/// Whether the session is closing
/// </summary>
public bool Closing { get; private set; }

/// <summary>
/// Gets the period for wich the server will maintain the session if
/// there is no communication from the client.
Expand Down Expand Up @@ -587,6 +595,9 @@ public int SubscriptionCount
/// </remarks>
public bool DeleteSubscriptionsOnClose { get; set; }

/// <inheritdoc/>
public int PublishRequestCancelDelayOnCloseSession { get; set; }

/// <summary>
/// If the subscriptions are transferred when a session is reconnected.
/// </summary>
Expand Down Expand Up @@ -3907,70 +3918,82 @@ public virtual async Task<StatusCode> CloseAsync(

StatusCode result = StatusCodes.Good;

// stop the keep alive timer.
StopKeepAliveTimer();

// check if correctly connected.
bool connected = Connected;
Closing = true;

// halt all background threads.
if (connected && m_SessionClosing != null)
try
{
try
{
m_SessionClosing(this, null);
}
catch (Exception e)
// stop the keep alive timer.
StopKeepAliveTimer();

// check if correctly connected.
bool connected = Connected;

// halt all background threads.
if (connected && m_SessionClosing != null)
{
m_logger.LogError(e, "Session: Unexpected error raising SessionClosing event.");
try
{
m_SessionClosing(this, null);
}
catch (Exception e)
{
m_logger.LogError(e, "Session: Unexpected error raising SessionClosing event.");
}
}
}

// close the session with the server.
if (connected && !KeepAliveStopped)
{
try
// close the session with the server.
if (connected && !KeepAliveStopped)
{
// close the session and delete all subscriptions if specified.
var requestHeader = new RequestHeader
try
{
TimeoutHint = timeout > 0
? (uint)timeout
: (uint)(OperationTimeout > 0 ? OperationTimeout : 0)
};
CloseSessionResponse response = await base.CloseSessionAsync(
requestHeader,
DeleteSubscriptionsOnClose,
ct)
.ConfigureAwait(false);
// Wait for or cancel outstanding publish requests before closing session.
await WaitForOrCancelOutstandingPublishRequestsAsync(ct).ConfigureAwait(false);

if (closeChannel)
// close the session and delete all subscriptions if specified.
var requestHeader = new RequestHeader
{
TimeoutHint = timeout > 0
? (uint)timeout
: (uint)(OperationTimeout > 0 ? OperationTimeout : 0)
};
CloseSessionResponse response = await base.CloseSessionAsync(
requestHeader,
DeleteSubscriptionsOnClose,
ct)
.ConfigureAwait(false);

if (closeChannel)
{
await CloseChannelAsync(ct).ConfigureAwait(false);
}

// raised notification indicating the session is closed.
SessionCreated(null, null);
}
// don't throw errors on disconnect, but return them
// so the caller can log the error.
catch (ServiceResultException sre)
{
await CloseChannelAsync(ct).ConfigureAwait(false);
result = sre.StatusCode;
}
catch (Exception)
{
result = StatusCodes.Bad;
}

// raised notification indicating the session is closed.
SessionCreated(null, null);
}
// don't throw errors on disconnect, but return them
// so the caller can log the error.
catch (ServiceResultException sre)
{
result = sre.StatusCode;
}
catch (Exception)

// clean up.
if (closeChannel)
{
result = StatusCodes.Bad;
Dispose();
}
}

// clean up.
if (closeChannel)
return result;
}
finally
{
Dispose();
Closing = false;
}

return result;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -4355,6 +4378,138 @@ private void StopKeepAliveTimer()
m_keepAliveTimer = null;
}

/// <summary>
/// Waits for outstanding publish requests to complete or cancels them.
/// </summary>
private async Task WaitForOrCancelOutstandingPublishRequestsAsync(CancellationToken ct)
{
// Get outstanding publish requests
List<uint> publishRequestHandles = [];
lock (m_outstandingRequests)
{
foreach (AsyncRequestState state in m_outstandingRequests)
{
if (state.RequestTypeId == DataTypes.PublishRequest && !state.Defunct)
{
publishRequestHandles.Add(state.RequestId);
}
}
}

if (publishRequestHandles.Count == 0)
{
m_logger.LogDebug("No outstanding publish requests to cancel.");
return;
}

m_logger.LogInformation(
"Waiting for {Count} outstanding publish requests to complete before closing session.",
publishRequestHandles.Count);

// Wait for outstanding requests with timeout
if (PublishRequestCancelDelayOnCloseSession != 0)
{
int waitTimeout = PublishRequestCancelDelayOnCloseSession < 0
? int.MaxValue
: PublishRequestCancelDelayOnCloseSession;

int startTime = HiResClock.TickCount;
while (true)
{
// Check if all publish requests completed
int remainingCount = 0;
lock (m_outstandingRequests)
{
foreach (AsyncRequestState state in m_outstandingRequests)
{
if (state.RequestTypeId == DataTypes.PublishRequest && !state.Defunct)
{
remainingCount++;
}
}
}

if (remainingCount == 0)
{
m_logger.LogDebug("All outstanding publish requests completed.");
return;
}

// Check timeout
int elapsed = HiResClock.TickCount - startTime;
if (elapsed >= waitTimeout)
{
m_logger.LogWarning(
"Timeout waiting for {Count} publish requests to complete. Cancelling them.",
remainingCount);
break;
}

// Check cancellation
if (ct.IsCancellationRequested)
{
m_logger.LogWarning("Cancellation requested while waiting for publish requests.");
break;
}

// Wait a bit before checking again
try
{
await Task.Delay(100, ct).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
m_logger.LogWarning("Cancellation requested while waiting for publish requests.");
break;
}
}
}

// Cancel remaining outstanding publish requests
List<uint> requestsToCancel = [];
lock (m_outstandingRequests)
{
foreach (AsyncRequestState state in m_outstandingRequests)
{
if (state.RequestTypeId == DataTypes.PublishRequest && !state.Defunct)
{
requestsToCancel.Add(state.RequestId);
}
}
}

if (requestsToCancel.Count > 0)
{
m_logger.LogInformation(
"Cancelling {Count} outstanding publish requests.",
requestsToCancel.Count);

// Cancel each outstanding publish request
foreach (uint requestHandle in requestsToCancel)
{
try
{
var requestHeader = new RequestHeader
{
TimeoutHint = (uint)OperationTimeout
};

await CancelAsync(requestHeader, requestHandle, ct).ConfigureAwait(false);

m_logger.LogDebug("Cancelled publish request with handle {Handle}.", requestHandle);
}
catch (Exception ex)
{
// Log but don't throw - we're closing anyway
m_logger.LogWarning(
ex,
"Error cancelling publish request with handle {Handle}.",
requestHandle);
}
}
}
}

/// <summary>
/// Removes a completed async request.
/// </summary>
Expand Down Expand Up @@ -5529,6 +5684,12 @@ public IAsyncResult BeginPublish(int timeout)
return null;
}

if (Closing)
{
m_logger.LogWarning("Publish skipped due to session closing");
return null;
}

// get event handler to modify ack list
PublishSequenceNumbersToAcknowledgeEventHandler callback
= m_PublishSequenceNumbersToAcknowledge;
Expand Down
7 changes: 7 additions & 0 deletions Libraries/Opc.Ua.Client/Session/TraceableSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ public bool DeleteSubscriptionsOnClose
set => Session.DeleteSubscriptionsOnClose = value;
}

/// <inheritdoc/>
public int PublishRequestCancelDelayOnCloseSession
{
get => Session.PublishRequestCancelDelayOnCloseSession;
set => Session.PublishRequestCancelDelayOnCloseSession = value;
}

/// <inheritdoc/>
public Subscription DefaultSubscription
{
Expand Down
Loading
Loading