| | 1 | | using Microsoft.Extensions.Logging; |
| | 2 | | using Microsoft.Extensions.ObjectPool; |
| | 3 | | using NostrSure.Infrastructure.Client.Abstractions; |
| | 4 | | using System.Buffers; |
| | 5 | | using System.Net.WebSockets; |
| | 6 | | using System.Text; |
| | 7 | |
|
| | 8 | | namespace NostrSure.Infrastructure.Client.Implementation; |
| | 9 | |
|
/// <summary>
/// Receives text messages from a <see cref="ClientWebSocket"/>, either one message at a time
/// through <see cref="ReceiveAsync"/> or continuously through a background loop that raises
/// <see cref="MessageReceived"/>. Receive buffers and string builders are taken from pools.
/// </summary>
public sealed class MessageReceiver : IMessageReceiver
{
    /// <summary>Minimum size, in bytes, of the buffer rented for each socket read.</summary>
    private const int ReceiveBufferSize = 8192;

    private static readonly ArrayPool<byte> BufferPool = ArrayPool<byte>.Shared;
    private static readonly ArrayPool<char> CharBufferPool = ArrayPool<char>.Shared;
    private readonly ObjectPool<StringBuilder> _stringBuilderPool;

    private readonly ClientWebSocket _webSocket;
    private readonly IConnectionErrorHandler _errorHandler;
    private readonly IConnectionStateManager _stateManager;
    private readonly CancellationTokenSource _receiveCancellation;
    private readonly ILogger<MessageReceiver>? _logger;
    private Task? _receiveTask;
    private bool _disposed;

    /// <summary>
    /// Creates a receiver bound to the given socket.
    /// </summary>
    /// <param name="webSocket">Socket to read from.</param>
    /// <param name="errorHandler">Notified of every exception raised while receiving.</param>
    /// <param name="stateManager">Updated to <see cref="WebSocketState.Closed"/> when the remote endpoint closes.</param>
    /// <param name="stringBuilderPool">Pool used to assemble fragmented messages.</param>
    /// <param name="logger">Optional logger; when <c>null</c> nothing is logged.</param>
    /// <exception cref="ArgumentNullException">Any required dependency is <c>null</c>.</exception>
    public MessageReceiver(
        ClientWebSocket webSocket,
        IConnectionErrorHandler errorHandler,
        IConnectionStateManager stateManager,
        ObjectPool<StringBuilder> stringBuilderPool,
        ILogger<MessageReceiver>? logger = null)
    {
        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
        _errorHandler = errorHandler ?? throw new ArgumentNullException(nameof(errorHandler));
        _stateManager = stateManager ?? throw new ArgumentNullException(nameof(stateManager));
        _stringBuilderPool = stringBuilderPool ?? throw new ArgumentNullException(nameof(stringBuilderPool));
        _receiveCancellation = new CancellationTokenSource();
        _logger = logger;
    }

    /// <summary>
    /// Raised by the background receive loop for every complete message it reads.
    /// </summary>
    public event EventHandler<string>? MessageReceived;

    /// <summary>
    /// Reads one complete message from the socket, concatenating all of its text fragments.
    /// </summary>
    /// <param name="cancellationToken">
    /// Cancels this receive. It is linked with the receiver's own cancellation source, so
    /// <see cref="StopReceivingAsync"/> and <see cref="Dispose"/> cancel it as well.
    /// </param>
    /// <returns>The decoded text of the message.</returns>
    /// <exception cref="InvalidOperationException">The socket is not in the open state.</exception>
    /// <exception cref="ObjectDisposedException">The receiver has been disposed.</exception>
    /// <exception cref="WebSocketException">The remote endpoint sent a close frame.</exception>
    /// <exception cref="OperationCanceledException">The receive was cancelled.</exception>
    public async Task<string> ReceiveAsync(CancellationToken cancellationToken = default)
    {
        if (_webSocket.State != WebSocketState.Open)
        {
            throw new InvalidOperationException("WebSocket is not connected");
        }

        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(MessageReceiver));
        }

        var buffer = BufferPool.Rent(ReceiveBufferSize);
        // Sized for the worst case of one read, including bytes the decoder carried over.
        var charBuffer = CharBufferPool.Rent(Encoding.UTF8.GetMaxCharCount(buffer.Length));
        var stringBuilder = _stringBuilderPool.Get();

        try
        {
            // Defensive: do not rely on the pool's policy having reset the builder.
            stringBuilder.Clear();

            // The linked source registers on _receiveCancellation; disposing it removes that
            // registration instead of accumulating one per call.
            using var linkedCancellation = CancellationTokenSource
                .CreateLinkedTokenSource(cancellationToken, _receiveCancellation.Token);
            var combinedToken = linkedCancellation.Token;

            // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 sequence that
            // is split across two fragments, so the character is not turned into U+FFFD.
            var decoder = Encoding.UTF8.GetDecoder();
            var segment = new ArraySegment<byte>(buffer);

            WebSocketReceiveResult receiveResult;
            do
            {
                // Never hand back a truncated message: a cancellation observed between
                // fragments surfaces as OperationCanceledException.
                combinedToken.ThrowIfCancellationRequested();

                receiveResult = await _webSocket.ReceiveAsync(segment, combinedToken);

                if (receiveResult.MessageType == WebSocketMessageType.Text)
                {
                    var charCount = decoder.GetChars(
                        buffer,
                        0,
                        receiveResult.Count,
                        charBuffer,
                        0,
                        receiveResult.EndOfMessage);
                    stringBuilder.Append(charBuffer, 0, charCount);
                }
                else if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    _logger?.LogInformation("WebSocket connection closed by remote endpoint");
                    _stateManager.UpdateState(WebSocketState.Closed);
                    throw new WebSocketException("Connection closed by remote endpoint");
                }

                // Fragments of any other message type are read and ignored.
            } while (!receiveResult.EndOfMessage);

            var message = stringBuilder.ToString();
            _logger?.LogDebug("Received message: {MessageLength} characters", message.Length);
            return message;
        }
        catch (Exception ex)
        {
            // NOTE(review): cancellation is reported to the error handler like any other
            // failure; confirm the handler is expected to see OperationCanceledException.
            _logger?.LogError(ex, "Error receiving WebSocket message");
            await _errorHandler.HandleErrorAsync(ex, nameof(ReceiveAsync));
            throw;
        }
        finally
        {
            BufferPool.Return(buffer);
            CharBufferPool.Return(charBuffer);
            _stringBuilderPool.Return(stringBuilder);
        }
    }

    /// <summary>
    /// Starts the background receive loop unless one is already running.
    /// </summary>
    /// <param name="cancellationToken">
    /// Accepted for interface compatibility; the loop is governed by the receiver's own
    /// cancellation source and is stopped through <see cref="StopReceivingAsync"/> or
    /// <see cref="Dispose"/>.
    /// </param>
    /// <returns>A task that is already complete once the loop has been started.</returns>
    public Task StartReceivingAsync(CancellationToken cancellationToken = default)
    {
        if (_receiveTask != null && !_receiveTask.IsCompleted)
        {
            _logger?.LogWarning("Message receiving is already active");
            return Task.CompletedTask;
        }

        if (_disposed)
        {
            // Reported through the returned task, as any failure of this method is.
            return Task.FromException(new ObjectDisposedException(nameof(MessageReceiver)));
        }

        _logger?.LogDebug("Starting background message receiving");
        _receiveTask = ReceiveLoopAsync(_receiveCancellation.Token);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Signals cancellation and waits for the background receive loop, if any, to finish.
    /// Failures of the loop are logged rather than rethrown.
    /// </summary>
    public async Task StopReceivingAsync()
    {
        _logger?.LogDebug("Stopping background message receiving");

        try
        {
            _receiveCancellation.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose cancels the source before disposing it, so cancellation is already signalled.
        }

        var receiveTask = _receiveTask;
        if (receiveTask is null)
        {
            return;
        }

        try
        {
            await receiveTask;
        }
        catch (OperationCanceledException)
        {
            // Expected when cancelling
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error occurred while stopping message receiving");
        }
    }

    /// <summary>
    /// Reads messages and raises <see cref="MessageReceived"/> until cancellation is requested
    /// or the socket leaves the open state. Cancellation ends the loop quietly; any other
    /// exception is logged and forwarded to the error handler.
    /// </summary>
    /// <param name="cancellationToken">Token that ends the loop when cancelled.</param>
    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
    {
        _logger?.LogDebug("Message receive loop started");

        try
        {
            while (!cancellationToken.IsCancellationRequested &&
                   _webSocket.State == WebSocketState.Open)
            {
                var message = await ReceiveAsync(cancellationToken);
                MessageReceived?.Invoke(this, message);
            }
        }
        catch (OperationCanceledException)
        {
            _logger?.LogDebug("Message receive loop cancelled");
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Error in message receive loop");
            await _errorHandler.HandleErrorAsync(ex, nameof(ReceiveLoopAsync));
        }
        finally
        {
            _logger?.LogDebug("Message receive loop ended");
        }
    }

    /// <summary>
    /// Cancels any receive in progress and releases the receiver's cancellation source.
    /// Calling it more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _logger?.LogDebug("Disposing MessageReceiver");
        _receiveCancellation.Cancel();
        _receiveCancellation.Dispose();
        _disposed = true;
    }
}