90 lines
2.8 KiB
C#
90 lines
2.8 KiB
C#
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;
|
|
/// <summary>
/// Console sample that creates one Kafka consumer per topic, runs each consumer
/// on its own thread, and prints every received message until a key is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Builds one consumer per topic name and subscribes it to that topic.
    /// All consumers share the same configuration (broker <c>localhost:9092</c>,
    /// group <c>test-group</c>, offset reset <c>Earliest</c>).
    /// </summary>
    /// <param name="arStr">Topic names; one consumer is created per entry.</param>
    /// <returns>The subscribed consumers, in the same order as <paramref name="arStr"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="arStr"/> is null.</exception>
    public static List<IConsumer<Ignore, string>> buildConsumers(string[] arStr)
    {
        if (arStr is null)
        {
            throw new ArgumentNullException(nameof(arStr));
        }

        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        var consumers = new List<IConsumer<Ignore, string>>(arStr.Length);
        try
        {
            foreach (var topic in arStr)
            {
                var consumer = new ConsumerBuilder<Ignore, string>(config)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .SetPartitionsAssignedHandler((_, partitions) =>
                    {
                        Console.WriteLine($"Assigned partitions: {string.Join(", ", partitions)}");
                    })
                    .SetPartitionsRevokedHandler((_, partitions) =>
                    {
                        Console.WriteLine($"Revoked partitions: {string.Join(", ", partitions)}");
                    })
                    .Build();

                // Track the consumer before subscribing so it is released below
                // if Subscribe throws.
                consumers.Add(consumer);
                consumer.Subscribe(topic);
            }
        }
        catch
        {
            // Do not leak consumers that were already built when a later one fails.
            foreach (var built in consumers)
            {
                built.Dispose();
            }

            throw;
        }

        return consumers;
    }

    /// <summary>
    /// Polls <paramref name="consumer"/> and prints each message until
    /// <paramref name="token"/> is cancelled or a fatal consume error occurs,
    /// then closes and disposes the consumer.
    /// </summary>
    /// <param name="consumer">The consumer to poll; this method takes over closing it.</param>
    /// <param name="token">Signals the loop to stop.</param>
    private static void ConsumeLoop(IConsumer<Ignore, string> consumer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(token);
                    Console.WriteLine($"Received message: {result.Message.Value}, Partition: {result.Partition}, Offset: {result.Offset}");
                }
                catch (ConsumeException ex)
                {
                    // A failed consume should not take the whole process down;
                    // keep polling unless the client reports the error as fatal.
                    Console.WriteLine($"Consume error: {ex.Error.Reason}");
                    if (ex.Error.IsFatal)
                    {
                        break;
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Consumer thread canceled.");
        }
        finally
        {
            // The consumer is closed only here, on the thread that was polling it,
            // so it is closed exactly once and never while Consume is in progress.
            try
            {
                consumer.Close();
            }
            finally
            {
                consumer.Dispose();
            }
        }
    }

    /// <summary>
    /// Entry point: starts one consumer thread per topic, waits for a key press,
    /// then cancels the consumers and waits for their threads to finish.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var topics = new[] { "test-topic" };
        using var cancellationTokenSource = new CancellationTokenSource();
        var consumers = buildConsumers(topics);
        var threads = new List<Thread>(consumers.Count);

        Console.WriteLine($"Starting {consumers.Count} consumers...");
        try
        {
            foreach (var consumer in consumers)
            {
                var thread = new Thread(() => ConsumeLoop(consumer, cancellationTokenSource.Token));
                threads.Add(thread);
                thread.Start();
            }

            Console.WriteLine("Press any key to stop...");
            Console.ReadKey();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
        finally
        {
            // Always signal shutdown, even when ReadKey fails (e.g. redirected
            // input), otherwise the consumer threads would run forever.
            cancellationTokenSource.Cancel();

            // Each thread closes its own consumer; wait for that to complete
            // before the token source is disposed and the process exits.
            foreach (var thread in threads)
            {
                thread.Join();
            }
        }
    }
}
|