wip to rdkafka and kafka_serde

This commit is contained in:
Bel LaPointe
2023-03-16 17:16:33 -06:00
parent 2b6ff85ecf
commit a41a156d18
3 changed files with 209 additions and 265 deletions

View File

@@ -1,9 +1,5 @@
use crate::config::Engine;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::client::{KafkaClient, SecurityConfig};
pub trait InputEngine {
fn get(&mut self) -> Vec<char>;
}
@@ -16,42 +12,16 @@ pub fn build_input_engine(cfg: &Engine) -> Result<Box<dyn InputEngine>, String>
}
}
pub fn build_input_engine_kafka(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
let kafka_cfg = cfg.kafka.as_ref().unwrap();
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_cipher_list("DEFAULT").unwrap();
builder.set_verify(SslVerifyMode::PEER);
builder.set_ca_file(kafka_cfg.crt.to_string()).unwrap();
let mut client = KafkaClient::new_secure(
vec!(kafka_cfg.addr.to_owned()),
SecurityConfig::new(builder.build())
.with_hostname_verification(false),
);
client.load_metadata(&vec![kafka_cfg.topic.to_owned()]).unwrap();
let consumer = Consumer::from_client(client)
.with_topic(kafka_cfg.topic.to_owned())
.with_group(kafka_cfg.consumer_group.to_owned())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(GroupOffsetStorage::Kafka)
.with_client_id("breel".to_owned())
.create()
.unwrap();
return Ok(Box::new(InputEngineKafka{
consumer: consumer,
}));
struct InputEngineKafka {
}
struct InputEngineKafka {
consumer: Consumer
pub fn build_input_engine_kafka(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
let kafka_cfg = cfg.kafka.as_ref().unwrap();
return Err("do what".to_string());
}
impl InputEngine for InputEngineKafka {
fn get(&mut self) -> Vec<char> {
for ms in self.consumer.poll().unwrap().iter() {
for m in ms.messages() {
return std::str::from_utf8(m.value).unwrap().to_string().chars().collect();
}
}
Err("end".to_string()).unwrap()
}
}