udp input, output COMPLETE

master
Bel LaPointe 2023-03-21 15:29:01 -06:00
parent a40ff70c7a
commit 4cd8ac47c8
4 changed files with 38 additions and 1 deletions

View File

@ -41,6 +41,7 @@ pub struct Kafka {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct UDP { pub struct UDP {
pub host: Option<String>,
pub port: i32, pub port: i32,
} }

View File

@ -177,6 +177,7 @@ pub trait OutputEngine {
pub fn build_output_engine(cfg: &Engine) -> Result<Box<dyn OutputEngine>, String> { pub fn build_output_engine(cfg: &Engine) -> Result<Box<dyn OutputEngine>, String> {
match cfg.name.as_str() { match cfg.name.as_str() {
"stdout" => return Ok(Box::new(OutputEngineStdout{})), "stdout" => return Ok(Box::new(OutputEngineStdout{})),
"udp" => return build_output_engine_udp(&cfg),
_ => return Err("unknown output engine name".to_string() + &cfg.name), _ => return Err("unknown output engine name".to_string() + &cfg.name),
} }
} }
@ -209,3 +210,24 @@ mod test_output {
engine.put("teehee".to_string().chars().collect()); engine.put("teehee".to_string().chars().collect());
} }
} }
struct OutputEngineUDP {
host: String,
port: i32,
}
pub fn build_output_engine_udp(cfg: &Engine) -> Result<Box<dyn OutputEngine>, String> {
let udp_cfg = cfg.udp.as_ref().unwrap();
return Ok(Box::new(OutputEngineUDP{
host: udp_cfg.host.clone().unwrap(),
port: udp_cfg.port,
}));
}
impl OutputEngine for OutputEngineUDP {
fn put(&self, v: Vec<char>) {
let socket = std::net::UdpSocket::bind("127.0.0.1:".to_string() + &(self.port+10).to_string()).unwrap();
socket.connect(self.host.to_string() + ":" + &self.port.to_string()).unwrap();
socket.send(&v.iter().cloned().collect::<String>().as_bytes()).unwrap();
}
}

View File

@ -7,5 +7,9 @@ fn main() {
let output_engine = engine::build_output_engine(&cfg.streams.output.engine); let output_engine = engine::build_output_engine(&cfg.streams.output.engine);
println!("{:?} => {}", cfg.streams.input.engine.name, input_engine.is_ok()); println!("{:?} => {}", cfg.streams.input.engine.name, input_engine.is_ok());
println!("{:?} => {}", cfg.streams.output.engine.name, output_engine.is_ok()); println!("{:?} => {}", cfg.streams.output.engine.name, output_engine.is_ok());
output_engine.unwrap().put(input_engine.unwrap().get()); let mut input_engine = input_engine.unwrap();
let mut output_engine = output_engine.unwrap();
loop {
output_engine.put(input_engine.get());
}
} }

10
src/testdata/config-stdin-to-udp.yaml vendored Normal file
View File

@ -0,0 +1,10 @@
streams:
input:
engine:
name: stdin
output:
engine:
name: udp
udp:
host: 127.0.0.1
port: 11231