diff --git a/Cargo.lock b/Cargo.lock index 16cb054..acf5035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,30 +2,12 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "byteorder" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" - [[package]] name = "cc" version = "1.0.79" @@ -39,69 +21,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "crc" -version = "2.1.0" +name = "futures-channel" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ - "crc-catalog", + "futures-core", ] [[package]] -name = "crc-catalog" -version = "1.1.1" +name = "futures-core" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] -name = "crc32fast" -version = "1.3.2" +name = "futures-task" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" + +[[package]] +name = "futures-util" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ - "cfg-if", -] - -[[package]] -name = "flate2" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - -[[package]] -name = "getrandom" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" -dependencies = [ - "cfg-if", - "libc", - "wasi", + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", ] [[package]] @@ -126,25 +75,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" -[[package]] -name = "kafka" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b11c86b0c0c9a9d89b136b2938a5b46a35c40f66eced2f09c76458b17dadfc2a" -dependencies = [ - "byteorder", - "crc", - "flate2", - "fnv", - "openssl", - "openssl-sys", - "ref_slice", - "snap", - "thiserror", - "tracing", - "twox-hash", -] - [[package]] name = "libc" version = "0.2.140" @@ -152,12 +82,51 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" [[package]] -name = "miniz_oxide" -version = "0.6.2" +name = "libz-sys" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" dependencies = [ - "adler", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -166,51 +135,18 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" -[[package]] -name = "openssl" -version = "0.10.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd2523381e46256e40930512c7fd25562b9eae4812cb52078f155e87217c9d1e" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "openssl-sys" -version = "0.9.81" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176be2629957c157240f68f61f2d0053ad3a4ecfdd9ebf1e6521d18d9635cf67" -dependencies = [ - "autocfg", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "pin-project-lite" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.26" @@ -218,10 +154,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" [[package]] -name = "ppv-lite86" -version = "0.2.17" +name = "proc-macro-crate" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit", +] [[package]] name = "proc-macro2" @@ -242,47 +182,40 @@ dependencies = [ ] [[package]] -name = "rand" -version = "0.8.5" +name = "rdkafka" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +checksum = "bd7c5d6d17442bcb9f943aae96d67d98c6d36af60442dd5da62aaa7fcbb25c48" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.3.0+1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "libz-sys", + "num_enum", + "pkg-config", ] -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom", -] - -[[package]] -name = "ref_slice" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" - [[package]] name = "rusty-pipe" version = "0.1.0" dependencies = [ - "kafka", - "openssl", + "rdkafka", "serde", "serde_yaml", ] @@ -313,6 +246,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.9.19" @@ -327,16 +271,13 @@ dependencies = [ ] [[package]] -name = "snap" -version = "1.1.0" +name = "slab" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" - -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] [[package]] name = "syn" @@ -350,66 +291,31 @@ dependencies = [ ] [[package]] -name = "thiserror" -version = "1.0.39" +name = "tokio" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ - "thiserror-impl", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing" -version = "0.1.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" -dependencies = [ - "cfg-if", + "autocfg", "pin-project-lite", - "tracing-attributes", - "tracing-core", + "windows-sys", ] [[package]] -name = "tracing-attributes" -version = "0.1.23" +name = "toml_datetime" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] +checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" [[package]] -name = "tracing-core" -version = "0.1.30" +name = "toml_edit" +version = "0.19.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +checksum = "dc18466501acd8ac6a3f615dd29a3438f8ca6bb3b19537138b3106e575621274" dependencies = [ - "once_cell", -] - -[[package]] -name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "rand", - "static_assertions", + "indexmap", + "toml_datetime", + "winnow", ] [[package]] @@ -431,7 +337,76 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] -name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" +name = "windows-sys" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + +[[package]] +name = "winnow" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23d020b441f92996c80d94ae9166e8501e59c7bb56121189dc9eab3bd8216966" +dependencies = [ + "memchr", +] diff --git a/Cargo.toml b/Cargo.toml index 4853559..57065f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,4 @@ edition = "2021" [dependencies] serde = { version = "1.0.156", features = ["derive"] } serde_yaml = "0.9.19" -kafka = "0.9.0" -openssl = {} +rdkafka = "0.29.0" diff --git a/src/engine.rs b/src/engine.rs index 2937b66..2fc4830 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,9 +1,5 @@ use crate::config::Engine; -use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; -use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage}; -use kafka::client::{KafkaClient, SecurityConfig}; - pub trait InputEngine { fn get(&mut self) -> Vec; } @@ -16,42 +12,16 @@ pub fn build_input_engine(cfg: &Engine) -> Result, String> } } -pub fn build_input_engine_kafka(cfg: &Engine) -> Result, String> { - let kafka_cfg = cfg.kafka.as_ref().unwrap(); - let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); - builder.set_cipher_list("DEFAULT").unwrap(); - builder.set_verify(SslVerifyMode::PEER); - builder.set_ca_file(kafka_cfg.crt.to_string()).unwrap(); - let mut client = KafkaClient::new_secure( - vec!(kafka_cfg.addr.to_owned()), - SecurityConfig::new(builder.build()) - .with_hostname_verification(false), - ); - client.load_metadata(&vec![kafka_cfg.topic.to_owned()]).unwrap(); - let consumer = Consumer::from_client(client) - .with_topic(kafka_cfg.topic.to_owned()) - .with_group(kafka_cfg.consumer_group.to_owned()) - .with_fallback_offset(FetchOffset::Earliest) - .with_offset_storage(GroupOffsetStorage::Kafka) - .with_client_id("breel".to_owned()) - .create() - .unwrap(); - return Ok(Box::new(InputEngineKafka{ - consumer: consumer, - })); +struct InputEngineKafka { } -struct InputEngineKafka { - consumer: Consumer +pub fn build_input_engine_kafka(cfg: &Engine) -> Result, String> { + let kafka_cfg = cfg.kafka.as_ref().unwrap(); + return Err("do what".to_string()); } impl InputEngine for InputEngineKafka { fn get(&mut self) -> Vec { - for ms in self.consumer.poll().unwrap().iter() { - for m in ms.messages() { - return std::str::from_utf8(m.value).unwrap().to_string().chars().collect(); - } - } Err("end".to_string()).unwrap() } }