From 8f5097d7b328231d0c1c2ba25ddc4ae19890f137 Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Thu, 16 Mar 2023 16:26:55 -0600 Subject: [PATCH] configuration accepted for kakfa --- Cargo.lock | 317 ++++++++++++++++++++++ Cargo.toml | 1 + src/config.rs | 21 +- src/engine.rs | 13 + src/testdata/config-kafka-to-stdout.yaml | 11 + src/testdata/config-kinesis-to-kafka.json | 3 +- 6 files changed, 361 insertions(+), 5 deletions(-) create mode 100644 src/testdata/config-kafka-to-stdout.yaml diff --git a/Cargo.lock b/Cargo.lock index 3665c0b..3877f9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,108 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "cc" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "flate2" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "getrandom" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -30,6 +126,103 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +[[package]] +name = "kafka" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b11c86b0c0c9a9d89b136b2938a5b46a35c40f66eced2f09c76458b17dadfc2a" +dependencies = [ + "byteorder", + "crc", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror", + "tracing", + "twox-hash", +] + +[[package]] +name = "libc" +version = "0.2.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" + +[[package]] +name = "miniz_oxide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +dependencies = [ + "adler", +] + +[[package]] +name = "once_cell" +version = "1.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" + +[[package]] +name = "openssl" +version = "0.10.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd2523381e46256e40930512c7fd25562b9eae4812cb52078f155e87217c9d1e" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-sys" +version = "0.9.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176be2629957c157240f68f61f2d0053ad3a4ecfdd9ebf1e6521d18d9635cf67" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pkg-config" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.52" @@ -48,10 +241,47 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + [[package]] name = "rusty-pipe" version = "0.1.0" dependencies = [ + "kafka", "serde", "serde_yaml", ] @@ -95,6 +325,18 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "snap" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "syn" version = "1.0.109" @@ -106,6 +348,69 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand", + "static_assertions", +] + [[package]] name = "unicode-ident" version = "1.0.8" @@ -117,3 +422,15 @@ name = "unsafe-libyaml" version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad2024452afd3874bf539695e04af6732ba06517424dbf958fdb16a01f3bef6c" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" diff --git a/Cargo.toml b/Cargo.toml index 2f6470c..38591f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,4 @@ edition = "2021" [dependencies] serde = { version = "1.0.156", features = ["derive"] } serde_yaml = "0.9.19" +kafka = "0.9.0" diff --git a/src/config.rs b/src/config.rs index 4dc15d4..5fea896 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,6 +22,13 @@ pub struct Stream { #[derive(Serialize, Deserialize, Debug)] pub struct Engine { pub name: String, + pub kafka: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Kafka { + topic: String, + consumer_group: String, } pub fn build_config() -> Result { @@ -36,9 +43,9 @@ fn build_config_yaml(path: String) -> Result { match fs::read_to_string(&path) { Ok(buffer) => match serde_yaml::from_str(&buffer) { Ok(result) => Ok(result), - Err(_) => Err("failed to read ".to_string()+&path.as_str()), + Err(err) => Err(err.to_string()), }, - Err(_) => Err("failed to read ".to_string()+&path.as_str()), + Err(err) => Err(err.to_string()), } } @@ -46,10 +53,16 @@ fn build_config_std() -> Config { return Config { streams: Streams{ input: Stream { - engine: Engine{ name: String::from("stdin") }, + engine: Engine{ + name: String::from("stdin"), + kafka: None, + }, }, output: Stream { - engine: Engine{ name: String::from("stdout") }, + engine: Engine{ + name: String::from("stdout"), + kafka: None, + }, }, }, }; diff --git a/src/engine.rs b/src/engine.rs index e936270..92abe61 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -7,10 +7,23 @@ pub trait InputEngine { pub fn build_input_engine(cfg: &Engine) -> Result, String> { match cfg.name.as_str() { "stdin" => return Ok(Box::new(InputEngineStdin{})), + "kafka" => return build_input_engine_kafka(&cfg), _ => return Err("unknown input engine name".to_string() + &cfg.name), } } +pub fn build_input_engine_kafka(_cfg: &Engine) -> Result, String> { + return Err("not impl".to_string()); +} + +struct InputEngineKafka {} + +impl InputEngine for InputEngineKafka { + fn get(&self) -> Vec { + Err("not impl".to_string()).unwrap() + } +} + struct InputEngineStdin {} impl InputEngine for InputEngineStdin { diff --git a/src/testdata/config-kafka-to-stdout.yaml b/src/testdata/config-kafka-to-stdout.yaml new file mode 100644 index 0000000..9405f0c --- /dev/null +++ b/src/testdata/config-kafka-to-stdout.yaml @@ -0,0 +1,11 @@ +streams: + input: + engine: + name: kafka + kafka: + topic: xyz + consumer_group: uvw + output: + engine: + name: stdout + diff --git a/src/testdata/config-kinesis-to-kafka.json b/src/testdata/config-kinesis-to-kafka.json index 8fc0e02..b2ea698 100644 --- a/src/testdata/config-kinesis-to-kafka.json +++ b/src/testdata/config-kinesis-to-kafka.json @@ -13,7 +13,8 @@ "engine": { "name": "kafka", "kafka": { - "topic": "topic-name" + "topic": "topic-name", + "consumer_group": "consumer-group-name" } } }