From a40ff70c7ac144323f72cbdababef5f2c2c755ae Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Tue, 21 Mar 2023 15:11:58 -0600 Subject: [PATCH] impl udp receiver --- src/config.rs | 8 ++++++++ src/engine.rs | 24 ++++++++++++++++++++++++ src/testdata/config-udp-to-stdout.yaml | 9 +++++++++ 3 files changed, 41 insertions(+) create mode 100644 src/testdata/config-udp-to-stdout.yaml diff --git a/src/config.rs b/src/config.rs index 103c62e..969a591 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,6 +24,7 @@ pub struct Engine { pub name: String, pub kafka: Option, pub device: Option, + pub udp: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -38,6 +39,11 @@ pub struct Kafka { pub crt: String, } +#[derive(Serialize, Deserialize, Debug)] +pub struct UDP { + pub port: i32, +} + pub fn build_config() -> Result { let config_path = env::var("CONFIG_PATH"); match config_path { @@ -64,6 +70,7 @@ fn build_config_std() -> Config { name: String::from("stdin"), kafka: None, device: None, + udp: None, }, }, output: Stream { @@ -71,6 +78,7 @@ fn build_config_std() -> Config { name: String::from("stdout"), kafka: None, device: None, + udp: None, }, }, }, diff --git a/src/engine.rs b/src/engine.rs index 45fdb71..6f7bab1 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -12,6 +12,7 @@ pub fn build_input_engine(cfg: &Engine) -> Result, String> match cfg.name.as_str() { "stdin" => return Ok(Box::new(InputEngineStdin{})), "kafka" => return build_input_engine_kafka(&cfg), + "udp" => return build_input_engine_udp(&cfg), "device" => return build_input_engine_device(&cfg), _ => return Err("unknown input engine name".to_string() + &cfg.name), } @@ -99,6 +100,29 @@ impl InputEngine for InputEngineDevice { } } +struct InputEngineUDP { + port: i32, +} + +pub fn build_input_engine_udp(cfg: &Engine) -> Result, String> { + let udp_cfg = cfg.udp.as_ref().unwrap(); + return Ok(Box::new(InputEngineUDP{ + port: udp_cfg.port, + })); +} + +impl InputEngine for InputEngineUDP { + fn get(&mut self) -> Vec { + let addr = "0.0.0.0:".to_string() + &self.port.to_string(); + println!("$ echo -n 'hello world' | nc -4u -w0 localhost {}", &self.port.to_string()); + let socket = std::net::UdpSocket::bind(addr).unwrap(); + let mut buf = [0; 128]; + let (amt, src) = socket.recv_from(&mut buf).unwrap(); + let buf = &mut buf[..amt]; + return std::str::from_utf8(buf).unwrap().chars().collect(); + } +} + struct InputEngineKafka { } diff --git a/src/testdata/config-udp-to-stdout.yaml b/src/testdata/config-udp-to-stdout.yaml new file mode 100644 index 0000000..24e6ab1 --- /dev/null +++ b/src/testdata/config-udp-to-stdout.yaml @@ -0,0 +1,9 @@ +streams: + input: + engine: + name: udp + udp: + port: 11231 + output: + engine: + name: stdout