using Newtonsoft.Json; using Newtonsoft.Json.Linq; using StackExchange.Redis; using System; using System.Reflection.Metadata.Ecma335; namespace BlazorCanvas.Server.Components.Data; // https://developer.redis.com/develop/dotnet/streams/stream-basics/ public class RedisService : IRedisService { private const string STREAM_NAME = "steamie", GROUP_NAME = "groupie", SUB_NAME = "servant"; private string _lastId = "0-0"; private NameValueEntry[] arNve = new NameValueEntry[1]; private IConnectionMultiplexer _cache; private IDatabase _database; private ChannelMessageQueue _channel; public RedisService(IConnectionMultiplexer cache) { _cache = cache; } /// /// Init Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// /// Si la connexion a réussi. public bool InitSubscriber() { try { _channel = _cache.GetSubscriber().Subscribe(RedisChannel.Literal(SUB_NAME)); } catch { return false; } if (_channel is null) return false; return true; } /// /// Init Streamer - Devrait être plus près de Kafka comme comportement. /// /// Si la connexion a réussi. public async Task InitStreamer() { try { _database = _cache.GetDatabase(); } catch { return false; } if (_database is null) return false; return true; } public async void Produce(CanvasCommand command) { arNve[0] = new NameValueEntry("command", JsonConvert.SerializeObject(command)); _database.StreamAddAsync(STREAM_NAME, arNve); } public async Task> Consume() { List lsComm = new(); CanvasCommand? comm; var result = await _database.StreamReadAsync(STREAM_NAME, _lastId, 100); string json = ""; bool ok = false; foreach (var c in result) { try { json = c.Values.FirstOrDefault(x => x.Name == "command").Value.ToString(); comm = JsonConvert.DeserializeObject(json); } catch { Console.WriteLine($"OH NO {json}"); continue; } if (comm is not null) lsComm.Add(comm); if (!ok) ok = true; } if (ok) _lastId = result.LastOrDefault().Id.ToString()?? "0-0"; return lsComm; } /// /// Version Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// public async Task Subscribe(CancellationToken cToken) { var mess = await _channel.ReadAsync(cToken); CanvasCommand? comm; try { comm = JsonConvert.DeserializeObject(mess.Message); } catch { return null; } if (comm is not null) return comm; return null; } /// /// Version Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// /// La commande à publier public async void Publish(CanvasCommand command) { await _cache.GetSubscriber().PublishAsync(_channel.Channel, JsonConvert.SerializeObject(command)); } }