Files
rusty-pipe/src/stream.rs

257 lines
7.1 KiB
Rust

use crate::config::Stream;
use serde_json::json;
use handlebars::Handlebars;
use std::time::{SystemTime, UNIX_EPOCH};
pub trait InputStream {
fn get(&mut self) -> Vec<char>;
}
pub fn build_input_stream(cfg: &Stream) -> Box<dyn InputStream> {
match cfg.engine.name.as_str() {
"stdin" => {
return Box::new(InputStreamStdin{});
},
"kafka" => {
return Box::new(build_input_stream_kafka(&cfg).unwrap());
},
"udp" => return Box::new(build_input_stream_udp(&cfg).unwrap()),
_ => {},
};
assert!(false);
Box::new(InputStreamStdin{})
}
pub struct InputStreamDevice {
}
//pub fn build_input_stream_device_gilrs(cfg: &Stream) -> Result<InputStreamDevice, String> {
//pub fn build_input_stream_device_hidapi(cfg: &Stream) -> Result<InputStreamDevice, String> {
//pub fn build_input_stream_device_rusb(cfg: &Stream) -> Result<InputStreamDevice, String> {
impl InputStream for InputStreamDevice {
fn get(&mut self) -> Vec<char> {
Err("end".to_string()).unwrap()
}
}
pub struct InputStreamUDP {
last_socket: Option<std::net::UdpSocket>,
port: i32,
}
pub fn build_input_stream_udp(cfg: &Stream) -> Result<InputStreamUDP, String> {
let udp_cfg = cfg.engine.udp.as_ref().unwrap();
eprintln!("$ echo -n 'hello world' | nc -4u -w0 localhost {}", &udp_cfg.port.to_string());
return Ok(InputStreamUDP{
last_socket: None,
port: udp_cfg.port,
});
}
impl InputStream for InputStreamUDP {
fn get(&mut self) -> Vec<char> {
let addr = "0.0.0.0:".to_string() + &self.port.to_string();
if self.last_socket.is_none() {
self.last_socket = Some(std::net::UdpSocket::bind(&addr).unwrap());
}
let mut buf = [0; 128];
let result = self.last_socket.as_ref().unwrap().recv_from(&mut buf);
if result.is_err() {
eprintln!("InputStreamUDP: failed to recv: {:?}", result.err());
self.last_socket = None;
return Vec::<char>::new();
}
let (amt, _) = result.unwrap();
let buf = &mut buf[..amt];
return std::str::from_utf8(buf).unwrap().chars().collect();
}
}
pub struct InputStreamKafka {
}
pub fn build_input_stream_kafka(cfg: &Stream) -> Result<InputStreamKafka, String> {
let _kafka_cfg = cfg.engine.kafka.as_ref().unwrap();
return Err("do what".to_string());
}
impl InputStream for InputStreamKafka {
fn get(&mut self) -> Vec<char> {
Err("end".to_string()).unwrap()
}
}
pub struct InputStreamStdin {}
impl InputStream for InputStreamStdin {
fn get(&mut self) -> Vec<char> {
let stdin = std::io::stdin();
let mut result = String::new();
stdin.read_line(&mut result).unwrap();
return result.trim().chars().collect();
}
}
#[cfg(test)]
mod test_input {
use super::*;
struct InputStreamTest {}
impl InputStream for InputStreamTest {
fn get(&mut self) -> Vec<char> {
return "hello world".chars().collect();
}
}
#[test]
fn test_input_stream_impl() {
let mut input_stream_test = InputStreamTest{};
_test_input_stream_impl(&mut input_stream_test);
}
fn _test_input_stream_impl(engine: &mut dyn InputStream) {
assert_eq!("hello world".to_string(), engine.get().iter().cloned().collect::<String>());
}
}
pub trait OutputStream {
fn put(&mut self, v: Vec<char>);
}
pub fn build_output_stream(cfg: &Stream) -> Box<dyn OutputStream> {
return build_formatted_output_stream(cfg, _build_output_stream(cfg))
}
pub struct OutputStreamFormatted {
format: Option<String>,
stream: Box<dyn OutputStream>,
debug: bool,
}
pub fn build_formatted_output_stream(cfg: &Stream, stream: Box<dyn OutputStream>) -> Box<dyn OutputStream> {
return Box::new(OutputStreamFormatted{
format: cfg.format.clone(),
debug: cfg.debug,
stream: stream,
});
}
impl OutputStream for OutputStreamFormatted {
fn put(&mut self, v: Vec<char>) {
let v2 = match self.format.as_ref() {
Some(x) => sprintf(x, v),
None => v,
};
if self.debug {
eprintln!("output: {}", v2.iter().collect::<String>());
}
self.stream.put(v2);
}
}
fn sprintf(x: &String, v: Vec<char>) -> Vec<char> {
let reg = Handlebars::new();
return reg.render_template(x, &json!({
"VALUE": v.iter().collect::<String>(),
"ms_since_epoch": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(),
})).unwrap().chars().collect();
}
pub fn _build_output_stream(cfg: &Stream) -> Box<dyn OutputStream> {
if cfg.engine.name.as_str() == "stdout" {
return Box::new(OutputStreamStdout{});
} else if cfg.engine.name.as_str() == "udp" {
return Box::new(build_output_stream_udp(&cfg).unwrap());
}
assert!(false);
Box::new(OutputStreamStdout{})
}
pub struct OutputStreamStdout {}
impl OutputStream for OutputStreamStdout {
fn put(&mut self, v: Vec<char>) {
println!("{}", v.iter().cloned().collect::<String>());
}
}
#[cfg(test)]
mod test_output {
use super::*;
struct OutputStreamTest {}
impl OutputStream for OutputStreamTest {
fn put(&mut self, _v: Vec<char>) {
}
}
#[test]
fn test_output_stream_impl() {
let mut output_stream_test = OutputStreamTest{};
_test_output_stream_impl(&mut output_stream_test);
}
fn _test_output_stream_impl(engine: &mut dyn OutputStream) {
engine.put("teehee".to_string().chars().collect());
}
#[test]
fn test_output_stream_formatted() {
assert_eq!(
"hello world".to_string().chars().collect::<Vec<char>>(),
sprintf(
&String::from("hello {{ VALUE }}"),
String::from("world").chars().collect(),
),
);
}
}
pub struct OutputStreamUDP {
last_socket: Option<std::net::UdpSocket>,
host: String,
port: i32,
}
pub fn build_output_stream_udp(cfg: &Stream) -> Result<OutputStreamUDP, String> {
let udp_cfg = cfg.engine.udp.as_ref().unwrap();
return Ok(OutputStreamUDP{
last_socket: None,
host: udp_cfg.host.clone().unwrap(),
port: udp_cfg.port,
});
}
impl OutputStream for OutputStreamUDP {
fn put(&mut self, v: Vec<char>) {
if self.last_socket.is_none() {
let result = std::net::UdpSocket::bind("0.0.0.0:".to_string() + &(self.port+10).to_string());
if result.is_err() {
eprintln!("OutputStreamUDP: failed to bind to 127.0.0.1:{}: {:?}", &(self.port+10).to_string(), result.err());
return;
}
self.last_socket = Some(result.unwrap());
let result = self.last_socket.as_ref().unwrap().connect(self.host.to_string() + ":" + &self.port.to_string());
if result.is_err() {
eprintln!("OutputStreamUDP: failed to connect to {}:{}: {:?}", self.host.to_string(), self.port.to_string(), result.err());
self.last_socket = None;
return;
}
}
let result = self.last_socket.as_ref().unwrap().send(&v.iter().cloned().collect::<String>().as_bytes());
if result.is_err() {
eprintln!(
"OutputStreamUDP: failed to send to {}:{}: {:?}",
self.host.to_string(),
self.port.to_string(),
result.err(),
);
self.last_socket = None;
}
}
}