< Summary

Information
Class: NostrSure.Infrastructure.Client.Implementation.NostrClient
Assembly: NostrSure.Infrastructure
File(s): /home/runner/work/NostrSure/NostrSure/NostrSure.Infrastructure/Client/Implementation/NostrClient.cs
Line coverage
65%
Covered lines: 123
Uncovered lines: 65
Coverable lines: 188
Total lines: 289
Line coverage: 65.4%
Branch coverage
41%
Covered branches: 43
Total branches: 104
Branch coverage: 41.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)50%1010100%
get_RelayUrl()100%11100%
get_IsConnected()100%22100%
ConnectAsync()45%372064.86%
SubscribeAsync()50%10868.42%
CloseSubscriptionAsync()37.5%11864.7%
PublishAsync()50%7673.33%
StreamAsync()50%10866.66%
SetupConnectionEvents()100%22100%
OnMessageReceived(...)0%5458.33%
OnConnectionError(...)0%2040%
OnConnectionDisconnected(...)0%620%
IsMessageForSubscription(...)33.33%7666.66%
Dispose()100%44100%

File(s)

/home/runner/work/NostrSure/NostrSure/NostrSure.Infrastructure/Client/Implementation/NostrClient.cs

#LineLine coverage
 1using Microsoft.Extensions.Logging;
 2using NostrSure.Domain.Entities;
 3using NostrSure.Infrastructure.Client.Abstractions;
 4using NostrSure.Infrastructure.Client.Messages;
 5using System.Collections.Concurrent;
 6using System.Net.WebSockets;
 7using System.Runtime.CompilerServices;
 8
 9namespace NostrSure.Infrastructure.Client.Implementation;
 10
 11/// <summary>
 12/// Main Nostr client implementation
 13/// </summary>
 14public class NostrClient : INostrClient
 15{
 16    private readonly IWebSocketFactory _webSocketFactory;
 17    private readonly IMessageSerializer _messageSerializer;
 18    private readonly ISubscriptionManager _subscriptionManager;
 19    private readonly IEventDispatcher _eventDispatcher;
 20    private readonly IHealthPolicy _healthPolicy;
 21    private readonly ILogger<NostrClient>? _logger;
 22
 23    private IWebSocketConnection? _connection;
 2824    private readonly ConcurrentQueue<NostrMessage> _messageQueue = new();
 2825    private readonly CancellationTokenSource _cancellationTokenSource = new();
 26    private bool _disposed;
 27
 28    // Events
 29    public event Func<RelayEventMessage, Task>? OnEvent;
 30    public event Func<EoseMessage, Task>? OnEndOfStoredEvents;
 31    public event Func<NoticeMessage, Task>? OnNotice;
 32    public event Func<ClosedMessage, Task>? OnClosed;
 33    public event Func<OkMessage, Task>? OnOk;
 34    public event Func<Exception, Task>? OnError;
 35
 1136    public string? RelayUrl { get; private set; }
 1237    public bool IsConnected => _connection?.State == WebSocketState.Open;
 38
 2839    public NostrClient(
 2840        IWebSocketFactory webSocketFactory,
 2841        IMessageSerializer messageSerializer,
 2842        ISubscriptionManager subscriptionManager,
 2843        IEventDispatcher eventDispatcher,
 2844        IHealthPolicy healthPolicy,
 2845        ILogger<NostrClient>? logger = null)
 2846    {
 2847        _webSocketFactory = webSocketFactory ?? throw new ArgumentNullException(nameof(webSocketFactory));
 2848        _messageSerializer = messageSerializer ?? throw new ArgumentNullException(nameof(messageSerializer));
 2849        _subscriptionManager = subscriptionManager ?? throw new ArgumentNullException(nameof(subscriptionManager));
 2850        _eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
 2851        _healthPolicy = healthPolicy ?? throw new ArgumentNullException(nameof(healthPolicy));
 2852        _logger = logger;
 53
 54        // Wire up event dispatcher to our events
 2855        _eventDispatcher.OnEvent += (msg) => OnEvent?.Invoke(msg) ?? Task.CompletedTask;
 2956        _eventDispatcher.OnEndOfStoredEvents += (msg) => OnEndOfStoredEvents?.Invoke(msg) ?? Task.CompletedTask;
 2957        _eventDispatcher.OnNotice += (msg) => OnNotice?.Invoke(msg) ?? Task.CompletedTask;
 2858        _eventDispatcher.OnClosed += (msg) => OnClosed?.Invoke(msg) ?? Task.CompletedTask;
 2859        _eventDispatcher.OnOk += (msg) => OnOk?.Invoke(msg) ?? Task.CompletedTask;
 2860    }
 61
 62    public async Task ConnectAsync(string relayUrl, CancellationToken cancellationToken = default)
 1363    {
 1364        if (string.IsNullOrWhiteSpace(relayUrl))
 165            throw new ArgumentException("Relay URL cannot be null or empty", nameof(relayUrl));
 66
 1267        if (!Uri.TryCreate(relayUrl, UriKind.Absolute, out var uri))
 168            throw new ArgumentException("Invalid relay URL", nameof(relayUrl));
 69
 1170        if (uri.Scheme != "ws" && uri.Scheme != "wss")
 171            throw new ArgumentException("Relay URL must use ws:// or wss:// scheme", nameof(relayUrl));
 72
 1073        var combinedToken = CancellationTokenSource
 1074            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)
 1075            .Token;
 76
 1077        var attempt = 0;
 1078        while (!combinedToken.IsCancellationRequested)
 1079        {
 80            try
 1081            {
 1082                _connection = _webSocketFactory.Create();
 1083                SetupConnectionEvents();
 84
 85                // Set timeout for connection (requirement R1: within 5 seconds)
 1086                using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
 1087                using var timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(
 1088                    combinedToken, timeoutCts.Token);
 89
 1090                await _connection.ConnectAsync(uri, timeoutToken.Token);
 91
 1092                RelayUrl = relayUrl;
 1093                _logger?.LogInformation("Connected to relay: {RelayUrl}", relayUrl);
 94
 1095                return; // Success
 96            }
 097            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
 098            {
 099                attempt++;
 0100                _logger?.LogWarning(ex, "Connection attempt {Attempt} failed to {RelayUrl}", attempt, relayUrl);
 101
 0102                if (!_healthPolicy.ShouldRetry(attempt))
 0103                {
 0104                    _logger?.LogError("Max connection attempts reached for {RelayUrl}", relayUrl);
 0105                    if (OnError != null)
 0106                        await OnError.Invoke(ex);
 0107                    throw;
 108                }
 109
 0110                await _healthPolicy.DelayAsync(attempt, combinedToken);
 0111            }
 0112        }
 10113    }
 114
 115    public async Task SubscribeAsync(string subscriptionId, Dictionary<string, object> filter,
 116                                   CancellationToken cancellationToken = default)
 5117    {
 5118        if (string.IsNullOrWhiteSpace(subscriptionId))
 0119            throw new ArgumentException("Subscription ID cannot be null or empty", nameof(subscriptionId));
 120
 5121        ArgumentNullException.ThrowIfNull(filter);
 122
 5123        if (!IsConnected)
 1124            throw new InvalidOperationException("Not connected to a relay");
 125
 126        try
 4127        {
 4128            _subscriptionManager.AddSubscription(subscriptionId);
 129
 4130            var reqMessage = new object[] { "REQ", subscriptionId, filter };
 4131            var json = _messageSerializer.Serialize(reqMessage);
 132
 4133            await _connection!.SendAsync(json, cancellationToken);
 4134            _logger?.LogDebug("Sent subscription: {SubscriptionId}", subscriptionId);
 4135        }
 0136        catch (Exception ex)
 0137        {
 0138            _subscriptionManager.RemoveSubscription(subscriptionId);
 0139            _logger?.LogError(ex, "Failed to send subscription: {SubscriptionId}", subscriptionId);
 0140            throw;
 141        }
 4142    }
 143
 144    public async Task CloseSubscriptionAsync(string subscriptionId, CancellationToken cancellationToken = default)
 2145    {
 2146        if (string.IsNullOrWhiteSpace(subscriptionId))
 0147            throw new ArgumentException("Subscription ID cannot be null or empty", nameof(subscriptionId));
 148
 2149        if (!IsConnected)
 0150            throw new InvalidOperationException("Not connected to a relay");
 151
 152        try
 2153        {
 2154            var closeMessage = new object[] { "CLOSE", subscriptionId };
 2155            var json = _messageSerializer.Serialize(closeMessage);
 156
 2157            await _connection!.SendAsync(json, cancellationToken);
 2158            _subscriptionManager.RemoveSubscription(subscriptionId);
 159
 2160            _logger?.LogDebug("Closed subscription: {SubscriptionId}", subscriptionId);
 2161        }
 0162        catch (Exception ex)
 0163        {
 0164            _logger?.LogError(ex, "Failed to close subscription: {SubscriptionId}", subscriptionId);
 0165            throw;
 166        }
 2167    }
 168
 169    public async Task PublishAsync(NostrEvent nostrEvent, CancellationToken cancellationToken = default)
 3170    {
 3171        ArgumentNullException.ThrowIfNull(nostrEvent);
 172
 3173        if (!IsConnected)
 1174            throw new InvalidOperationException("Not connected to a relay");
 175
 176        try
 2177        {
 2178            var eventMessage = new object[] { "EVENT", nostrEvent };
 2179            var json = _messageSerializer.Serialize(eventMessage);
 180
 2181            await _connection!.SendAsync(json, cancellationToken);
 2182            _logger?.LogDebug("Published event: {EventId}", nostrEvent.Id);
 2183        }
 0184        catch (Exception ex)
 0185        {
 0186            _logger?.LogError(ex, "Failed to publish event: {EventId}", nostrEvent.Id);
 0187            throw;
 188        }
 2189    }
 190
 191    public async IAsyncEnumerable<NostrMessage> StreamAsync(string? subscriptionId = null,
 192                                                          [EnumeratorCancellation] CancellationToken cancellationToken =
 1193    {
 1194        var combinedToken = CancellationTokenSource
 1195            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)
 1196            .Token;
 197
 1198        while (!combinedToken.IsCancellationRequested)
 1199        {
 1200            if (_messageQueue.TryDequeue(out var message))
 1201            {
 202                // Filter by subscription ID if specified
 1203                if (subscriptionId == null || IsMessageForSubscription(message, subscriptionId))
 1204                {
 1205                    yield return message;
 0206                }
 0207            }
 208            else
 0209            {
 210                // Wait a bit before checking again
 0211                await Task.Delay(10, combinedToken);
 0212            }
 0213        }
 1214    }
 215
 216    private void SetupConnectionEvents()
 10217    {
 10218        if (_connection == null) return;
 219
 10220        _connection.MessageReceived += OnMessageReceived;
 10221        _connection.ErrorOccurred += OnConnectionError;
 10222        _connection.Disconnected += OnConnectionDisconnected;
 10223    }
 224
 225    private void OnMessageReceived(object? sender, string json)
 2226    {
 227        try
 2228        {
 2229            var message = _messageSerializer.Deserialize(json);
 2230            _messageQueue.Enqueue(message);
 2231            _eventDispatcher.Dispatch(message);
 2232        }
 0233        catch (Exception ex)
 0234        {
 0235            _logger?.LogError(ex, "Failed to process received message: {Json}", json);
 0236            OnError?.Invoke(ex);
 0237        }
 2238    }
 239
 240    private void OnConnectionError(object? sender, Exception ex)
 0241    {
 0242        _logger?.LogError(ex, "WebSocket connection error");
 0243        OnError?.Invoke(ex);
 0244    }
 245
 246    private void OnConnectionDisconnected(object? sender, EventArgs e)
 0247    {
 0248        _logger?.LogWarning("WebSocket disconnected from {RelayUrl}", RelayUrl);
 249
 250        // Attempt reconnection in background
 0251        _ = Task.Run(async () =>
 0252        {
 0253            if (RelayUrl != null && !_cancellationTokenSource.Token.IsCancellationRequested)
 0254            {
 0255                try
 0256                {
 0257                    await ConnectAsync(RelayUrl, _cancellationTokenSource.Token);
 0258                }
 0259                catch (Exception ex)
 0260                {
 0261                    _logger?.LogError(ex, "Failed to reconnect to {RelayUrl}", RelayUrl);
 0262                    await OnError?.Invoke(ex)!;
 0263                }
 0264            }
 0265        });
 0266    }
 267
 268    private static bool IsMessageForSubscription(NostrMessage message, string subscriptionId)
 1269    {
 1270        return message switch
 1271        {
 0272            RelayEventMessage eventMsg => eventMsg.SubscriptionId == subscriptionId,
 1273            EoseMessage eoseMsg => eoseMsg.SubscriptionId == subscriptionId,
 0274            ClosedMessage closedMsg => closedMsg.SubscriptionId == subscriptionId,
 0275            _ => true // NOTICE, OK messages are global
 1276        };
 1277    }
 278
 279    public void Dispose()
 23280    {
 23281        if (!_disposed)
 23282        {
 23283            _cancellationTokenSource.Cancel();
 23284            _connection?.Dispose();
 23285            _cancellationTokenSource.Dispose();
 23286            _disposed = true;
 23287        }
 23288    }
 289}