< Summary

Information
Class: NostrSure.Infrastructure.Client.Implementation.MessageReceiver
Assembly: NostrSure.Infrastructure
File(s): /home/runner/work/NostrSure/NostrSure/NostrSure.Infrastructure/Client/Implementation/MessageReceiver.cs
Line coverage
48%
Covered lines: 52
Uncovered lines: 56
Coverable lines: 108
Total lines: 165
Line coverage: 48.1%
Branch coverage
33%
Covered branches: 19
Total branches: 56
Branch coverage: 33.9%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%210%
.ctor(...)50%88100%
ReceiveAsync()6.25%212168.57%
StartReceivingAsync()50%10866.66%
StopReceivingAsync()50%9658.82%
ReceiveLoopAsync()28.57%491443.47%
Dispose()75%44100%

File(s)

/home/runner/work/NostrSure/NostrSure/NostrSure.Infrastructure/Client/Implementation/MessageReceiver.cs

#LineLine coverage
 1using Microsoft.Extensions.Logging;
 2using Microsoft.Extensions.ObjectPool;
 3using NostrSure.Infrastructure.Client.Abstractions;
 4using System.Buffers;
 5using System.Net.WebSockets;
 6using System.Text;
 7
 8namespace NostrSure.Infrastructure.Client.Implementation;
 9
 10/// <summary>
 11/// Handles WebSocket message reception with background polling capabilities and object pooling for performance
 12/// </summary>
 13public sealed class MessageReceiver : IMessageReceiver
 14{
 015    private static readonly ArrayPool<byte> BufferPool = ArrayPool<byte>.Shared;
 16    private readonly ObjectPool<StringBuilder> _stringBuilderPool;
 17
 18    private readonly ClientWebSocket _webSocket;
 19    private readonly IConnectionErrorHandler _errorHandler;
 20    private readonly IConnectionStateManager _stateManager;
 21    private readonly CancellationTokenSource _receiveCancellation;
 22    private readonly ILogger<MessageReceiver>? _logger;
 23    private Task? _receiveTask;
 24    private bool _disposed;
 25
 526    public MessageReceiver(
 527        ClientWebSocket webSocket,
 528        IConnectionErrorHandler errorHandler,
 529        IConnectionStateManager stateManager,
 530        ObjectPool<StringBuilder> stringBuilderPool,
 531        ILogger<MessageReceiver>? logger = null)
 532    {
 533        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
 534        _errorHandler = errorHandler ?? throw new ArgumentNullException(nameof(errorHandler));
 535        _stateManager = stateManager ?? throw new ArgumentNullException(nameof(stateManager));
 536        _stringBuilderPool = stringBuilderPool ?? throw new ArgumentNullException(nameof(stringBuilderPool));
 537        _receiveCancellation = new CancellationTokenSource();
 538        _logger = logger;
 539    }
 40
 41    public event EventHandler<string>? MessageReceived;
 42
 43    public async Task<string> ReceiveAsync(CancellationToken cancellationToken = default)
 144    {
 145        if (_webSocket.State != WebSocketState.Open)
 146            throw new InvalidOperationException("WebSocket is not connected");
 47
 048        var buffer = BufferPool.Rent(8192);
 049        var stringBuilder = _stringBuilderPool.Get();
 50
 51        try
 052        {
 053            var combinedToken = CancellationTokenSource
 054                .CreateLinkedTokenSource(cancellationToken, _receiveCancellation.Token)
 055                .Token;
 56
 57            WebSocketReceiveResult receiveResult;
 58            do
 059            {
 060                var segment = new ArraySegment<byte>(buffer);
 061                receiveResult = await _webSocket.ReceiveAsync(segment, combinedToken);
 62
 063                if (receiveResult.MessageType == WebSocketMessageType.Text)
 064                {
 065                    stringBuilder.Append(Encoding.UTF8.GetString(buffer, 0, receiveResult.Count));
 066                }
 067                else if (receiveResult.MessageType == WebSocketMessageType.Close)
 068                {
 069                    _logger?.LogInformation("WebSocket connection closed by remote endpoint");
 070                    _stateManager.UpdateState(WebSocketState.Closed);
 071                    throw new WebSocketException("Connection closed by remote endpoint");
 72                }
 073            } while (!receiveResult.EndOfMessage && !combinedToken.IsCancellationRequested);
 74
 075            var message = stringBuilder.ToString();
 076            _logger?.LogDebug("Received message: {MessageLength} characters", message.Length);
 077            return message;
 78        }
 079        catch (Exception ex)
 080        {
 081            _logger?.LogError(ex, "Error receiving WebSocket message");
 082            await _errorHandler.HandleErrorAsync(ex, nameof(ReceiveAsync));
 083            throw;
 84        }
 85        finally
 086        {
 087            BufferPool.Return(buffer);
 088            _stringBuilderPool.Return(stringBuilder);
 089        }
 090    }
 91
 92    public async Task StartReceivingAsync(CancellationToken cancellationToken = default)
 393    {
 394        if (_receiveTask != null && !_receiveTask.IsCompleted)
 095        {
 096            _logger?.LogWarning("Message receiving is already active");
 097            return;
 98        }
 99
 3100        _logger?.LogDebug("Starting background message receiving");
 3101        _receiveTask = ReceiveLoopAsync(_receiveCancellation.Token);
 3102        await Task.CompletedTask;
 3103    }
 104
 105    public async Task StopReceivingAsync()
 1106    {
 1107        _logger?.LogDebug("Stopping background message receiving");
 1108        _receiveCancellation.Cancel();
 109
 1110        if (_receiveTask != null)
 1111        {
 112            try
 1113            {
 1114                await _receiveTask;
 1115            }
 0116            catch (OperationCanceledException)
 0117            {
 118                // Expected when cancelling
 0119            }
 0120            catch (Exception ex)
 0121            {
 0122                _logger?.LogError(ex, "Error occurred while stopping message receiving");
 0123            }
 1124        }
 1125    }
 126
 127    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
 3128    {
 3129        _logger?.LogDebug("Message receive loop started");
 130
 131        try
 3132        {
 3133            while (!cancellationToken.IsCancellationRequested &&
 3134                   _webSocket.State == WebSocketState.Open)
 0135            {
 0136                var message = await ReceiveAsync(cancellationToken);
 0137                MessageReceived?.Invoke(this, message);
 0138            }
 3139        }
 0140        catch (OperationCanceledException)
 0141        {
 0142            _logger?.LogDebug("Message receive loop cancelled");
 0143        }
 0144        catch (Exception ex)
 0145        {
 0146            _logger?.LogError(ex, "Error in message receive loop");
 0147            await _errorHandler.HandleErrorAsync(ex, nameof(ReceiveLoopAsync));
 0148        }
 149        finally
 3150        {
 3151            _logger?.LogDebug("Message receive loop ended");
 3152        }
 3153    }
 154
 155    public void Dispose()
 1156    {
 1157        if (!_disposed)
 1158        {
 1159            _logger?.LogDebug("Disposing MessageReceiver");
 1160            _receiveCancellation.Cancel();
 1161            _receiveCancellation.Dispose();
 1162            _disposed = true;
 1163        }
 1164    }
 165}