1
0
Files
Blazor_Canvas/BlazorCanvas/BlazorCanvas.Server/Components/Data/RedisService.cs
MarcEricMartel 93450a9f49 OH YEAH.
2023-11-19 10:54:37 -05:00

109 lines
3.6 KiB
C#

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System;
using System.Reflection.Metadata.Ecma335;
namespace BlazorCanvas.Server.Components.Data;
// https://developer.redis.com/develop/dotnet/streams/stream-basics/
/// <summary>
/// Redis-backed transport for <see cref="CanvasCommand"/> messages. Offers two
/// independent modes: a stream (Produce/Consume), where entries stay in Redis and
/// are read from the last seen id, and Pub/Sub (Publish/Subscribe), where only
/// messages sent after the subscription are received.
/// </summary>
public class RedisService : IRedisService {
    private const string STREAM_NAME = "steamie",
                         GROUP_NAME = "groupie",
                         SUB_NAME = "servant",
                         COMMAND_FIELD = "command";

    // Maximum number of stream entries fetched by a single Consume() call.
    private const int READ_BATCH_SIZE = 100;

    // Id of the newest stream entry already handed to the caller; "0-0" means "from the start".
    private string _lastId = "0-0";
    private readonly IConnectionMultiplexer _cache;
    private IDatabase? _database;
    private ChannelMessageQueue? _channel;

    public RedisService(IConnectionMultiplexer cache) { _cache = cache; }

    // Falls back to fetching the database handle on first use so that Produce/Consume
    // do not fail with a NullReferenceException when InitStreamer() was not called.
    private IDatabase Database => _database ??= _cache.GetDatabase();

    /// <summary>
    /// Initializes Pub/Sub. Redis in Pub/Sub mode does not keep messages in memory,
    /// so commands published before the subscription are not received.
    /// </summary>
    /// <returns><c>true</c> if the subscription succeeded; otherwise <c>false</c>.</returns>
    public bool InitSubscriber() {
        try {
            _channel = _cache.GetSubscriber().Subscribe(RedisChannel.Literal(SUB_NAME));
        } catch (Exception ex) {
            Console.WriteLine($"Redis subscribe failed: {ex.Message}");
            return false;
        }
        return _channel is not null;
    }

    /// <summary>
    /// Initializes the stream mode by obtaining the database handle used by
    /// <see cref="Produce"/> and <see cref="Consume"/>.
    /// </summary>
    /// <returns><c>true</c> if the database handle was obtained; otherwise <c>false</c>.</returns>
    public Task<bool> InitStreamer() {
        // Nothing here is asynchronous, so a completed task is returned directly.
        try {
            _database = _cache.GetDatabase();
        } catch (Exception ex) {
            Console.WriteLine($"Redis database init failed: {ex.Message}");
            return Task.FromResult(false);
        }
        return Task.FromResult(_database is not null);
    }

    /// <summary>
    /// Appends a command to the stream as a single "command" field holding its JSON.
    /// </summary>
    /// <param name="command">The command to append.</param>
    public async void Produce(CanvasCommand command) {
        // async void: an escaping exception would take the process down, so every
        // failure is caught and reported here.
        try {
            // A fresh array per call: a shared buffer could be overwritten by an
            // overlapping call before the command is sent.
            NameValueEntry[] fields = { new NameValueEntry(COMMAND_FIELD, JsonConvert.SerializeObject(command)) };
            await Database.StreamAddAsync(STREAM_NAME, fields);
        } catch (Exception ex) {
            Console.WriteLine($"Redis stream add failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Reads up to 100 stream entries newer than the last consumed id and returns
    /// the commands that could be deserialized. Malformed entries are logged and skipped.
    /// </summary>
    /// <returns>The deserialized commands, in stream order; empty when nothing new was read.</returns>
    public async Task<IEnumerable<CanvasCommand>> Consume() {
        List<CanvasCommand> commands = new();
        StreamEntry[] entries = await Database.StreamReadAsync(STREAM_NAME, _lastId, READ_BATCH_SIZE);
        if (entries is null || entries.Length == 0) {
            return commands;
        }

        foreach (StreamEntry entry in entries) {
            // Scoped per entry so a failure never logs the previous entry's payload.
            string? json = null;
            try {
                json = entry.Values.FirstOrDefault(x => x.Name == COMMAND_FIELD).Value.ToString();
                if (json is null) {
                    // Entry without a "command" field.
                    Console.WriteLine($"OH NO {json}");
                    continue;
                }
                CanvasCommand? command = JsonConvert.DeserializeObject<CanvasCommand>(json);
                if (command is not null) {
                    commands.Add(command);
                }
            } catch {
                Console.WriteLine($"OH NO {json}");
            }
        }

        // Always move past the batch just read, even if every entry in it was
        // malformed; otherwise the same bad entries would be re-read forever.
        string? newestId = entries[^1].Id.ToString();
        if (!string.IsNullOrEmpty(newestId)) {
            _lastId = newestId;
        }
        return commands;
    }

    /// <summary>
    /// Pub/Sub version. Waits for the next message on the subscribed channel and
    /// deserializes it. Redis in Pub/Sub mode does not keep messages in memory,
    /// so commands published before the subscription are not received.
    /// </summary>
    /// <param name="cToken">Token used to cancel the wait for the next message.</param>
    /// <returns>The received command, or <c>null</c> if the payload could not be deserialized.</returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="InitSubscriber"/> has not been called successfully.
    /// </exception>
    public async Task<CanvasCommand?> Subscribe(CancellationToken cToken) {
        ChannelMessageQueue channel = _channel
            ?? throw new InvalidOperationException($"{nameof(InitSubscriber)} must succeed before calling {nameof(Subscribe)}.");
        ChannelMessage received = await channel.ReadAsync(cToken);
        try {
            string? payload = received.Message;
            return payload is null ? null : JsonConvert.DeserializeObject<CanvasCommand>(payload);
        } catch (Exception ex) {
            Console.WriteLine($"Redis message could not be deserialized: {ex.Message}");
            return null;
        }
    }

    /// <summary>
    /// Pub/Sub version. Publishes a command as JSON on the channel. Redis in Pub/Sub
    /// mode does not keep messages in memory, so only current subscribers receive it.
    /// </summary>
    /// <param name="command">The command to publish.</param>
    public async void Publish(CanvasCommand command) {
        // async void: an escaping exception would take the process down, so every
        // failure is caught and reported here.
        try {
            // Publish by channel name so this does not depend on a local subscription.
            await _cache.GetSubscriber().PublishAsync(RedisChannel.Literal(SUB_NAME), JsonConvert.SerializeObject(command));
        } catch (Exception ex) {
            Console.WriteLine($"Redis publish failed: {ex.Message}");
        }
    }
}