La Métamorphose
This commit is contained in:
89
dotnet/gregorsamsa_consumer/Program.cs
Normal file
89
dotnet/gregorsamsa_consumer/Program.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using Confluent.Kafka;
|
||||
class Program
|
||||
{
|
||||
|
||||
static public List<IConsumer<Ignore, string>> buildConsumers(string[] arStr) {
|
||||
List<IConsumer<Ignore, string>> arCon = new();
|
||||
|
||||
var config = new ConsumerConfig
|
||||
{
|
||||
BootstrapServers = "localhost:9092",
|
||||
GroupId = "test-group",
|
||||
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||
};
|
||||
foreach (var str in arStr) {
|
||||
var con = new ConsumerBuilder<Ignore, string>(config)
|
||||
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
|
||||
.SetPartitionsAssignedHandler((c, partitions) =>
|
||||
{
|
||||
Console.WriteLine($"Assigned partitions: {string.Join(", ", partitions)}");
|
||||
// return (IEnumerable<TopicPartitionOffset>)partitions;
|
||||
})
|
||||
.SetPartitionsRevokedHandler((c, partitions) =>
|
||||
{
|
||||
Console.WriteLine($"Revoked partitions: {string.Join(", ", partitions)}");
|
||||
}).Build();
|
||||
|
||||
con.Subscribe(str);
|
||||
|
||||
arCon.Add(con);
|
||||
}
|
||||
return arCon;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void Main(string[] args)
|
||||
{
|
||||
var config = new ConsumerConfig
|
||||
{
|
||||
BootstrapServers = "localhost:9092",
|
||||
GroupId = "test-group",
|
||||
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||
};
|
||||
var topics = new string[] { "test-topic" };
|
||||
var cancellationTokenSource = new CancellationTokenSource();
|
||||
var consumers = buildConsumers(topics);
|
||||
Console.WriteLine($"Starting {consumers.Count} consumers...");
|
||||
try
|
||||
{
|
||||
foreach (var consumer in consumers)
|
||||
{
|
||||
var thread = new Thread(() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!cancellationTokenSource.Token.IsCancellationRequested)
|
||||
{
|
||||
var message = consumer.Consume(cancellationTokenSource.Token);
|
||||
Console.WriteLine($"Received message: {message.Value}, Partition: {message.Partition}, Offset: {message.Offset}");
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Console.WriteLine($"Consumer thread canceled.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
consumer.Close();
|
||||
}
|
||||
});
|
||||
thread.Start();
|
||||
}
|
||||
Console.WriteLine("Press any key to stop...");
|
||||
Console.ReadKey();
|
||||
cancellationTokenSource.Cancel();
|
||||
foreach (var consumer in consumers)
|
||||
{
|
||||
consumer.Close();
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Error: {ex.Message}");
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user