master
Bel LaPointe 2025-11-13 16:04:26 -07:00
parent 4884d551e2
commit 56d0628ece
1 changed files with 65 additions and 21 deletions

View File

@ -153,6 +153,19 @@ impl File {
Events::new(&self.file) Events::new(&self.file)
} }
fn events_non_future(&self) -> Result<Events, String> {
let events = self.events()?;
let now = Delta::now_time();
Ok(Events(
events
.0
.iter()
.filter(|x| x.ts <= now)
.map(|x| x.clone())
.collect(),
))
}
pub fn persist_unpersisted_stage(&self) -> Result<(), String> { pub fn persist_unpersisted_stage(&self) -> Result<(), String> {
let events = self.events()?; let events = self.events()?;
let stage_mod_time = std::fs::metadata(&self.file) let stage_mod_time = std::fs::metadata(&self.file)
@ -165,18 +178,27 @@ impl File {
let old_persisted: Vec<Delta> = events let old_persisted: Vec<Delta> = events
.0 .0
.iter() .iter()
.filter(|x| x.ts < stage_mod_time) .filter(|x| x.ts <= stage_mod_time)
.map(|x| x.clone()) .map(|x| x.clone())
.collect(); .collect();
let old_events = Events(old_persisted); let old_events = Events(old_persisted);
eprintln!(
"stage_mod_time = {}, old events = {:?}",
stage_mod_time, old_events
);
let old_snapshot = old_events.snapshot()?; let old_snapshot = old_events.snapshot()?;
self.persist_delta_at(old_snapshot, self.stage()?, stage_mod_time)?; eprintln!(
self.stage_persisted() "self.persist_delta_at({:?}, {:?}",
old_snapshot,
self.stage().unwrap()
);
self.persist_delta_at(old_snapshot, self.stage()?, stage_mod_time)
//self.stage_persisted()
} }
pub fn stage_persisted(&self) -> Result<(), String> { pub fn stage_persisted(&self) -> Result<(), String> {
let stage = self.events()?.snapshot()?; let persisted_as_snapshot = self.events_non_future()?.snapshot()?;
let plaintext = serde_yaml::to_string(&stage).unwrap(); let plaintext = serde_yaml::to_string(&persisted_as_snapshot).unwrap();
let mut f = std::fs::File::create(&self.file).expect("failed to open file for writing"); let mut f = std::fs::File::create(&self.file).expect("failed to open file for writing");
writeln!(f, "{}", plaintext).expect("failed to write"); writeln!(f, "{}", plaintext).expect("failed to write");
Ok(()) Ok(())
@ -185,11 +207,15 @@ impl File {
pub fn persist_stage(&self) -> Result<(), String> { pub fn persist_stage(&self) -> Result<(), String> {
self.persist_unpersisted_stage()?; self.persist_unpersisted_stage()?;
let persisted = self.events()?.snapshot()?; let persisted = self.events_non_future()?.snapshot()?;
let stage = self.stage()?; let stage = self.stage()?;
self.persist_delta(persisted, stage) eprintln!("persist delta...");
eprintln!(" persisted=before={:?}", &persisted);
eprintln!(" stage=aftere={:?}", &stage);
self.persist_delta(persisted, stage)?;
Ok(())
} }
pub fn persist_delta(&self, before: Vec<Task>, after: Vec<Task>) -> Result<(), String> { pub fn persist_delta(&self, before: Vec<Task>, after: Vec<Task>) -> Result<(), String> {
@ -205,8 +231,9 @@ impl File {
for before in before.iter() { for before in before.iter() {
if !after.contains(before) { if !after.contains(before) {
self.append(Delta::remove_at(before.clone(), now))?; self.append(Delta::remove_at(before.clone(), now))?;
if let Some(due) = before.next_due(Delta::now_time()) { let now = Delta::now_time();
if due >= Delta::now_time() { if let Some(due) = before.next_due(now.clone()) {
if due >= now {
self.append(Delta::add_at(before.clone(), due))?; self.append(Delta::add_at(before.clone(), due))?;
} }
} }
@ -425,35 +452,40 @@ mod test_file {
r#" r#"
{{"ts":1, "op":"Add", "task": "removed"}} {{"ts":1, "op":"Add", "task": "removed"}}
{{"ts":2, "op":"Add", "task": "old"}} {{"ts":2, "op":"Add", "task": "old"}}
{{"ts":{}, "op":"Add", "task": "enqueued for persistence"}} {{"ts":{}, "op":"Add", "task": "was enqueued for persistence but obv removed"}}
{{"ts":{}, "op":"Add", "task": "will be enqueued for persistence"}}
"#, "#,
Delta::now_time() + 5, Delta::now_time() - 50,
Delta::now_time() + 50,
) )
.as_str(), .as_str(),
); );
let f = File::new(&d.path().join("plain").to_str().unwrap().to_string()); let f = File::new(&d.path().join("plain").to_str().unwrap().to_string());
assert_eq!(3, f.events().unwrap().0.len()); assert_eq!(4, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len()); assert_eq!(2, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "old"); tests::file_contains(&d, "plain", "old");
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
f.persist_stage().unwrap(); f.persist_stage().unwrap();
assert_eq!(5, f.events().unwrap().0.len()); assert_eq!(
assert_eq!(3, f.stage().unwrap().len()); 7,
tests::file_contains(&d, "plain", "enqueued"); f.events().unwrap().0.len(),
"events: {:?}",
f.events().unwrap()
);
assert_eq!(2, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
f.stage_persisted().unwrap(); f.stage_persisted().unwrap();
assert_eq!( assert_eq!(
5, 7,
f.events().unwrap().0.len(), f.events().unwrap().0.len(),
"{:?}", "{:?}",
f.events().unwrap().0 f.events().unwrap().0
); );
assert_eq!(3, f.stage().unwrap().len(), "{:?}", f.stage().unwrap()); assert_eq!(2, f.stage().unwrap().len(), "{:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "enqueued");
tests::file_contains(&d, "plain", "new"); tests::file_contains(&d, "plain", "new");
tests::file_contains(&d, "plain", "old"); tests::file_contains(&d, "plain", "old");
}); });
@ -519,14 +551,26 @@ mod test_file {
eprintln!("0 | {:?}", f.stage().unwrap()); eprintln!("0 | {:?}", f.stage().unwrap());
f.persist_stage().unwrap(); f.persist_stage().unwrap();
eprintln!("1 | {:?}", f.stage().unwrap()); eprintln!("0 | {:?}", f.stage().unwrap());
assert_eq!( assert_eq!(
1, 1,
f.events().unwrap().0.len(), f.events().unwrap().0.len(),
"{:?}", "events after 1 add scheduled: {:?}",
f.events().unwrap().0 f.events().unwrap().0
); );
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage()); assert_eq!(
1,
f.events().unwrap().snapshot().unwrap().len(),
"events.snapshot after 1 add scheduled: {:?}",
f.events().unwrap().snapshot().unwrap(),
);
tests::file_contains(&d, "plain", "[]");
assert_eq!(
0,
f.stage().unwrap().len(),
"stage after 1 add scheduled: {:?}",
f.stage()
);
f.stage_persisted().unwrap(); f.stage_persisted().unwrap();
assert_eq!( assert_eq!(