using System; using System.Threading; using Confluent.Kafka; class Program { static public List> buildConsumers(string[] arStr) { List> arCon = new(); var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "test-group", AutoOffsetReset = AutoOffsetReset.Earliest }; foreach (var str in arStr) { var con = new ConsumerBuilder(config) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .SetPartitionsAssignedHandler((c, partitions) => { Console.WriteLine($"Assigned partitions: {string.Join(", ", partitions)}"); // return (IEnumerable)partitions; }) .SetPartitionsRevokedHandler((c, partitions) => { Console.WriteLine($"Revoked partitions: {string.Join(", ", partitions)}"); }).Build(); con.Subscribe(str); arCon.Add(con); } return arCon; } static void Main(string[] args) { var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "test-group", AutoOffsetReset = AutoOffsetReset.Earliest }; var topics = new string[] { "test-topic" }; var cancellationTokenSource = new CancellationTokenSource(); var consumers = buildConsumers(topics); Console.WriteLine($"Starting {consumers.Count} consumers..."); try { foreach (var consumer in consumers) { var thread = new Thread(() => { try { while (!cancellationTokenSource.Token.IsCancellationRequested) { var message = consumer.Consume(cancellationTokenSource.Token); Console.WriteLine($"Received message: {message.Value}, Partition: {message.Partition}, Offset: {message.Offset}"); } } catch (OperationCanceledException) { Console.WriteLine($"Consumer thread canceled."); } finally { consumer.Close(); } }); thread.Start(); } Console.WriteLine("Press any key to stop..."); Console.ReadKey(); cancellationTokenSource.Cancel(); foreach (var consumer in consumers) { consumer.Close(); } } catch (Exception ex) { Console.WriteLine($"Error: {ex.Message}"); } } }