use futures::stream::StreamExt; // 引入 StreamExt 以使用 next() 方法 use rdkafka::config::ClientConfig; use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer}; use rdkafka::error::KafkaResult; use rdkafka::message::{Message}; async fn run_consumer() -> KafkaResult<()> { let consumer: StreamConsumer = ClientConfig::new() .set("group.id", "test_group") .set("bootstrap.servers", "localhost:9092") .set("enable.auto.commit", "true") .set("session.timeout.ms", "6000") .set("auto.offset.reset", "earliest") .create() .expect("Consumer creation failed"); consumer.subscribe(&["test_topic"]).expect("Can't subscribe to specified topics"); let mut message_stream = consumer.stream(); while let Some(message) = message_stream.next().await { match message { Ok(m) => { match m.payload_view::() { Some(Ok(payload)) => { println!("Key: '{:?}', Payload: '{}'", m.key(), payload); } Some(Err(e)) => { eprintln!("Error while deserializing message payload: {:?}", e); } None => { println!("Key: '{:?}', Payload: ", m.key()); } } consumer.commit_message(&m, CommitMode::Async)?; } Err(e) => eprintln!("Kafka error: {}", e), } } Ok(()) } fn main() { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(run_consumer()).unwrap(); }
# Crates required by the Kafka consumer program.
[dependencies]
# Kafka client; the program uses its ClientConfig, StreamConsumer and Message APIs.
rdkafka = "0.36.2"
# Async runtime used to drive the consumer; "full" enables every optional Tokio feature.
tokio = { version = "1.36.0", features = ["full"] }
# Supplies StreamExt, whose next() is used to poll the message stream.
futures = "0.3.30"
# Link with x86_64-linux-musl-gcc when building for the x86_64-unknown-linux-musl target.
# NOTE(review): Cargo reads [target.<triple>] linker settings from .cargo/config.toml,
# not from Cargo.toml; confirm this table lives in the right file.
[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"