Exemple_Kafka/dotnet/gregorsamsa_consumer/Program.cs

90 lines
2.8 KiB
C#

using System;
using System.Threading;
using Confluent.Kafka;
class Program
{
static public List<IConsumer<Ignore, string>> buildConsumers(string[] arStr) {
List<IConsumer<Ignore, string>> arCon = new();
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
foreach (var str in arStr) {
var con = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: {string.Join(", ", partitions)}");
// return (IEnumerable<TopicPartitionOffset>)partitions;
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoked partitions: {string.Join(", ", partitions)}");
}).Build();
con.Subscribe(str);
arCon.Add(con);
}
return arCon;
}
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var topics = new string[] { "test-topic" };
var cancellationTokenSource = new CancellationTokenSource();
var consumers = buildConsumers(topics);
Console.WriteLine($"Starting {consumers.Count} consumers...");
try
{
foreach (var consumer in consumers)
{
var thread = new Thread(() =>
{
try
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var message = consumer.Consume(cancellationTokenSource.Token);
Console.WriteLine($"Received message: {message.Value}, Partition: {message.Partition}, Offset: {message.Offset}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"Consumer thread canceled.");
}
finally
{
consumer.Close();
}
});
thread.Start();
}
Console.WriteLine("Press any key to stop...");
Console.ReadKey();
cancellationTokenSource.Cancel();
foreach (var consumer in consumers)
{
consumer.Close();
}
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}