use crate::config::Stream; use hidapi::HidApi; use rusb::UsbContext; use gilrs::{Gilrs, Button, Event}; use serde_json::json; use handlebars::Handlebars; use std::time::{SystemTime, UNIX_EPOCH}; pub trait InputStream { fn get(&mut self) -> Vec; } pub fn build_input_stream(cfg: &Stream) -> Box { match cfg.engine.name.as_str() { "stdin" => { return Box::new(InputStreamStdin{}); }, "kafka" => { return Box::new(build_input_stream_kafka(&cfg).unwrap()); }, "udp" => return Box::new(build_input_stream_udp(&cfg).unwrap()), "device" => return Box::new(build_input_stream_device(&cfg).unwrap()), _ => {}, }; assert!(false); Box::new(InputStreamStdin{}) } pub struct InputStreamDevice { } pub fn build_input_stream_device(cfg: &Stream) -> Result { return build_input_stream_device_gilrs(cfg) } pub fn build_input_stream_device_gilrs(cfg: &Stream) -> Result { let _device_cfg = cfg.engine.device.as_ref().unwrap(); let mut gilrs = Gilrs::new().unwrap(); eprintln!("printing gamepads"); for (_id, gamepad) in gilrs.gamepads() { eprintln!("{} is {:?}", gamepad.name(), gamepad.power_info()); } eprintln!("printing gamepads events"); loop { // eprintln!("reading gamepads events"); // Examine new events while let Some(Event { id, event, time }) = gilrs.next_event() { eprintln!("{:?} New event from {}: {:?}", time, id, event); let active_gamepad = Some(id); eprintln!("inspecting event"); // You can also use cached gamepad state if let Some(gamepad) = active_gamepad.map(|id| gilrs.gamepad(id)) { if gamepad.is_pressed(Button::South) { eprintln!("Button South is pressed (XBox - A, PS - X)"); } } break; } std::thread::sleep(std::time::Duration::from_millis(15)); break; } return Err("do what".to_string()); } pub fn build_input_stream_device_hidapi(cfg: &Stream) -> Result { let _device_cfg = cfg.engine.device.as_ref().unwrap(); match HidApi::new() { Ok(api) => { for device in api.devices() { eprintln!("{:#?}", device); } }, Err(e) => { eprintln!("Error: {}", e); }, }; return Err("do what".to_string()); } pub fn build_input_stream_device_rusb(cfg: &Stream) -> Result { let _device_cfg = cfg.engine.device.as_ref().unwrap(); assert!(rusb::has_capability()); let ctx = rusb::Context::new().unwrap(); assert!(ctx.devices().unwrap().len() > 0); for device in ctx.devices().unwrap().iter() { let device_desc = device.device_descriptor().unwrap(); eprintln!("Bus {:03} Device {:03} ID {:04x}:{:04x}", device.bus_number(), device.address(), device_desc.vendor_id(), device_desc.product_id()); } return Err("do what".to_string()); } impl InputStream for InputStreamDevice { fn get(&mut self) -> Vec { Err("end".to_string()).unwrap() } } pub struct InputStreamUDP { last_socket: Option, port: i32, } pub fn build_input_stream_udp(cfg: &Stream) -> Result { let udp_cfg = cfg.engine.udp.as_ref().unwrap(); eprintln!("$ echo -n 'hello world' | nc -4u -w0 localhost {}", &udp_cfg.port.to_string()); return Ok(InputStreamUDP{ last_socket: None, port: udp_cfg.port, }); } impl InputStream for InputStreamUDP { fn get(&mut self) -> Vec { let addr = "0.0.0.0:".to_string() + &self.port.to_string(); if self.last_socket.is_none() { self.last_socket = Some(std::net::UdpSocket::bind(&addr).unwrap()); } let mut buf = [0; 128]; let result = self.last_socket.as_ref().unwrap().recv_from(&mut buf); if result.is_err() { eprintln!("InputStreamUDP: failed to recv: {:?}", result.err()); self.last_socket = None; return Vec::::new(); } let (amt, _) = result.unwrap(); let buf = &mut buf[..amt]; return std::str::from_utf8(buf).unwrap().chars().collect(); } } pub struct InputStreamKafka { } pub fn build_input_stream_kafka(cfg: &Stream) -> Result { let _kafka_cfg = cfg.engine.kafka.as_ref().unwrap(); return Err("do what".to_string()); } impl InputStream for InputStreamKafka { fn get(&mut self) -> Vec { Err("end".to_string()).unwrap() } } pub struct InputStreamStdin {} impl InputStream for InputStreamStdin { fn get(&mut self) -> Vec { let stdin = std::io::stdin(); let mut result = String::new(); stdin.read_line(&mut result).unwrap(); return result.trim().chars().collect(); } } #[cfg(test)] mod test_input { use super::*; struct InputStreamTest {} impl InputStream for InputStreamTest { fn get(&mut self) -> Vec { return "hello world".chars().collect(); } } #[test] fn test_input_stream_impl() { let mut input_stream_test = InputStreamTest{}; _test_input_stream_impl(&mut input_stream_test); } fn _test_input_stream_impl(engine: &mut dyn InputStream) { assert_eq!("hello world".to_string(), engine.get().iter().cloned().collect::()); } } pub trait OutputStream { fn put(&mut self, v: Vec); } pub fn build_output_stream(cfg: &Stream) -> Box { return build_formatted_output_stream(cfg, _build_output_stream(cfg)) } pub struct OutputStreamFormatted { format: Option, stream: Box, debug: bool, } pub fn build_formatted_output_stream(cfg: &Stream, stream: Box) -> Box { return Box::new(OutputStreamFormatted{ format: cfg.format.clone(), debug: cfg.debug, stream: stream, }); } impl OutputStream for OutputStreamFormatted { fn put(&mut self, v: Vec) { let v2 = match self.format.as_ref() { Some(x) => sprintf(x, v), None => v, }; if self.debug { eprintln!("output: {}", v2.iter().collect::()); } self.stream.put(v2); } } fn sprintf(x: &String, v: Vec) -> Vec { let reg = Handlebars::new(); return reg.render_template(x, &json!({ "VALUE": v.iter().collect::(), "ms_since_epoch": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(), })).unwrap().chars().collect(); } pub fn _build_output_stream(cfg: &Stream) -> Box { if cfg.engine.name.as_str() == "stdout" { return Box::new(OutputStreamStdout{}); } else if cfg.engine.name.as_str() == "udp" { return Box::new(build_output_stream_udp(&cfg).unwrap()); } assert!(false); Box::new(OutputStreamStdout{}) } pub struct OutputStreamStdout {} impl OutputStream for OutputStreamStdout { fn put(&mut self, v: Vec) { println!("{}", v.iter().cloned().collect::()); } } #[cfg(test)] mod test_output { use super::*; struct OutputStreamTest {} impl OutputStream for OutputStreamTest { fn put(&mut self, _v: Vec) { } } #[test] fn test_output_stream_impl() { let mut output_stream_test = OutputStreamTest{}; _test_output_stream_impl(&mut output_stream_test); } fn _test_output_stream_impl(engine: &mut dyn OutputStream) { engine.put("teehee".to_string().chars().collect()); } #[test] fn test_output_stream_formatted() { assert_eq!( "hello world".to_string().chars().collect::>(), sprintf( &String::from("hello {{ VALUE }}"), String::from("world").chars().collect(), ), ); } } pub struct OutputStreamUDP { last_socket: Option, host: String, port: i32, } pub fn build_output_stream_udp(cfg: &Stream) -> Result { let udp_cfg = cfg.engine.udp.as_ref().unwrap(); return Ok(OutputStreamUDP{ last_socket: None, host: udp_cfg.host.clone().unwrap(), port: udp_cfg.port, }); } impl OutputStream for OutputStreamUDP { fn put(&mut self, v: Vec) { if self.last_socket.is_none() { let result = std::net::UdpSocket::bind("0.0.0.0:".to_string() + &(self.port+10).to_string()); if result.is_err() { eprintln!("OutputStreamUDP: failed to bind to 127.0.0.1:{}: {:?}", &(self.port+10).to_string(), result.err()); return; } self.last_socket = Some(result.unwrap()); let result = self.last_socket.as_ref().unwrap().connect(self.host.to_string() + ":" + &self.port.to_string()); if result.is_err() { eprintln!("OutputStreamUDP: failed to connect to {}:{}: {:?}", self.host.to_string(), self.port.to_string(), result.err()); self.last_socket = None; return; } } let result = self.last_socket.as_ref().unwrap().send(&v.iter().cloned().collect::().as_bytes()); if result.is_err() { eprintln!( "OutputStreamUDP: failed to send to {}:{}: {:?}", self.host.to_string(), self.port.to_string(), result.err(), ); self.last_socket = None; } } }