diff --git a/src/config.rs b/src/config.rs index 969a591..35a2d4e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,7 @@ pub struct Kafka { #[derive(Serialize, Deserialize, Debug)] pub struct UDP { + pub host: Option, pub port: i32, } diff --git a/src/engine.rs b/src/engine.rs index 6f7bab1..f5486f6 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -177,6 +177,7 @@ pub trait OutputEngine { pub fn build_output_engine(cfg: &Engine) -> Result, String> { match cfg.name.as_str() { "stdout" => return Ok(Box::new(OutputEngineStdout{})), + "udp" => return build_output_engine_udp(&cfg), _ => return Err("unknown output engine name".to_string() + &cfg.name), } } @@ -209,3 +210,24 @@ mod test_output { engine.put("teehee".to_string().chars().collect()); } } + +struct OutputEngineUDP { + host: String, + port: i32, +} + +pub fn build_output_engine_udp(cfg: &Engine) -> Result, String> { + let udp_cfg = cfg.udp.as_ref().unwrap(); + return Ok(Box::new(OutputEngineUDP{ + host: udp_cfg.host.clone().unwrap(), + port: udp_cfg.port, + })); +} + +impl OutputEngine for OutputEngineUDP { + fn put(&self, v: Vec) { + let socket = std::net::UdpSocket::bind("127.0.0.1:".to_string() + &(self.port+10).to_string()).unwrap(); + socket.connect(self.host.to_string() + ":" + &self.port.to_string()).unwrap(); + socket.send(&v.iter().cloned().collect::().as_bytes()).unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index 6189a58..ef4fe41 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,5 +7,9 @@ fn main() { let output_engine = engine::build_output_engine(&cfg.streams.output.engine); println!("{:?} => {}", cfg.streams.input.engine.name, input_engine.is_ok()); println!("{:?} => {}", cfg.streams.output.engine.name, output_engine.is_ok()); - output_engine.unwrap().put(input_engine.unwrap().get()); + let mut input_engine = input_engine.unwrap(); + let mut output_engine = output_engine.unwrap(); + loop { + output_engine.put(input_engine.get()); + } } diff --git a/src/testdata/config-stdin-to-udp.yaml b/src/testdata/config-stdin-to-udp.yaml new file mode 100644 index 0000000..051adef --- /dev/null +++ b/src/testdata/config-stdin-to-udp.yaml @@ -0,0 +1,10 @@ +streams: + input: + engine: + name: stdin + output: + engine: + name: udp + udp: + host: 127.0.0.1 + port: 11231