From 2b6ff85ecfaed87ad6ac75ee2f6af027fc80433a Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Thu, 16 Mar 2023 17:12:25 -0600 Subject: [PATCH] doesnt support kafka 0.10.0 that i need for auth --- Cargo.lock | 1 + Cargo.toml | 1 + src/config.rs | 6 ++- src/engine.rs | 48 +++++++++++++++++++---- src/testdata/config-kafka-to-stdout.yaml | 6 ++- src/testdata/qualtrics_root_ca.b1-prv.crt | 40 +++++++++++++++++++ 6 files changed, 90 insertions(+), 12 deletions(-) create mode 100644 src/testdata/qualtrics_root_ca.b1-prv.crt diff --git a/Cargo.lock b/Cargo.lock index 3877f9b..16cb054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,6 +282,7 @@ name = "rusty-pipe" version = "0.1.0" dependencies = [ "kafka", + "openssl", "serde", "serde_yaml", ] diff --git a/Cargo.toml b/Cargo.toml index 38591f6..4853559 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" serde = { version = "1.0.156", features = ["derive"] } serde_yaml = "0.9.19" kafka = "0.9.0" +openssl = {} diff --git a/src/config.rs b/src/config.rs index 5fea896..8be004f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,8 +27,10 @@ pub struct Engine { #[derive(Serialize, Deserialize, Debug)] pub struct Kafka { - topic: String, - consumer_group: String, + pub addr: String, + pub topic: String, + pub consumer_group: String, + pub crt: String, } pub fn build_config() -> Result { diff --git a/src/engine.rs b/src/engine.rs index 92abe61..2937b66 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,7 +1,11 @@ use crate::config::Engine; +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; +use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage}; +use kafka::client::{KafkaClient, SecurityConfig}; + pub trait InputEngine { - fn get(&self) -> Vec; + fn get(&mut self) -> Vec; } pub fn build_input_engine(cfg: &Engine) -> Result, String> { @@ -12,22 +16,50 @@ pub fn build_input_engine(cfg: &Engine) -> Result, String> } } -pub fn build_input_engine_kafka(_cfg: &Engine) -> Result, String> { - return Err("not impl".to_string()); +pub fn build_input_engine_kafka(cfg: &Engine) -> Result, String> { + let kafka_cfg = cfg.kafka.as_ref().unwrap(); + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_cipher_list("DEFAULT").unwrap(); + builder.set_verify(SslVerifyMode::PEER); + builder.set_ca_file(kafka_cfg.crt.to_string()).unwrap(); + let mut client = KafkaClient::new_secure( + vec!(kafka_cfg.addr.to_owned()), + SecurityConfig::new(builder.build()) + .with_hostname_verification(false), + ); + client.load_metadata(&vec![kafka_cfg.topic.to_owned()]).unwrap(); + let consumer = Consumer::from_client(client) + .with_topic(kafka_cfg.topic.to_owned()) + .with_group(kafka_cfg.consumer_group.to_owned()) + .with_fallback_offset(FetchOffset::Earliest) + .with_offset_storage(GroupOffsetStorage::Kafka) + .with_client_id("breel".to_owned()) + .create() + .unwrap(); + return Ok(Box::new(InputEngineKafka{ + consumer: consumer, + })); } -struct InputEngineKafka {} +struct InputEngineKafka { + consumer: Consumer +} impl InputEngine for InputEngineKafka { - fn get(&self) -> Vec { - Err("not impl".to_string()).unwrap() + fn get(&mut self) -> Vec { + for ms in self.consumer.poll().unwrap().iter() { + for m in ms.messages() { + return std::str::from_utf8(m.value).unwrap().to_string().chars().collect(); + } + } + Err("end".to_string()).unwrap() } } struct InputEngineStdin {} impl InputEngine for InputEngineStdin { - fn get(&self) -> Vec { + fn get(&mut self) -> Vec { let stdin = std::io::stdin(); let mut result = String::new(); stdin.read_line(&mut result).unwrap(); @@ -41,7 +73,7 @@ mod test_input { struct InputEngineTest {} impl InputEngine for InputEngineTest { - fn get(&self) -> Vec { + fn get(&mut self) -> Vec { return "hello world".chars().collect(); } } diff --git a/src/testdata/config-kafka-to-stdout.yaml b/src/testdata/config-kafka-to-stdout.yaml index 9405f0c..e7e780f 100644 --- a/src/testdata/config-kafka-to-stdout.yaml +++ b/src/testdata/config-kafka-to-stdout.yaml @@ -3,8 +3,10 @@ streams: engine: name: kafka kafka: - topic: xyz - consumer_group: uvw + topic: qmp.atlas.record + consumer_group: breel + addr: core-kafka.service.consul:9192 + crt: ./src/testdata/qualtrics_root_ca.b1-prv.crt output: engine: name: stdout diff --git a/src/testdata/qualtrics_root_ca.b1-prv.crt b/src/testdata/qualtrics_root_ca.b1-prv.crt new file mode 100644 index 0000000..8713760 --- /dev/null +++ b/src/testdata/qualtrics_root_ca.b1-prv.crt @@ -0,0 +1,40 @@ +-----BEGIN CERTIFICATE----- +MIIDQzCCAiugAwIBAgIUOF/uFrh6DI/BWeZB2p6uRHXXzo0wDQYJKoZIhvcNAQEL +BQAwKTEnMCUGA1UEAxMeUXVhbHRyaWNzIFN0YWdpbmcgUm9vdCBDQSAyMDE5MB4X +DTE5MTEyNTE5NTk0OFoXDTI5MTEyMjIwMDAxOFowKTEnMCUGA1UEAxMeUXVhbHRy +aWNzIFN0YWdpbmcgUm9vdCBDQSAyMDE5MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAy6nAAmO1BJ1WR71oPKKi3jwRAdGsbzHczYonQzUHGUZJrGLUlj9m +ihsDokXBNhKgABFbVFaaxXBtR0FafAYyserA2oEgsRN9xAN4EwNbKurkIFN+zUPC +M02fGL6ofGq+eo4At0L/LwImOcf9kBA7XAZc0oioegPmlS7LTqGQsVMzB6H/gjwZ +viAddelZmZKXPIwnuqnmLfeTNa3hrkKjTf/qDxzE0xdCAY5iknS3hxKsDn1mTeqk +pc+fEEnCk9n6XBsd89y9HFGT2nzFwyY4ejtGVFnOZ1ll2Y7qSHGFn8NjbSjQ33YW +NTucpfK/7zs5v2BtCKVMQd+NV+cOu0luAQIDAQABo2MwYTAOBgNVHQ8BAf8EBAMC +AQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUXSfqwhbIhEEHgCwYeSm25pPB +q5cwHwYDVR0jBBgwFoAUXSfqwhbIhEEHgCwYeSm25pPBq5cwDQYJKoZIhvcNAQEL +BQADggEBALYdO4s+or5FoOgRgNhppqFLsczCmYAwvfq43mpJ4+SkHkV0pIgzenFy +jAuvNUxVD+r+vBLrkwgXqOlvvEuMl0bVtu+mGYXdKQm/JpJ6UOOyisnP53EXICBd +rvcdC/DNSmY8nWE+KIlr0nuu6MQZWdOI7xCMqrsIx1EQ0XfGDY+aZO7Ir1kbpbA4 +ey03rKxWMFPh1LJSkPD+xKd+ykzZH1EtBQFNbA3SSvCyovBOsW8twgpw3dpf6jVc +JbRqS0ZuNKzloaVuiGptB4L0mort9tWffPw9Xezfk3P45t6BSRTsEKU8dVDxfBb0 +n5ta1krSRh4BuqztbvsVNvsK8kfewsg= +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIDSTCCAjGgAwIBAgIUfkUZZIdZ9jQLs/S4OYuqpnDPVUMwDQYJKoZIhvcNAQEL +BQAwLDEqMCgGA1UEAxMhUXVhbHRyaWNzIFByb2R1Y3Rpb24gUm9vdCBDQSAyMDE5 +MB4XDTE5MTIwMjIzNTAxMFoXDTI5MTEyOTIzNTA0MFowLDEqMCgGA1UEAxMhUXVh +bHRyaWNzIFByb2R1Y3Rpb24gUm9vdCBDQSAyMDE5MIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAvkrICcFOfTGsjuurVevZh8QrGJh2QeICf2Y2vvIhc1BV +JI8PQ6o3cj3mkwCX6WNHnc+I9uMYsjREPLecM8rB6UPL11eFwTgC+BVcQ1SmxcjF +EDr4rlYGnbD6268wd628tqpGu9rGTT9sc+PgegE791xxS3sEgcWLxQM8it7VCXCy +GmQqVkQhcUMpJJ9A/0JPCJgSu5V2DD67Cw8jqtT2AG1uwXcfgR0sg1alpuld1IED +VVu/qCzeddui3JaU+G2eyg3oDip/uW0y5x6eA18M4TQoxq76JNBweNVpVS7PYiO/ +c7embXQ8YjR7CHumAzDjFcrUkOebmLZdubWHrAFuXwIDAQABo2MwYTAOBgNVHQ8B +Af8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUU4fm8rplaOLntfKV +OjPK0jUNwPUwHwYDVR0jBBgwFoAUU4fm8rplaOLntfKVOjPK0jUNwPUwDQYJKoZI +hvcNAQELBQADggEBAFZ8w9ZgX+tZyaGJNuZo9MRn3dCy1trlzBqVapgx1tDMxf56 +2kxen+m4rF6wDf7KcfcmAgn1AZ4ExrqyLokp6bz4Dkv7F6QzYyStZ2rCeM1P7+UK +9vXaf17uc6EAvx/VMKyPnZZQGdMrCqU1wolc345L+UmxHCf2VUgm6OMWlB8Xk/oK +10c1LSgGQLagmmcjxGpWKsxHtnbaaEONalnUZkty72E2/YfkstzQscykMDBNrXYI +6C99R7R9zsso+JanwPB3xEElVGIS7veuXpHqCzslPDohSXNLQHc7Anl5eI85Svqo +4iWAdm6ESyr/LCqyHH8iHTvdkTqlvxB9/rJC+F8= +-----END CERTIFICATE----- \ No newline at end of file