impl udp receiver
parent
93e3046118
commit
a40ff70c7a
|
|
@ -24,6 +24,7 @@ pub struct Engine {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub kafka: Option<Kafka>,
|
pub kafka: Option<Kafka>,
|
||||||
pub device: Option<Device>,
|
pub device: Option<Device>,
|
||||||
|
pub udp: Option<UDP>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
|
@ -38,6 +39,11 @@ pub struct Kafka {
|
||||||
pub crt: String,
|
pub crt: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct UDP {
|
||||||
|
pub port: i32,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn build_config() -> Result<Config, String> {
|
pub fn build_config() -> Result<Config, String> {
|
||||||
let config_path = env::var("CONFIG_PATH");
|
let config_path = env::var("CONFIG_PATH");
|
||||||
match config_path {
|
match config_path {
|
||||||
|
|
@ -64,6 +70,7 @@ fn build_config_std() -> Config {
|
||||||
name: String::from("stdin"),
|
name: String::from("stdin"),
|
||||||
kafka: None,
|
kafka: None,
|
||||||
device: None,
|
device: None,
|
||||||
|
udp: None,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
output: Stream {
|
output: Stream {
|
||||||
|
|
@ -71,6 +78,7 @@ fn build_config_std() -> Config {
|
||||||
name: String::from("stdout"),
|
name: String::from("stdout"),
|
||||||
kafka: None,
|
kafka: None,
|
||||||
device: None,
|
device: None,
|
||||||
|
udp: None,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ pub fn build_input_engine(cfg: &Engine) -> Result<Box<dyn InputEngine>, String>
|
||||||
match cfg.name.as_str() {
|
match cfg.name.as_str() {
|
||||||
"stdin" => return Ok(Box::new(InputEngineStdin{})),
|
"stdin" => return Ok(Box::new(InputEngineStdin{})),
|
||||||
"kafka" => return build_input_engine_kafka(&cfg),
|
"kafka" => return build_input_engine_kafka(&cfg),
|
||||||
|
"udp" => return build_input_engine_udp(&cfg),
|
||||||
"device" => return build_input_engine_device(&cfg),
|
"device" => return build_input_engine_device(&cfg),
|
||||||
_ => return Err("unknown input engine name".to_string() + &cfg.name),
|
_ => return Err("unknown input engine name".to_string() + &cfg.name),
|
||||||
}
|
}
|
||||||
|
|
@ -99,6 +100,29 @@ impl InputEngine for InputEngineDevice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct InputEngineUDP {
|
||||||
|
port: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_input_engine_udp(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
|
||||||
|
let udp_cfg = cfg.udp.as_ref().unwrap();
|
||||||
|
return Ok(Box::new(InputEngineUDP{
|
||||||
|
port: udp_cfg.port,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InputEngine for InputEngineUDP {
|
||||||
|
fn get(&mut self) -> Vec<char> {
|
||||||
|
let addr = "0.0.0.0:".to_string() + &self.port.to_string();
|
||||||
|
println!("$ echo -n 'hello world' | nc -4u -w0 localhost {}", &self.port.to_string());
|
||||||
|
let socket = std::net::UdpSocket::bind(addr).unwrap();
|
||||||
|
let mut buf = [0; 128];
|
||||||
|
let (amt, src) = socket.recv_from(&mut buf).unwrap();
|
||||||
|
let buf = &mut buf[..amt];
|
||||||
|
return std::str::from_utf8(buf).unwrap().chars().collect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct InputEngineKafka {
|
struct InputEngineKafka {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
streams:
|
||||||
|
input:
|
||||||
|
engine:
|
||||||
|
name: udp
|
||||||
|
udp:
|
||||||
|
port: 11231
|
||||||
|
output:
|
||||||
|
engine:
|
||||||
|
name: stdout
|
||||||
Loading…
Reference in New Issue