Fixing an issue with missing internal client on TCP (caused by AddressFamily.Unknown in default constructor)
This commit is contained in:
@@ -11,7 +11,7 @@ namespace System.IO
|
||||
int offset = 0;
|
||||
do
|
||||
{
|
||||
int count = await stream.ReadAsync(buffer, offset, expectedBytes - offset, cancellationToken).ConfigureAwait(false);
|
||||
int count = await stream.ReadAsync(buffer, offset, expectedBytes - offset, cancellationToken);
|
||||
if (count < 1)
|
||||
throw new EndOfStreamException();
|
||||
|
||||
|
||||
@@ -26,13 +26,17 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
private bool _isDisposed;
|
||||
private readonly CancellationTokenSource _disposeCts = new();
|
||||
|
||||
private readonly TcpClientWrapperFactory _tcpClientFactory = new();
|
||||
private readonly SemaphoreSlim _clientLock = new(1, 1);
|
||||
private readonly TcpClientWrapper _tcpClient = new();
|
||||
private TcpClientWrapper _tcpClient = null;
|
||||
private readonly Timer _idleTimer;
|
||||
|
||||
private readonly Task _processingTask;
|
||||
private readonly AsyncQueue<RequestQueueItem> _requestQueue = new();
|
||||
|
||||
private TimeSpan _readTimeout = TimeSpan.FromMilliseconds(1);
|
||||
private TimeSpan _writeTimeout = TimeSpan.FromMilliseconds(1);
|
||||
|
||||
#endregion Fields
|
||||
|
||||
/// <summary>
|
||||
@@ -58,15 +62,33 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
/// <inheritdoc/>
|
||||
public virtual TimeSpan ReadTimeout
|
||||
{
|
||||
get => TimeSpan.FromMilliseconds(_tcpClient.ReceiveTimeout);
|
||||
set => _tcpClient.ReceiveTimeout = (int)value.TotalMilliseconds;
|
||||
get => _readTimeout;
|
||||
set
|
||||
{
|
||||
if (value < TimeSpan.Zero)
|
||||
throw new ArgumentOutOfRangeException(nameof(value));
|
||||
|
||||
_readTimeout = value;
|
||||
|
||||
if (_tcpClient != null)
|
||||
_tcpClient.ReceiveTimeout = (int)value.TotalMilliseconds;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual TimeSpan WriteTimeout
|
||||
{
|
||||
get => TimeSpan.FromMilliseconds(_tcpClient.SendTimeout);
|
||||
set => _tcpClient.SendTimeout = (int)value.TotalMilliseconds;
|
||||
get => _writeTimeout;
|
||||
set
|
||||
{
|
||||
if (value < TimeSpan.Zero)
|
||||
throw new ArgumentOutOfRangeException(nameof(value));
|
||||
|
||||
_writeTimeout = value;
|
||||
|
||||
if (_tcpClient != null)
|
||||
_tcpClient.SendTimeout = (int)value.TotalMilliseconds;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -116,7 +138,6 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
|
||||
try
|
||||
{
|
||||
_processingTask.Wait();
|
||||
_processingTask.Dispose();
|
||||
}
|
||||
catch
|
||||
@@ -124,7 +145,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
|
||||
OnIdleTimer(null);
|
||||
|
||||
_tcpClient.Dispose();
|
||||
_tcpClient?.Dispose();
|
||||
_clientLock.Dispose();
|
||||
|
||||
while (_requestQueue.TryDequeue(out var item))
|
||||
@@ -164,7 +185,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
{
|
||||
Request = [.. request],
|
||||
ValidateResponseComplete = validateResponseComplete,
|
||||
TaskCompletionSource = new(),
|
||||
TaskCompletionSource = new TaskCompletionSource<IReadOnlyList<byte>>(),
|
||||
CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)
|
||||
};
|
||||
|
||||
@@ -187,7 +208,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
try
|
||||
{
|
||||
// Get next request to process
|
||||
var item = await _requestQueue.DequeueAsync(cancellationToken).ConfigureAwait(false);
|
||||
var item = await _requestQueue.DequeueAsync(cancellationToken);
|
||||
|
||||
// Remove registration => already removed from queue
|
||||
item.CancellationTokenRegistration.Dispose();
|
||||
@@ -195,19 +216,19 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
// Build combined cancellation token
|
||||
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, item.CancellationTokenSource.Token);
|
||||
// Wait for exclusive access
|
||||
await _clientLock.WaitAsync(linkedCts.Token).ConfigureAwait(false);
|
||||
await _clientLock.WaitAsync(linkedCts.Token);
|
||||
try
|
||||
{
|
||||
// Ensure connection is up
|
||||
await AssertConnection(linkedCts.Token).ConfigureAwait(false);
|
||||
await AssertConnection(linkedCts.Token);
|
||||
|
||||
var stream = _tcpClient.GetStream();
|
||||
await stream.FlushAsync(linkedCts.Token).ConfigureAwait(false);
|
||||
await stream.FlushAsync(linkedCts.Token);
|
||||
|
||||
#if NET6_0_OR_GREATER
|
||||
await stream.WriteAsync(item.Request, linkedCts.Token).ConfigureAwait(false);
|
||||
await stream.WriteAsync(item.Request, linkedCts.Token);
|
||||
#else
|
||||
await stream.WriteAsync(item.Request, 0, item.Request.Length, linkedCts.Token).ConfigureAwait(false);
|
||||
await stream.WriteAsync(item.Request, 0, item.Request.Length, linkedCts.Token);
|
||||
#endif
|
||||
|
||||
linkedCts.Token.ThrowIfCancellationRequested();
|
||||
@@ -218,9 +239,9 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
do
|
||||
{
|
||||
#if NET6_0_OR_GREATER
|
||||
int readCount = await stream.ReadAsync(buffer, linkedCts.Token).ConfigureAwait(false);
|
||||
int readCount = await stream.ReadAsync(buffer, linkedCts.Token);
|
||||
#else
|
||||
int readCount = await stream.ReadAsync(buffer, 0, buffer.Length, linkedCts.Token).ConfigureAwait(false);
|
||||
int readCount = await stream.ReadAsync(buffer, 0, buffer.Length, linkedCts.Token);
|
||||
#endif
|
||||
if (readCount < 1)
|
||||
throw new EndOfStreamException();
|
||||
@@ -267,7 +288,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
// Has to be called within _clientLock!
|
||||
private async Task AssertConnection(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_tcpClient.Connected)
|
||||
if (_tcpClient?.Connected == true)
|
||||
return;
|
||||
|
||||
int delay = 1;
|
||||
@@ -284,14 +305,16 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
{
|
||||
foreach (var ipAddress in ipAddresses)
|
||||
{
|
||||
_tcpClient.Close();
|
||||
_tcpClient?.Close();
|
||||
_tcpClient?.Dispose();
|
||||
|
||||
_tcpClient = _tcpClientFactory.Create(ipAddress.AddressFamily, _readTimeout, _writeTimeout);
|
||||
#if NET6_0_OR_GREATER
|
||||
using var connectTask = _tcpClient.ConnectAsync(ipAddress, Port, cancellationToken);
|
||||
var connectTask = _tcpClient.ConnectAsync(ipAddress, Port, cancellationToken);
|
||||
#else
|
||||
using var connectTask = _tcpClient.ConnectAsync(ipAddress, Port);
|
||||
var connectTask = _tcpClient.ConnectAsync(ipAddress, Port);
|
||||
#endif
|
||||
if (await Task.WhenAny(connectTask, Task.Delay(ReadTimeout, cancellationToken)) == connectTask)
|
||||
if (await Task.WhenAny(connectTask, Task.Delay(_readTimeout, cancellationToken)) == connectTask)
|
||||
{
|
||||
await connectTask;
|
||||
if (_tcpClient.Connected)
|
||||
@@ -309,7 +332,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(delay), cancellationToken).ConfigureAwait(false);
|
||||
await Task.Delay(TimeSpan.FromSeconds(delay), cancellationToken);
|
||||
}
|
||||
catch
|
||||
{ /* keep it quiet */ }
|
||||
|
||||
@@ -218,11 +218,11 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
try
|
||||
{
|
||||
#if NET8_0_OR_GREATER
|
||||
var client = await _listener.AcceptTcpClientAsync(cancellationToken).ConfigureAwait(false);
|
||||
var client = await _listener.AcceptTcpClientAsync(cancellationToken);
|
||||
#else
|
||||
var client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
|
||||
var client = await _listener.AcceptTcpClientAsync();
|
||||
#endif
|
||||
await _clientListLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await _clientListLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
_clients.Add(client);
|
||||
@@ -252,20 +252,20 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
using (var cts = new CancellationTokenSource(ReadWriteTimeout))
|
||||
using (cancellationToken.Register(cts.Cancel))
|
||||
{
|
||||
byte[] headerBytes = await stream.ReadExpectedBytesAsync(6, cts.Token).ConfigureAwait(false);
|
||||
byte[] headerBytes = await stream.ReadExpectedBytesAsync(6, cts.Token);
|
||||
requestBytes.AddRange(headerBytes);
|
||||
|
||||
byte[] followingCountBytes = headerBytes.Skip(4).Take(2).ToArray();
|
||||
followingCountBytes.SwapBigEndian();
|
||||
int followingCount = BitConverter.ToUInt16(followingCountBytes, 0);
|
||||
|
||||
byte[] bodyBytes = await stream.ReadExpectedBytesAsync(followingCount, cts.Token).ConfigureAwait(false);
|
||||
byte[] bodyBytes = await stream.ReadExpectedBytesAsync(followingCount, cts.Token);
|
||||
requestBytes.AddRange(bodyBytes);
|
||||
}
|
||||
|
||||
byte[] responseBytes = HandleRequest([.. requestBytes]);
|
||||
if (responseBytes != null)
|
||||
await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken).ConfigureAwait(false);
|
||||
await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken);
|
||||
}
|
||||
}
|
||||
catch
|
||||
@@ -274,7 +274,7 @@ namespace AMWD.Protocols.Modbus.Tcp
|
||||
}
|
||||
finally
|
||||
{
|
||||
await _clientListLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
await _clientListLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
_clients.Remove(client);
|
||||
|
||||
@@ -8,11 +8,11 @@ namespace AMWD.Protocols.Modbus.Tcp.Utils
|
||||
{
|
||||
/// <inheritdoc cref="TcpClient" />
|
||||
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
|
||||
internal class TcpClientWrapper : IDisposable
|
||||
internal class TcpClientWrapper(AddressFamily addressFamily) : IDisposable
|
||||
{
|
||||
#region Fields
|
||||
|
||||
private readonly TcpClient _client = new();
|
||||
private readonly TcpClient _client = new(addressFamily);
|
||||
|
||||
#endregion Fields
|
||||
|
||||
|
||||
26
AMWD.Protocols.Modbus.Tcp/Utils/TcpClientWrapperFactory.cs
Normal file
26
AMWD.Protocols.Modbus.Tcp/Utils/TcpClientWrapperFactory.cs
Normal file
@@ -0,0 +1,26 @@
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace AMWD.Protocols.Modbus.Tcp.Utils
|
||||
{
|
||||
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
|
||||
internal class TcpClientWrapperFactory
|
||||
{
|
||||
/// <summary>
|
||||
/// Creates a new instance of <see cref="TcpClientWrapper"/>.
|
||||
/// </summary>
|
||||
/// <param name="addressFamily">The <see cref="AddressFamily"/> of the <see cref="TcpClient"/> to use.</param>
|
||||
/// <param name="readTimeout">The read timeout.</param>
|
||||
/// <param name="writeTimeout">The write timeout.</param>
|
||||
/// <returns>A new <see cref="TcpClientWrapper"/> instance.</returns>
|
||||
public virtual TcpClientWrapper Create(AddressFamily addressFamily, TimeSpan readTimeout, TimeSpan writeTimeout)
|
||||
{
|
||||
var client = new TcpClientWrapper(addressFamily)
|
||||
{
|
||||
ReceiveTimeout = (int)readTimeout.TotalMilliseconds,
|
||||
SendTimeout = (int)writeTimeout.TotalMilliseconds
|
||||
};
|
||||
return client;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user