can read deltas

master
Bel LaPointe 2025-11-11 10:12:11 -07:00
parent 001712ed1c
commit c174181435
2 changed files with 60 additions and 29 deletions

View File

@ -1,7 +1,7 @@
use clap::Parser; use clap::Parser;
use serde::{Deserialize, Serialize};
use serde_yaml; use serde_yaml;
use std::io::{Read, Write}; use std::io::{BufRead, Read, Write};
use serde::{Serialize, Deserialize};
fn main() { fn main() {
for file in Flags::new() for file in Flags::new()
@ -11,7 +11,7 @@ fn main() {
.files .files
.iter() .iter()
{ {
file.reconcile_snapshot_changes().unwrap(); file.persist_stage().unwrap();
println!( println!(
"{} => {:?}", "{} => {:?}",
file.file, file.file,
@ -100,8 +100,9 @@ impl File {
Events::new(&self.file) Events::new(&self.file)
} }
fn stash_staged_changes(&self, stashed: Vec<Task>) -> Result<(), String> { fn persist_stage(&self) -> Result<(), String> {
let snapshot = serde_json::to_string(&stashed).unwrap(); let persisted = self.events()?.snapshot();
let snapshot = serde_json::to_string(&persisted).unwrap();
let snapshot: serde_json::Value = serde_json::from_str(snapshot.as_str()).unwrap(); let snapshot: serde_json::Value = serde_json::from_str(snapshot.as_str()).unwrap();
let stage = self.snapshot()?; let stage = self.snapshot()?;
@ -109,12 +110,13 @@ impl File {
let stage: serde_json::Value = serde_json::from_str(stage.as_str()).unwrap(); let stage: serde_json::Value = serde_json::from_str(stage.as_str()).unwrap();
let patches = json_patch::diff(&snapshot, &stage); let patches = json_patch::diff(&snapshot, &stage);
let deltas: Vec<Delta> = patches.iter() let deltas: Vec<Delta> = patches
.iter()
.map(|patch| patch.clone()) .map(|patch| patch.clone())
.map(|patch| Delta::now(patch.clone())) .map(|patch| Delta::now(patch.clone()))
.collect(); .collect();
for delta in deltas.iter() { for delta in deltas.iter() {
self.append(serde_json::to_string(delta).unwrap())?; self.append(delta.clone())?;
} }
Ok(()) Ok(())
} }
@ -146,17 +148,23 @@ impl File {
fn append(&self, delta: Delta) -> Result<(), String> { fn append(&self, delta: Delta) -> Result<(), String> {
use std::fs::OpenOptions; use std::fs::OpenOptions;
let hostname = gethostname::gethostname(); let hostname = gethostname::gethostname();
let log = format!("{}{}", Events::log_prefix(&self.file), gethostname::gethostname().into_string().unwrap()); assert!(hostname.len() > 0, "empty hostname");
let mut file = match OpenOptions::new().write(true).append(true).open(&log) { let log = format!(
"{}{}",
Events::log_prefix(&self.file),
hostname.into_string().unwrap()
);
let mut file = match OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(&log)
{
Ok(f) => Ok(f), Ok(f) => Ok(f),
Err(msg) => Err(format!( Err(msg) => Err(format!("failed to open {} for appending: {}", &log, msg)),
"failed to open {} for appending: {}",
&self.file, msg
)),
}?; }?;
let line = serde_json::to_string(&delta).unwrap(); let line = serde_json::to_string(&delta).unwrap();
match match writeln!(file, "{}", line) {
writeln!(file, "{}", line) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(msg) => Err(format!("failed to append: {}", msg)), Err(msg) => Err(format!("failed to append: {}", msg)),
} }
@ -178,7 +186,15 @@ impl Delta {
} }
fn now(patch: json_patch::PatchOperation) -> Delta { fn now(patch: json_patch::PatchOperation) -> Delta {
Self::new(patch, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs().try_into().unwrap()) Self::new(
patch,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
.try_into()
.unwrap(),
)
} }
} }
@ -201,10 +217,24 @@ impl Events {
Err(msg) => Err(format!("failed to read dir {}: {}", Self::dir(&file), msg)), Err(msg) => Err(format!("failed to read dir {}: {}", Self::dir(&file), msg)),
}?; }?;
let mut result = vec![]; let mut result: Vec<Delta> = vec![];
for log in logs.iter() { for log in logs.iter() {
panic!("{:?}", log); match std::fs::File::open(&log) {
Ok(f) => {
for line in std::io::BufReader::new(f).lines() {
let line = line.unwrap();
let delta = match serde_json::from_str(&line) {
Ok(v) => Ok(v),
Err(msg) => Err(format!("failed to parse line {}: {}", &line, msg)),
}?;
result.push(delta);
} }
Ok(())
}
Err(msg) => Err(format!("failed to read {}: {}", &log, msg)),
}?;
}
result.sort_by(|a, b| a.ts.cmp(&b.ts));
Ok(Events(result)) Ok(Events(result))
} }

View File

@ -0,0 +1 @@
{"ts":1762880643,"patch":{"op":"replace","path":"","value":["read; https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks","pglogical vs ha\n\n# api.git#breel/keys-620-pglogical-always-set-cr/2-user-survives-cr\n$ mise run pulsegres-new ^logical/toggl\n","drive; VERIFY spoc posts daily summary w/ unresolved","drive; VERIFY spoc refreshes summary w/ thread comment contianing 'refresh'","637; reconcile deploy if replicas wrong; https://github.com/renderinc/api/pull/26540/files","https://linear.app/render-com/issue/KEYS-633/add-3-when-max-connections-overridden-for-3-superuser-connections","https://linear.app/render-com/issue/KEYS-637/billing-resume-should-1-unsuspend-pg-in-cloudsql-2-unsuspend-pg-in-cr","https://linear.app/render-com/issue/KEYS-638/pgoperator-generates-new-ha-patroni-cert-every-reconcile-no-matter","pg; how2partition; https://renderinc.slack.com/archives/C0319NYCSSG/p1756357545556659?thread_ts=1756357467.613369&cid=C0319NYCSSG","pitr; backup purge cronjob for PL types","pg11 pgbackup doesnt write to envsetting mucked env key","incident io; teach spocbotvr to read slacks","userdb to internal; peer packages can use internal as userdb","fcr; cannot pitr because pgbackrest doesnt know wal spans thus pgexporter and friends cant know pitr works","etcd statefulset of 1 (for no random podname, no conflict, k8s ensures pod replace)\npatroni always\n","maher; https://slab.render.com/posts/hopes-and-dreams-blegf8fx#hdsyt-valkey-bundle","maher; shadow lizhi pm loops","maher; get more interviewers","maher; get concrete career and project plans so i can get promo in 2y; no manager to advocate","read; https://trychroma.com/engineering/wal3","read; https://github.com/renderinc/dashboard/pull/8883","read; https://litestream.io/getting-started/","kr\nto del gcloud old key\nie https://console.cloud.google.com/iam-admin/serviceaccounts/details/104206017956912104938/keys?hl=en&project=render-prod\n",{"subtasks":["","pitr\nhttps://slab.render.com/posts/pitr-as-a-service-health-abvnqx11\nmore aggressive alert autotune backup cores\nmore aggressive alert on MOAR backup cores\ncreate alert autotune archive-push cores\ncreate alert MOAR archive-push cores\n","cr; frontend","cr; cli.git","cr; public-api-schema.git; https://github.com/renderinc/public-api-schema/pull/407 STILL NEED EVENTS","cr; website.git","cr; changelog","ops; pgproxy rate limits 50ps 100burst; https://github.com/renderinc/dbproxy/pull/91","2873; no conn patroni if upgradeInProgressWithoutHA; https://github.com/renderinc/api/pull/26328","2733; only EnvSettings; https://github.com/renderinc/api/pull/25322/files","pg18; after cred rotation works, re enable e2e","655; pg18; pub api sch; https://github.com/renderinc/public-api-schema/pull/421","655; pg18; go generate pub api sch; https://github.com/renderinc/api/pull/26694","663; das; show status in /info; https://github.com/renderinc/dashboard/pull/9616","664; pg18; go gen terraform; https://github.com/renderinc/api/pull/26701","664; pg18; ga; push terraform.git#breel/keys-664-pg18","656; pg18; website; https://github.com/renderinc/website/pull/985/files","663; das; note disk cannot decrease even if autoscaled; https://github.com/renderinc/dashboard/pull/9621","pulsegres; pls let me keep my test emails; https://github.com/renderinc/api/pull/26741","pgup; restore view owner; https://github.com/renderinc/api/pull/26814","pgup; resync if missing resync; https://github.com/renderinc/api/pull/26817","pgup; replicas use $RESYNC; https://github.com/renderinc/api/pull/26878"],"todo":"blocked"}]}}