new tests with just add-remove and persist_stage collapses finding missed persists

master
Bel LaPointe 2025-11-12 13:01:35 -07:00
parent c1a5934215
commit 1a9f052396
1 changed files with 137 additions and 83 deletions

View File

@ -9,21 +9,15 @@ fn main() {
if !flags.dry_run { if !flags.dry_run {
for file in files.files.iter() { for file in files.files.iter() {
file.stage_new_persisted()
.expect("failed to stage new log files");
file.persist_stage() file.persist_stage()
.expect("failed to persist staged changes to log file"); .expect("failed to persist staged changes to log file");
file.stage_persisted().expect("failed to stage log files"); file.stage_persisted().expect("failed to stage log files");
} }
if let Some(add) = flags.add { if let Some(add) = flags.add {
let patch: json_patch::PatchOperation = let task = Task(serde_yaml::Value::String(add));
json_patch::PatchOperation::Add(json_patch::AddOperation {
path: jsonptr::PointerBuf::parse("/-").expect("cannot create path to /-"),
value: serde_json::json!(add),
});
files.files[0] files.files[0]
.append(Delta::now(patch)) .append(Delta::add(task))
.expect("failed to add"); .expect("failed to add");
if !flags.enqueue_add { if !flags.enqueue_add {
files.files[0] files.files[0]
@ -159,7 +153,7 @@ impl File {
Events::new(&self.file) Events::new(&self.file)
} }
pub fn stage_new_persisted(&self) -> Result<(), String> { pub fn persist_unpersisted_stage(&self) -> Result<(), String> {
let events = self.events()?; let events = self.events()?;
let stage_mod_time = std::fs::metadata(&self.file) let stage_mod_time = std::fs::metadata(&self.file)
.unwrap() .unwrap()
@ -168,14 +162,16 @@ impl File {
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs(); .as_secs();
let new_persisted: Vec<Delta> = events let old_persisted: Vec<Delta> = events
.0 .0
.iter() .iter()
.filter(|x| x.ts > stage_mod_time) .filter(|x| x.ts < stage_mod_time)
.map(|x| x.clone()) .map(|x| x.clone())
.collect(); .collect();
panic!("not impl: apply filtered deltas to stage"); let old_events = Events(old_persisted);
Ok(()) let old_snapshot = old_events.snapshot()?;
self.persist_delta_at(old_snapshot, self.stage()?, stage_mod_time)?;
self.stage_persisted()
} }
pub fn stage_persisted(&self) -> Result<(), String> { pub fn stage_persisted(&self) -> Result<(), String> {
@ -187,6 +183,8 @@ impl File {
} }
pub fn persist_stage(&self) -> Result<(), String> { pub fn persist_stage(&self) -> Result<(), String> {
self.persist_unpersisted_stage()?;
let persisted = self.events()?.snapshot()?; let persisted = self.events()?.snapshot()?;
let stage = self.stage()?; let stage = self.stage()?;
@ -195,21 +193,24 @@ impl File {
} }
pub fn persist_delta(&self, before: Vec<Task>, after: Vec<Task>) -> Result<(), String> { pub fn persist_delta(&self, before: Vec<Task>, after: Vec<Task>) -> Result<(), String> {
let before = serde_json::to_string(&before).unwrap(); self.persist_delta_at(before, after, Delta::now_time())
let before = before.as_str(); }
let before: serde_json::Value = serde_json::from_str(&before).unwrap();
let after = serde_json::to_string(&after).unwrap(); fn persist_delta_at(
let after: serde_json::Value = serde_json::from_str(after.as_str()).unwrap(); &self,
before: Vec<Task>,
let patches = json_patch::diff(&before, &after); after: Vec<Task>,
let deltas: Vec<Delta> = patches now: u64,
.iter() ) -> Result<(), String> {
.map(|patch| patch.clone()) for before in before.iter() {
.map(|patch| Delta::now(patch.clone())) if !after.contains(before) {
.collect(); self.append(Delta::remove_at(before.clone(), now));
for delta in deltas.iter() { }
self.append(delta.clone())?; }
for after in after.iter() {
if !before.contains(after) {
self.append(Delta::add_at(after.clone(), now));
}
} }
Ok(()) Ok(())
} }
@ -305,7 +306,8 @@ mod test_file {
f.persist_stage().unwrap(); f.persist_stage().unwrap();
assert_eq!(2, f.events().unwrap().0.len()); assert_eq!(2, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len()); assert_eq!(2, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "[hello, world]"); tests::file_contains(&d, "plain", "hello");
tests::file_contains(&d, "plain", "world");
f.stage_persisted().unwrap(); f.stage_persisted().unwrap();
assert_eq!(2, f.events().unwrap().0.len()); assert_eq!(2, f.events().unwrap().0.len());
@ -322,15 +324,15 @@ mod test_file {
&d, &d,
".plain.host_a", ".plain.host_a",
r#" r#"
{"ts":1, "patch":{"op":"replace", "path":"", "value": ["initial"]}} {"ts":1, "op":"Add", "task": "initial"}
{"ts":3, "patch":{"op":"add", "path":"/-", "value": {"k":"v"}}} {"ts":3, "op":"Add", "task": {"k":"v"}}
"#, "#,
); );
tests::write_file( tests::write_file(
&d, &d,
".plain.host_b", ".plain.host_b",
r#" r#"
{"ts":2, "patch":{"op":"add", "path":"/-", "value": 1}} {"ts":2, "op":"Add", "task": 1}
"#, "#,
); );
@ -343,11 +345,28 @@ mod test_file {
f.persist_stage().unwrap(); f.persist_stage().unwrap();
assert_eq!(6, f.events().unwrap().0.len()); assert_eq!(6, f.events().unwrap().0.len());
assert_eq!(0, f.stage().unwrap().len()); assert_eq!(0, f.stage().unwrap().len());
eprintln!("persist_stage | events | {:?}", f.events().unwrap().0);
eprintln!(
"persist_stage | events.snapshot | {:?}",
f.events().unwrap().snapshot()
);
eprintln!("persist_stage | stage | {:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "[]"); tests::file_contains(&d, "plain", "[]");
f.stage_persisted().unwrap(); f.stage_persisted().unwrap();
assert_eq!(6, f.events().unwrap().0.len()); assert_eq!(
assert_eq!(0, f.stage().unwrap().len()); 0,
f.events().unwrap().snapshot().unwrap().len(),
"{:?}",
f.events().unwrap().snapshot().unwrap(),
);
assert_eq!(
6,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "[]"); tests::file_contains(&d, "plain", "[]");
}); });
} }
@ -360,15 +379,15 @@ mod test_file {
&d, &d,
".plain.host_a", ".plain.host_a",
r#" r#"
{"ts":1, "patch":{"op":"replace", "path":"", "value": ["initial"]}} {"ts":1, "op":"Add", "task": "initial"}
{"ts":3, "patch":{"op":"add", "path":"/-", "value": {"k":"v"}}} {"ts":3, "op":"Add", "task": {"k":"v"}}
"#, "#,
); );
tests::write_file( tests::write_file(
&d, &d,
".plain.host_b", ".plain.host_b",
r#" r#"
{"ts":2, "patch":{"op":"add", "path":"/-", "value": 1}} {"ts":2, "op":"Add", "task": 1}
"#, "#,
); );
@ -391,7 +410,7 @@ mod test_file {
} }
#[test] #[test]
fn test_stage_new_persisted() { fn test_persist_unpersisted_stage() {
tests::with_dir(|d| { tests::with_dir(|d| {
tests::write_file(&d, "plain", "- old\n- new"); tests::write_file(&d, "plain", "- old\n- new");
tests::write_file( tests::write_file(
@ -399,36 +418,39 @@ mod test_file {
".plain.host", ".plain.host",
format!( format!(
r#" r#"
{{"ts":{}, "patch":{{"op":"replace", "path":"/0", "value": "enqueued for persistence"}}}} {{"ts":1, "op":"Add", "task": "removed"}}
{{"ts":2, "op":"Add", "task": "old"}}
{{"ts":{}, "op":"Add", "task": "enqueued for persistence"}}
"#, "#,
2147483647, 2147483647,
).as_str(), )
.as_str(),
); );
let f = File::new(&d.path().join("plain").to_str().unwrap().to_string()); let f = File::new(&d.path().join("plain").to_str().unwrap().to_string());
assert_eq!(1, f.events().unwrap().0.len()); assert_eq!(3, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len()); assert_eq!(2, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "old"); tests::file_contains(&d, "plain", "old");
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
f.stage_new_persisted().unwrap();
tests::file_contains(&d, "plain", "enqueued");
tests::file_contains(&d, "plain", "new");
assert_eq!(1, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len());
f.persist_stage().unwrap(); f.persist_stage().unwrap();
assert_eq!(3, f.events().unwrap().0.len()); assert_eq!(5, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len()); assert_eq!(3, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "enqueued"); tests::file_contains(&d, "plain", "enqueued");
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
f.stage_persisted().unwrap(); f.stage_persisted().unwrap();
assert_eq!(3, f.events().unwrap().0.len()); assert_eq!(
assert_eq!(2, f.stage().unwrap().len()); 5,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(3, f.stage().unwrap().len(), "{:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "enqueued"); tests::file_contains(&d, "plain", "enqueued");
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
tests::file_contains(&d, "plain", "old");
}); });
} }
} }
@ -436,27 +458,52 @@ mod test_file {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct Delta { struct Delta {
ts: u64, ts: u64,
patch: json_patch::PatchOperation, op: Op,
task: Task,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum Op {
Add,
Remove,
} }
impl Delta { impl Delta {
pub fn new(patch: json_patch::PatchOperation, ts: u64) -> Delta { pub fn new(ts: u64, op: Op, task: Task) -> Delta {
Delta { Delta {
patch: patch,
ts: ts, ts: ts,
op: op,
task: task,
} }
} }
pub fn now(patch: json_patch::PatchOperation) -> Delta { pub fn add(task: Task) -> Delta {
Self::new( Self::add_at(task, Self::now_time())
patch, }
pub fn add_at(task: Task, at: u64) -> Delta {
Self::new(at, Op::Add, task)
}
pub fn remove(task: Task) -> Delta {
Self::remove_at(task, Self::now_time())
}
pub fn remove_at(task: Task, at: u64) -> Delta {
Self::new(at, Op::Remove, task)
}
pub fn now(op: Op, task: Task) -> Delta {
Self::new(Self::now_time(), op, task)
}
fn now_time() -> u64 {
std::time::SystemTime::now() std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs() .as_secs()
.try_into() .try_into()
.unwrap(), .unwrap()
)
} }
} }
@ -528,22 +575,28 @@ impl Events {
} }
fn snapshot(&self) -> Result<Vec<Task>, String> { fn snapshot(&self) -> Result<Vec<Task>, String> {
let mut result = serde_json::json!([]); let mut result = vec![];
for event in self.0.iter() { for event in self.0.iter() {
match json_patch::patch(&mut result, vec![event.patch.clone()].as_slice()) { match event.op {
Ok(_) => Ok(()), Op::Add => result.push(event.task.clone()),
Err(msg) => Err(format!( Op::Remove => {
"failed to patch {} onto {}: {}", let mut i = (result.len() - 1) as i32;
&event.patch, &result, msg while i >= 0 {
)), if event.task == result[i as usize] {
}?; result.remove(i as usize);
if i == result.len() as i32 {
i -= 1
} }
match serde_json::from_str(serde_json::to_string(&result).unwrap().as_str()) { } else {
Ok(v) => Ok(v), i -= 1;
Err(msg) => Err(format!("failed turning patched into events: {}", msg)),
} }
} }
} }
};
}
Ok(result)
}
}
#[cfg(test)] #[cfg(test)]
mod test_events { mod test_events {
@ -557,7 +610,7 @@ mod test_events {
&d, &d,
".plain.some_host", ".plain.some_host",
r#" r#"
{"ts":1, "patch":{"op":"replace", "path":"", "value":["persisted"]}} {"ts":1, "op":"Add", "task":"persisted"}
"#, "#,
); );
@ -582,19 +635,20 @@ mod test_events {
&d, &d,
".plain.host_a", ".plain.host_a",
r#" r#"
{"ts":1, "patch":{"op":"replace", "path":"", "value":["persisted"]}} {"ts":1, "op":"Add", "task":"persisted"}
{"ts":3, "patch":{"op":"add", "path":"/-", "value":"persisted 3"}} {"ts":3, "op":"Add", "task":"persisted 3"}
{"ts":2, "patch":{"op":"add", "path":"/-", "value":"persisted 2"}} {"ts":2, "op":"Add", "task":"persisted 2"}
{"ts":6, "patch":{"op":"replace", "path":"/4", "value":"persisted 5'"}} {"ts":6, "op":"Remove", "task":"persisted 5"}
{"ts":7, "patch":{"op":"remove", "path":"/3"}} {"ts":6, "op":"Add", "task":"persisted 5'"}
{"ts":7, "op":"Remove", "task":"persisted 4"}
"#, "#,
); );
tests::write_file( tests::write_file(
&d, &d,
".plain.host_b", ".plain.host_b",
r#" r#"
{"ts":4, "patch":{"op":"add", "path":"/-", "value":"persisted 4"}} {"ts":4, "op":"Add", "task":"persisted 4"}
{"ts":5, "patch":{"op":"add", "path":"/-", "value":"persisted 5"}} {"ts":5, "op":"Add", "task":"persisted 5"}
"#, "#,
); );