using Newtonsoft.Json; using Newtonsoft.Json.Linq; using StackExchange.Redis; using System; using System.Reflection.Metadata.Ecma335; namespace BlazorCanvas.Server.Components.Data; // https://developer.redis.com/develop/dotnet/streams/stream-basics/ public class RedisService : IRedisService { const string STREAM_NAME = "steamie", GROUP_NAME = "groupie", SUB_NAME = "servant"; private NameValueEntry[] arnve = new NameValueEntry[1]; private IConnectionMultiplexer _cache; private IDatabase _database; private ChannelMessageQueue _channel; public RedisService(IConnectionMultiplexer cache) { _cache = cache; } /// /// Init Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// /// Si la connexion a réussi. public bool InitSubscriber() { try { _channel = _cache.GetSubscriber().Subscribe(RedisChannel.Literal(SUB_NAME)); } catch (Exception ex) { Console.WriteLine(ex); return false; } return true; } /// /// Init Streamer - Devrait être plus près de Kafka comme comportement. /// /// Si la connexion a réussi. public async Task InitStreamer() { try { _database = _cache.GetDatabase(); if (!(await _database.KeyExistsAsync(STREAM_NAME)) || (await _database.StreamGroupInfoAsync(STREAM_NAME)).All(x => x.Name != GROUP_NAME)) { await _database.StreamCreateConsumerGroupAsync(STREAM_NAME, GROUP_NAME, "0-0", true); } } catch (Exception ex) { Console.WriteLine(ex); return false; } return true; } public async void Produce(CanvasCommand command) { arnve[0] = new NameValueEntry("command", JsonConvert.SerializeObject(command)); await _database.StreamAddAsync(STREAM_NAME, arnve); } public async Task> Consume() { List lsComm = new(); CanvasCommand? comm; var result = await _database.StreamReadGroupAsync(STREAM_NAME, GROUP_NAME, "commCon", ">"); foreach (var c in result) { await _database.StreamAcknowledgeAsync(STREAM_NAME, GROUP_NAME, c.Id); try { string json = c.Values.FirstOrDefault(x => x.Name == "command").Value.ToString(); comm = JsonConvert.DeserializeObject(json); } catch { continue; } if (comm is not null) lsComm.Add(comm); } return lsComm; } /// /// Version Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// public async Task Subscribe(CancellationToken cToken) { var mess = await _channel.ReadAsync(cToken); CanvasCommand? comm; try { comm = JsonConvert.DeserializeObject(mess.Message); } catch { return null; } if (comm is not null) return comm; return null; } /// /// Version Pub/Sub - Redis en mode Pub/Sub ne garde pas ses messages en mémoire, /// donc les commandes faites avant la souscription ne sont pas copiées. /// /// La commande à publier public async void Publish(CanvasCommand command) { await _cache.GetSubscriber().PublishAsync(_channel.Channel, JsonConvert.SerializeObject(command)); } }