doesnt support kafka 0.10.0 that i need for auth

master
Bel LaPointe 2023-03-16 17:12:25 -06:00
parent 8f5097d7b3
commit 2b6ff85ecf
6 changed files with 90 additions and 12 deletions

1
Cargo.lock generated
View File

@ -282,6 +282,7 @@ name = "rusty-pipe"
version = "0.1.0"
dependencies = [
"kafka",
"openssl",
"serde",
"serde_yaml",
]

View File

@ -9,3 +9,4 @@ edition = "2021"
serde = { version = "1.0.156", features = ["derive"] }
serde_yaml = "0.9.19"
kafka = "0.9.0"
openssl = {}

View File

@ -27,8 +27,10 @@ pub struct Engine {
#[derive(Serialize, Deserialize, Debug)]
pub struct Kafka {
topic: String,
consumer_group: String,
pub addr: String,
pub topic: String,
pub consumer_group: String,
pub crt: String,
}
pub fn build_config() -> Result<Config, String> {

View File

@ -1,7 +1,11 @@
use crate::config::Engine;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::client::{KafkaClient, SecurityConfig};
pub trait InputEngine {
fn get(&self) -> Vec<char>;
fn get(&mut self) -> Vec<char>;
}
pub fn build_input_engine(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
@ -12,22 +16,50 @@ pub fn build_input_engine(cfg: &Engine) -> Result<Box<dyn InputEngine>, String>
}
}
pub fn build_input_engine_kafka(_cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
return Err("not impl".to_string());
pub fn build_input_engine_kafka(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
let kafka_cfg = cfg.kafka.as_ref().unwrap();
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_cipher_list("DEFAULT").unwrap();
builder.set_verify(SslVerifyMode::PEER);
builder.set_ca_file(kafka_cfg.crt.to_string()).unwrap();
let mut client = KafkaClient::new_secure(
vec!(kafka_cfg.addr.to_owned()),
SecurityConfig::new(builder.build())
.with_hostname_verification(false),
);
client.load_metadata(&vec![kafka_cfg.topic.to_owned()]).unwrap();
let consumer = Consumer::from_client(client)
.with_topic(kafka_cfg.topic.to_owned())
.with_group(kafka_cfg.consumer_group.to_owned())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(GroupOffsetStorage::Kafka)
.with_client_id("breel".to_owned())
.create()
.unwrap();
return Ok(Box::new(InputEngineKafka{
consumer: consumer,
}));
}
struct InputEngineKafka {}
struct InputEngineKafka {
consumer: Consumer
}
impl InputEngine for InputEngineKafka {
fn get(&self) -> Vec<char> {
Err("not impl".to_string()).unwrap()
fn get(&mut self) -> Vec<char> {
for ms in self.consumer.poll().unwrap().iter() {
for m in ms.messages() {
return std::str::from_utf8(m.value).unwrap().to_string().chars().collect();
}
}
Err("end".to_string()).unwrap()
}
}
struct InputEngineStdin {}
impl InputEngine for InputEngineStdin {
fn get(&self) -> Vec<char> {
fn get(&mut self) -> Vec<char> {
let stdin = std::io::stdin();
let mut result = String::new();
stdin.read_line(&mut result).unwrap();
@ -41,7 +73,7 @@ mod test_input {
struct InputEngineTest {}
impl InputEngine for InputEngineTest {
fn get(&self) -> Vec<char> {
fn get(&mut self) -> Vec<char> {
return "hello world".chars().collect();
}
}

View File

@ -3,8 +3,10 @@ streams:
engine:
name: kafka
kafka:
topic: xyz
consumer_group: uvw
topic: qmp.atlas.record
consumer_group: breel
addr: core-kafka.service.consul:9192
crt: ./src/testdata/qualtrics_root_ca.b1-prv.crt
output:
engine:
name: stdout

View File

@ -0,0 +1,40 @@
-----BEGIN CERTIFICATE-----
MIIDQzCCAiugAwIBAgIUOF/uFrh6DI/BWeZB2p6uRHXXzo0wDQYJKoZIhvcNAQEL
BQAwKTEnMCUGA1UEAxMeUXVhbHRyaWNzIFN0YWdpbmcgUm9vdCBDQSAyMDE5MB4X
DTE5MTEyNTE5NTk0OFoXDTI5MTEyMjIwMDAxOFowKTEnMCUGA1UEAxMeUXVhbHRy
aWNzIFN0YWdpbmcgUm9vdCBDQSAyMDE5MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAy6nAAmO1BJ1WR71oPKKi3jwRAdGsbzHczYonQzUHGUZJrGLUlj9m
ihsDokXBNhKgABFbVFaaxXBtR0FafAYyserA2oEgsRN9xAN4EwNbKurkIFN+zUPC
M02fGL6ofGq+eo4At0L/LwImOcf9kBA7XAZc0oioegPmlS7LTqGQsVMzB6H/gjwZ
viAddelZmZKXPIwnuqnmLfeTNa3hrkKjTf/qDxzE0xdCAY5iknS3hxKsDn1mTeqk
pc+fEEnCk9n6XBsd89y9HFGT2nzFwyY4ejtGVFnOZ1ll2Y7qSHGFn8NjbSjQ33YW
NTucpfK/7zs5v2BtCKVMQd+NV+cOu0luAQIDAQABo2MwYTAOBgNVHQ8BAf8EBAMC
AQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUXSfqwhbIhEEHgCwYeSm25pPB
q5cwHwYDVR0jBBgwFoAUXSfqwhbIhEEHgCwYeSm25pPBq5cwDQYJKoZIhvcNAQEL
BQADggEBALYdO4s+or5FoOgRgNhppqFLsczCmYAwvfq43mpJ4+SkHkV0pIgzenFy
jAuvNUxVD+r+vBLrkwgXqOlvvEuMl0bVtu+mGYXdKQm/JpJ6UOOyisnP53EXICBd
rvcdC/DNSmY8nWE+KIlr0nuu6MQZWdOI7xCMqrsIx1EQ0XfGDY+aZO7Ir1kbpbA4
ey03rKxWMFPh1LJSkPD+xKd+ykzZH1EtBQFNbA3SSvCyovBOsW8twgpw3dpf6jVc
JbRqS0ZuNKzloaVuiGptB4L0mort9tWffPw9Xezfk3P45t6BSRTsEKU8dVDxfBb0
n5ta1krSRh4BuqztbvsVNvsK8kfewsg=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDSTCCAjGgAwIBAgIUfkUZZIdZ9jQLs/S4OYuqpnDPVUMwDQYJKoZIhvcNAQEL
BQAwLDEqMCgGA1UEAxMhUXVhbHRyaWNzIFByb2R1Y3Rpb24gUm9vdCBDQSAyMDE5
MB4XDTE5MTIwMjIzNTAxMFoXDTI5MTEyOTIzNTA0MFowLDEqMCgGA1UEAxMhUXVh
bHRyaWNzIFByb2R1Y3Rpb24gUm9vdCBDQSAyMDE5MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAvkrICcFOfTGsjuurVevZh8QrGJh2QeICf2Y2vvIhc1BV
JI8PQ6o3cj3mkwCX6WNHnc+I9uMYsjREPLecM8rB6UPL11eFwTgC+BVcQ1SmxcjF
EDr4rlYGnbD6268wd628tqpGu9rGTT9sc+PgegE791xxS3sEgcWLxQM8it7VCXCy
GmQqVkQhcUMpJJ9A/0JPCJgSu5V2DD67Cw8jqtT2AG1uwXcfgR0sg1alpuld1IED
VVu/qCzeddui3JaU+G2eyg3oDip/uW0y5x6eA18M4TQoxq76JNBweNVpVS7PYiO/
c7embXQ8YjR7CHumAzDjFcrUkOebmLZdubWHrAFuXwIDAQABo2MwYTAOBgNVHQ8B
Af8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUU4fm8rplaOLntfKV
OjPK0jUNwPUwHwYDVR0jBBgwFoAUU4fm8rplaOLntfKVOjPK0jUNwPUwDQYJKoZI
hvcNAQELBQADggEBAFZ8w9ZgX+tZyaGJNuZo9MRn3dCy1trlzBqVapgx1tDMxf56
2kxen+m4rF6wDf7KcfcmAgn1AZ4ExrqyLokp6bz4Dkv7F6QzYyStZ2rCeM1P7+UK
9vXaf17uc6EAvx/VMKyPnZZQGdMrCqU1wolc345L+UmxHCf2VUgm6OMWlB8Xk/oK
10c1LSgGQLagmmcjxGpWKsxHtnbaaEONalnUZkty72E2/YfkstzQscykMDBNrXYI
6C99R7R9zsso+JanwPB3xEElVGIS7veuXpHqCzslPDohSXNLQHc7Anl5eI85Svqo
4iWAdm6ESyr/LCqyHH8iHTvdkTqlvxB9/rJC+F8=
-----END CERTIFICATE-----