configuration accepted for kakfa

This commit is contained in:
Bel LaPointe
2023-03-16 16:26:55 -06:00
parent 1c8c399aaa
commit 8f5097d7b3
6 changed files with 361 additions and 5 deletions

View File

@@ -22,6 +22,13 @@ pub struct Stream {
#[derive(Serialize, Deserialize, Debug)]
pub struct Engine {
pub name: String,
pub kafka: Option<Kafka>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Kafka {
topic: String,
consumer_group: String,
}
pub fn build_config() -> Result<Config, String> {
@@ -36,9 +43,9 @@ fn build_config_yaml(path: String) -> Result<Config, String> {
match fs::read_to_string(&path) {
Ok(buffer) => match serde_yaml::from_str(&buffer) {
Ok(result) => Ok(result),
Err(_) => Err("failed to read ".to_string()+&path.as_str()),
Err(err) => Err(err.to_string()),
},
Err(_) => Err("failed to read ".to_string()+&path.as_str()),
Err(err) => Err(err.to_string()),
}
}
@@ -46,10 +53,16 @@ fn build_config_std() -> Config {
return Config {
streams: Streams{
input: Stream {
engine: Engine{ name: String::from("stdin") },
engine: Engine{
name: String::from("stdin"),
kafka: None,
},
},
output: Stream {
engine: Engine{ name: String::from("stdout") },
engine: Engine{
name: String::from("stdout"),
kafka: None,
},
},
},
};

View File

@@ -7,10 +7,23 @@ pub trait InputEngine {
pub fn build_input_engine(cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
match cfg.name.as_str() {
"stdin" => return Ok(Box::new(InputEngineStdin{})),
"kafka" => return build_input_engine_kafka(&cfg),
_ => return Err("unknown input engine name".to_string() + &cfg.name),
}
}
pub fn build_input_engine_kafka(_cfg: &Engine) -> Result<Box<dyn InputEngine>, String> {
return Err("not impl".to_string());
}
struct InputEngineKafka {}
impl InputEngine for InputEngineKafka {
fn get(&self) -> Vec<char> {
Err("not impl".to_string()).unwrap()
}
}
struct InputEngineStdin {}
impl InputEngine for InputEngineStdin {

View File

@@ -0,0 +1,11 @@
streams:
input:
engine:
name: kafka
kafka:
topic: xyz
consumer_group: uvw
output:
engine:
name: stdout

View File

@@ -13,7 +13,8 @@
"engine": {
"name": "kafka",
"kafka": {
"topic": "topic-name"
"topic": "topic-name",
"consumer_group": "consumer-group-name"
}
}
}