1
0

Implementing call monitor endpoint

This commit is contained in:
2025-08-24 17:29:25 +02:00
parent 776753dae5
commit 1153741e0b
9 changed files with 646 additions and 0 deletions

View File

@@ -0,0 +1,154 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AMWD.Net.Api.Fritz.CallMonitor.Utils;
using Microsoft.Extensions.Logging;
namespace AMWD.Net.Api.Fritz.CallMonitor
{
/// <summary>
/// Represents a client for monitoring call events from a FRITZ!Box.
/// </summary>
/// <remarks>
/// The FRITZ!Box has a built-in realtime call monitoring feature that can be accessed via TCP on port 1012.
/// </remarks>
public class CallMonitorClient : IDisposable
{
private bool _isDisposed;
private ILogger? _logger;
private readonly ReconnectTcpClient _client;
private readonly CancellationTokenSource _disposeCts;
private Task _monitorTask = Task.CompletedTask;
/// <summary>
/// Initializes a new instance of the <see cref="CallMonitorClient"/> class.
/// </summary>
/// <param name="host">The hostname or IP address of the FRITZ!Box to monitor.</param>
/// <param name="port">The port to connect to (Default: 1012).</param>
/// <exception cref="ArgumentNullException">The hostname is not set.</exception>
/// <exception cref="ArgumentOutOfRangeException">The port is not in valid range of 1 to 65535.</exception>
public CallMonitorClient(string host, int port = 1012)
{
if (string.IsNullOrWhiteSpace(host))
throw new ArgumentNullException(nameof(host));
if (port <= ushort.MinValue || ushort.MaxValue < port)
throw new ArgumentOutOfRangeException(nameof(port));
_disposeCts = new CancellationTokenSource();
_client = new ReconnectTcpClient(host, port) { OnConnected = OnConnected };
// Start the client in the background
_client.StartAsync(_disposeCts.Token).Forget();
}
/// <summary>
/// Occurs when a call monitoring event is raised.
/// </summary>
/// <remarks>
/// The event provides details using the <see cref="CallMonitorEventArgs"/> parameter.
/// </remarks>
public event EventHandler<CallMonitorEventArgs>? OnEvent;
/// <summary>
/// Gets or sets a logger instance.
/// </summary>
public ILogger? Logger
{
get => _logger;
set
{
_logger = value;
_client.Logger = value;
}
}
/// <summary>
/// Releases all resources used by the current instance of the <see cref="CallMonitorClient"/>.
/// </summary>
public void Dispose()
{
if (_isDisposed)
return;
_isDisposed = true;
_disposeCts.Cancel();
try
{
_monitorTask.Wait();
}
catch
{ }
_client.Dispose();
_disposeCts.Dispose();
GC.SuppressFinalize(this);
}
private Task OnConnected(ReconnectTcpClient client)
{
_monitorTask = Task.Run(async () =>
{
try
{
var stream = client.GetStream();
if (stream == null)
return;
string? buffer = null;
byte[] rawBuffer = new byte[4096];
while (!_disposeCts.IsCancellationRequested && client.IsConnected)
{
try
{
int bytesRead = await stream.ReadAsync(rawBuffer, 0, rawBuffer.Length, _disposeCts.Token);
string data = Encoding.UTF8.GetString(rawBuffer, 0, bytesRead);
if (buffer != null)
{
data = buffer + data;
buffer = null;
}
if (!data.EndsWith("\n"))
{
buffer = data;
continue;
}
string[] lines = data.Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries);
foreach (string line in lines)
{
var eventArgs = CallMonitorEventArgs.Parse(line);
if (eventArgs == null)
continue;
Task.Run(() => OnEvent?.Invoke(this, eventArgs), _disposeCts.Token).Forget();
}
}
catch (OperationCanceledException) when (_disposeCts.IsCancellationRequested)
{
// Client was stopped or disposed.
return;
}
catch (Exception ex) when (!_disposeCts.IsCancellationRequested)
{
Logger?.LogError(ex, "Error while reading from the call monitor stream.");
return;
}
}
}
catch
{ }
});
return Task.CompletedTask;
}
}
}

View File

@@ -0,0 +1,34 @@
using System.Runtime.Serialization;
namespace AMWD.Net.Api.Fritz.CallMonitor
{
/// <summary>
/// Represents the types of events that can occur during a call lifecycle on a FRITZ!Box.
/// </summary>
public enum EventType
{
/// <summary>
/// A call is incoming to the Fritz!Box.
/// </summary>
[EnumMember(Value = "RING")]
Ring = 1,
/// <summary>
/// A call is connected - the parties are now talking.
/// </summary>
[EnumMember(Value = "CONNECT")]
Connect = 2,
/// <summary>
/// A call is disconnected - one party has hung up.
/// </summary>
[EnumMember(Value = "DISCONNECT")]
Disconnect = 3,
/// <summary>
/// A call is outgoing from the Fritz!Box.
/// </summary>
[EnumMember(Value = "CALL")]
Call = 4,
}
}

View File

@@ -0,0 +1,98 @@
using System;
using System.Globalization;
namespace AMWD.Net.Api.Fritz.CallMonitor
{
/// <summary>
/// Provides data for call monitoring events, including details about the call type, identifiers, and optional metadata.
/// </summary>
public class CallMonitorEventArgs : EventArgs
{
/// <summary>
/// Gets the timestamp of the event.
/// </summary>
public DateTimeOffset? Timestamp { get; private set; }
/// <summary>
/// Gets the type of event.
/// </summary>
public EventType? Event { get; private set; }
/// <summary>
/// Gets the connection ID.
/// </summary>
public int? ConnectionId { get; private set; }
/// <summary>
/// Gets the line / port of signaled.
/// </summary>
public int? LinePort { get; private set; }
/// <summary>
/// Gets the external number displayed in the FRITZ!Box.
/// </summary>
public string? CallerNumber { get; private set; }
/// <summary>
/// Gets the internal number registered in the FRITZ!Box.
/// </summary>
public string? CalleeNumber { get; private set; }
/// <summary>
/// Gets the duarion of the call (only on <see cref="EventType.Disconnect"/> event).
/// </summary>
public TimeSpan? Duration { get; private set; }
/// <summary>
/// Tries to parse a line from the call monitor output into a <see cref="CallMonitorEventArgs"/> instance.
/// </summary>
/// <param name="line">The line from the call monitor output.</param>
/// <returns><see langword="null"/> when parsing fails, otherwise a new instance of the <see cref="CallMonitorEventArgs"/>.</returns>
internal static CallMonitorEventArgs? Parse(string line)
{
string[] columns = line.Trim().Split(';');
if (!DateTimeOffset.TryParseExact(columns[0], "dd.MM.yy HH:mm:ss", null, DateTimeStyles.None, out var timestamp))
return null;
if (!Enum.TryParse<EventType>(columns[1], true, out var eventType))
return null;
if (!int.TryParse(columns[2], out int connectionId))
return null;
var args = new CallMonitorEventArgs
{
Timestamp = timestamp,
Event = eventType,
ConnectionId = connectionId
};
switch (eventType)
{
case EventType.Ring:
args.CallerNumber = columns[3];
args.CalleeNumber = columns[4];
break;
case EventType.Connect:
args.LinePort = int.TryParse(columns[3], out int connectLinePort) ? connectLinePort : null;
args.CallerNumber = columns[4];
break;
case EventType.Disconnect:
if (int.TryParse(columns[3], out int durationSeconds))
args.Duration = TimeSpan.FromSeconds(durationSeconds);
break;
case EventType.Call:
args.LinePort = int.TryParse(columns[3], out int callLinePort) ? callLinePort : null;
args.CalleeNumber = columns[4];
args.CallerNumber = columns[5];
break;
}
return args;
}
}
}

View File

@@ -0,0 +1,21 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace AMWD.Net.Api.Fritz.CallMonitor.Utils
{
internal static class Extensions
{
public static async void Forget(this Task task, ILogger? logger = null)
{
try
{
await task.ConfigureAwait(false);
}
catch (Exception ex)
{
logger?.LogError(ex, "An error occurred in a fire-and-forget task.");
}
}
}
}

View File

@@ -0,0 +1,222 @@
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using AMWD.Net.Api.Fritz.CallMonitor.Wrappers;
using Microsoft.Extensions.Logging;
namespace AMWD.Net.Api.Fritz.CallMonitor.Utils
{
internal class ReconnectTcpClient : IDisposable
{
private bool _isDisposed;
private readonly SemaphoreSlim _connectLock = new(1, 1);
private readonly string _host;
private readonly int _port;
private TcpClientWrapper? _tcpClient;
private readonly TcpClientWrapperFactory _tcpClientFactory = new();
private CancellationTokenSource? _stopCts;
private Task _monitorTask = Task.CompletedTask;
public ReconnectTcpClient(string host, int port)
{
if (string.IsNullOrWhiteSpace(host))
throw new ArgumentNullException(nameof(host), "The host is required.");
if (port <= ushort.MinValue || ushort.MaxValue < port)
throw new ArgumentOutOfRangeException(nameof(port), $"The port must be between {ushort.MinValue + 1} and {ushort.MaxValue}.");
_host = host;
_port = port;
}
public virtual bool IsConnected => _tcpClient?.Connected ?? false;
public virtual ILogger? Logger { get; set; }
public virtual Func<ReconnectTcpClient, Task>? OnConnected { get; set; }
public virtual void Dispose()
{
if (_isDisposed)
return;
_isDisposed = true;
StopAsyncInternally(CancellationToken.None).Wait();
_connectLock.Dispose();
}
public virtual NetworkStreamWrapper? GetStream()
{
ThrowIfDisposed();
return _tcpClient?.GetStream();
}
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
_stopCts = new CancellationTokenSource();
using (var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _stopCts.Token))
{
await ConnectWithRetryAsync(combinedTokenSource.Token).ConfigureAwait(false);
if (combinedTokenSource.IsCancellationRequested)
return;
}
_monitorTask = Task.Run(() => MonitorConnectionAsync(_stopCts.Token), _stopCts.Token);
}
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
return StopAsyncInternally(cancellationToken);
}
private async Task StopAsyncInternally(CancellationToken cancellationToken)
{
var stopTask = Task.Run(async () =>
{
_stopCts?.Cancel();
try
{
await _monitorTask.ConfigureAwait(false);
}
catch
{ }
_monitorTask = Task.CompletedTask;
_stopCts?.Dispose();
_stopCts = null;
_tcpClient?.Dispose();
_tcpClient = null;
});
try
{
await Task.WhenAny(stopTask, Task.Delay(Timeout.Infinite, cancellationToken)).ConfigureAwait(false);
}
catch
{ }
}
private async Task ConnectWithRetryAsync(CancellationToken cancellationToken)
{
await _connectLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_isDisposed || IsConnected)
return;
int delay = 250;
while (!cancellationToken.IsCancellationRequested && !_isDisposed)
{
try
{
_tcpClient?.Dispose();
_tcpClient = _tcpClientFactory.Create();
_tcpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
#if NET6_0_OR_GREATER
var connectTask = _tcpClient.ConnectAsync(_host, _port, cancellationToken);
#else
var connectTask = _tcpClient.ConnectAsync(_host, _port);
#endif
var completedTask = await Task.WhenAny(connectTask, Task.Delay(1000, cancellationToken)).ConfigureAwait(false);
if (completedTask != connectTask)
throw new TimeoutException("Connection attempt timed out.");
if (OnConnected != null)
await OnConnected(this).ConfigureAwait(false);
return;
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Client was stopped or disposed.
return;
}
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
delay *= 2;
// Limit the delay to a maximum of 1 minute.
if (delay > 60 * 1000)
delay = 60 * 1000;
Logger?.LogWarning(ex, $"Failed to connect to {_host}:{_port}. Retrying in {delay}ms...");
try
{
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
catch
{ }
}
}
}
finally
{
_connectLock.Release();
}
}
private async Task MonitorConnectionAsync(CancellationToken cancellationToken)
{
try
{
byte[] buffer = new byte[1];
while (!cancellationToken.IsCancellationRequested && !_isDisposed)
{
if (!IsConnected)
{
await ConnectWithRetryAsync(cancellationToken).ConfigureAwait(false);
continue;
}
var stream = _tcpClient?.GetStream();
if (stream != null && stream.CanRead)
{
bool disconnected = false;
try
{
// Attempt to read zero bytes to check if the connection is still alive.
// Should return immediately if the connection is still active.
// So the timeout of 1sec is more than enough.
var readTask = stream.ReadAsync(buffer, 0, 0, cancellationToken);
var completedTask = await Task.WhenAny(readTask, Task.Delay(1000, cancellationToken)).ConfigureAwait(false);
if (completedTask != readTask)
continue; // Timeout
}
catch
{
disconnected = true;
}
if (disconnected || !IsConnected)
await ConnectWithRetryAsync(cancellationToken).ConfigureAwait(false);
}
// Check for an active connection every 5 seconds.
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{ }
}
private void ThrowIfDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().FullName);
}
}
}

View File

@@ -0,0 +1,32 @@
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace AMWD.Net.Api.Fritz.CallMonitor.Wrappers
{
/// <inheritdoc cref="NetworkStream"/>
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal class NetworkStreamWrapper : IDisposable
{
private readonly NetworkStream _networkStream;
public NetworkStreamWrapper(NetworkStream networkStream)
{
_networkStream = networkStream;
}
/// <inheritdoc cref="Stream.Dispose()"/>
public virtual void Dispose()
=> _networkStream.Dispose();
/// <inheritdoc cref="NetworkStream.CanRead"/>
public virtual bool CanRead =>
_networkStream.CanRead;
/// <inheritdoc cref="Stream.ReadAsync(byte[], int, int, CancellationToken)"/>
public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> _networkStream.ReadAsync(buffer, offset, count, cancellationToken);
}
}

View File

@@ -0,0 +1,25 @@
using System;
using System.Net.Sockets;
namespace AMWD.Net.Api.Fritz.CallMonitor.Wrappers
{
/// <inheritdoc cref="Socket"/>
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal class SocketWrapper : IDisposable
{
private readonly Socket _socket;
public SocketWrapper(Socket socket)
{
_socket = socket;
}
/// <inheritdoc cref="Socket.Dispose()"/>
public virtual void Dispose()
=> _socket.Dispose();
/// <inheritdoc cref="Socket.SetSocketOption(SocketOptionLevel, SocketOptionName, bool)"/>
public virtual void SetSocketOption(SocketOptionLevel optionLevel, SocketOptionName optionName, bool optionValue)
=> _socket.SetSocketOption(optionLevel, optionName, optionValue);
}
}

View File

@@ -0,0 +1,42 @@
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace AMWD.Net.Api.Fritz.CallMonitor.Wrappers
{
/// <inheritdoc cref="TcpClient"/>
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal class TcpClientWrapper : IDisposable
{
private readonly TcpClient _tcpClient;
public TcpClientWrapper()
{
_tcpClient = new TcpClient();
}
/// <inheritdoc cref="TcpClient.Dispose()"/>
public virtual void Dispose()
=> _tcpClient.Dispose();
/// <inheritdoc cref="TcpClient.Client"/>
public virtual SocketWrapper Client => new(_tcpClient.Client);
/// <inheritdoc cref="TcpClient.Connected"/>
public virtual bool Connected => _tcpClient.Connected;
/// <inheritdoc cref="TcpClient.ConnectAsync(string, int)"/>
public virtual Task ConnectAsync(string host, int port)
=> _tcpClient.ConnectAsync(host, port);
/// <inheritdoc cref="TcpClient.GetStream()"/>
public virtual NetworkStreamWrapper GetStream()
=> new(_tcpClient.GetStream());
#if NET6_0_OR_GREATER
public virtual Task ConnectAsync(string host, int port, CancellationToken cancellationToken)
=> _tcpClient.ConnectAsync(host, port, cancellationToken).AsTask();
#endif
}
}

View File

@@ -0,0 +1,18 @@
namespace AMWD.Net.Api.Fritz.CallMonitor.Wrappers
{
/// <summary>
/// Factory for creating instances of <see cref="TcpClientWrapper"/>.
/// </summary>
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
internal class TcpClientWrapperFactory
{
/// <summary>
/// Create a new instance of <see cref="TcpClientWrapper"/>.
/// </summary>
public virtual TcpClientWrapper Create()
{
var client = new TcpClientWrapper();
return client;
}
}
}