Compare commits

...

5 Commits

Author SHA1 Message Date
Bel LaPointe 053071f4be hrm an enqueu eoperator is hard because a cron fires many times but looks removed but isnt and hrm 2025-12-02 17:34:04 -07:00
Bel LaPointe eccaa06d98 can add, add past, add future 2025-12-02 17:30:29 -07:00
Bel LaPointe ee9377d6da reconcile on add too 2025-12-02 17:21:51 -07:00
Bel LaPointe 7da6aa8ae9 still unsure about triggered vs snapshot time... 2025-12-02 17:17:31 -07:00
Bel LaPointe a5553d75f4 add w schedule via ts after now 2025-12-02 17:03:06 -07:00
3 changed files with 65 additions and 90 deletions

View File

@ -8,28 +8,27 @@ fn main() {
let files = flags.files().expect("failed to files");
if !flags.dry_run {
for file in files.files.iter() {
file.persist_stage()
.expect("failed to persist staged changes to log file");
file.stage_persisted().expect("failed to stage log files");
}
files.reconcile().expect("failed to reconcile");
if let Some(add) = flags.add {
let task = match flags.add_schedule {
let task = match flags.add_schedule.clone() {
None => Task(serde_yaml::Value::String(add)),
Some(add_schedule) => {
let mut m = serde_yaml::Mapping::new();
m.insert("schedule".into(), add_schedule.into());
m.insert("todo".into(), add.into());
m.insert("do".into(), add.into());
Task(serde_yaml::Value::Mapping(m))
}
};
let now = Delta::now_time();
files.files[0]
.append(Delta::add(task))
.append(match task.next_due(now.clone()) {
None => Delta::add(task),
Some(due) => Delta::add_at(task, if due > now { due } else { now }),
})
.expect("failed to add");
files.files[0]
.stage_persisted()
.expect("failed to stage added");
files.reconcile().expect("failed to reconcile");
}
}
@ -143,6 +142,14 @@ impl Files {
files: files.into_iter().map(|x| File::new(&x)).collect(),
}
}
pub fn reconcile(&self) -> Result<(), String> {
for file in self.files.iter() {
file.persist_stage()?;
file.stage_persisted()?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
@ -198,7 +205,7 @@ impl File {
let now = Delta::now_time();
if let Some(due) = before.next_due(now.clone()) {
if due >= now {
self.append(Delta::add_at(before.clone(), now))?;
self.append(Delta::add_at(before.clone(), due))?;
}
}
}
@ -351,12 +358,7 @@ mod test_file {
"{:?}",
f.events().unwrap().snapshot().unwrap(),
);
assert_eq!(
8,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(8, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "[]");
});
@ -414,7 +416,9 @@ mod test_file {
{{"ts":2, "op":"Add", "task": "old"}}
{{"ts":2, "op":"Snapshot", "task": null, "tasks": ["removed", "old"]}}
{{"ts":{}, "op":"Add", "task": "persisted but not snapshotted"}}
{{"ts":{}, "op":"Add", "task": "doesnt exist yet"}}
"#,
Delta::now_time() - 50,
Delta::now_time() + 50,
)
.as_str(),
@ -422,14 +426,14 @@ mod test_file {
let f = File::new(&d.path().join("plain").to_str().unwrap().to_string());
assert_eq!(4, f.events().unwrap().0.len());
assert_eq!(5, f.events().unwrap().0.len());
assert_eq!(2, f.stage().unwrap().len());
tests::file_contains(&d, "plain", "old");
tests::file_contains(&d, "plain", "new");
f.persist_stage().unwrap();
assert_eq!(
6,
7,
f.events().unwrap().0.len(),
"events: {:?}",
f.events().unwrap()
@ -438,12 +442,7 @@ mod test_file {
tests::file_contains(&d, "plain", "new");
f.stage_persisted().unwrap();
assert_eq!(
7,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(8, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(3, f.stage().unwrap().len(), "{:?}", f.stage().unwrap());
tests::file_contains(&d, "plain", "new");
tests::file_contains(&d, "plain", "old");
@ -462,30 +461,15 @@ mod test_file {
let task = Task(serde_yaml::Value::Mapping(m));
f.append(Delta::add(task)).unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(1, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
f.persist_stage().unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(1, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
f.stage_persisted().unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(1, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
});
}
@ -499,45 +483,24 @@ mod test_file {
".plain.host",
format!(
r#"
{{"ts":1, "op":"Add", "task": "stage"}}
{{"ts":2, "op":"Snapshot", "task": null, "tasks": ["removed", "old"]}}
{{"ts":3, "op":"Add", "task": "scheduled add for after snapshot"}}
{{"ts":2, "op":"Snapshot", "task": null, "tasks": ["removed"]}}
"#,
Delta::now_time() + 50,
)
.as_str(),
);
let f = File::new(&d.path().join("plain").to_str().unwrap().to_string());
let mut m = serde_yaml::Mapping::new();
m.insert("schedule".into(), "2036-01-02".into());
let task = Task(serde_yaml::Value::Mapping(m));
f.append(Delta::add(task)).unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
assert_eq!(2, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(1, f.stage().unwrap().len(), "{:?}", f.stage());
f.persist_stage().unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
assert_eq!(4, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(1, f.stage().unwrap().len(), "{:?}", f.stage());
f.stage_persisted().unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
assert_eq!(5, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(2, f.stage().unwrap().len(), "{:?}", f.stage());
});
}
@ -552,12 +515,7 @@ mod test_file {
let task = Task(serde_yaml::Value::Mapping(m));
f.append(Delta::add(task)).unwrap();
assert_eq!(
1,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(1, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(0, f.stage().unwrap().len(), "{:?}", f.stage());
f.persist_stage().unwrap();
@ -565,7 +523,7 @@ mod test_file {
1,
f.events().unwrap().0.len(),
"events after 1 add scheduled: {:?}",
f.events().unwrap().0
f.events().unwrap()
);
assert_eq!(
1,
@ -582,12 +540,7 @@ mod test_file {
);
f.stage_persisted().unwrap();
assert_eq!(
2,
f.events().unwrap().0.len(),
"{:?}",
f.events().unwrap().0
);
assert_eq!(2, f.events().unwrap().0.len(), "{:?}", f.events().unwrap());
assert_eq!(1, f.stage().unwrap().len(), "{:?}", f.stage());
});
}
@ -660,10 +613,14 @@ impl Task {
}
}
pub fn must_next_due(&self, after: u64) -> u64 {
self.next_due(after).unwrap_or(1)
}
pub fn next_due(&self, after: u64) -> Option<u64> {
match self.schedule() {
Some(schedule) => self.parse_schedule_next(schedule, after),
None => Some(1),
None => None,
}
}
@ -741,7 +698,7 @@ mod test_task {
fn test_unscheduled() {
let task = Task(serde_yaml::Value::String("hello world".to_string()));
assert_eq!(None, task.schedule());
assert_eq!(Some(1 as u64), task.next_due(100));
assert_eq!(1 as u64, task.must_next_due(100));
assert!(task._due(100));
}
@ -796,9 +753,19 @@ mod test_task {
}
}
#[derive(Debug, Clone)]
#[derive(Clone)]
struct Events(Vec<Delta>);
impl std::fmt::Debug for Events {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut arr = vec![];
for i in self.0.iter() {
arr.push(format!("{:?}", i.clone()));
}
write!(f, "[\n {}\n]", arr.join("\n "))
}
}
impl Events {
pub fn new(file: &String) -> Result<Events, String> {
let logs = match std::fs::read_dir(Self::dir(&file)) {
@ -877,7 +844,7 @@ impl Events {
fn snapshot(&self) -> Result<Vec<Task>, String> {
let mut result = vec![];
for event in self.0.iter() {
for event in self.0.iter().filter(|t| t.ts <= Delta::now_time()) {
match &event.op {
Op::Add => match event.task.next_due(event.ts) {
Some(next_due) => match next_due <= Delta::now_time() {

View File

@ -25,3 +25,8 @@
{"ts":1762915973,"op":"Add","task":"hi","tasks":null}
{"ts":1764635053,"op":"Snapshot","task":null,"tasks":["read; https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks","pglogical vs ha\n\n# api.git#breel/keys-620-pglogical-always-set-cr/2-user-survives-cr\n$ mise run pulsegres-new ^logical/toggl\n","drive; VERIFY spoc posts daily summary w/ unresolved","drive; VERIFY spoc refreshes summary w/ thread comment contianing 'refresh'","637; reconcile deploy if replicas wrong; https://github.com/renderinc/api/pull/26540/files","https://linear.app/render-com/issue/KEYS-633/add-3-when-max-connections-overridden-for-3-superuser-connections","https://linear.app/render-com/issue/KEYS-637/billing-resume-should-1-unsuspend-pg-in-cloudsql-2-unsuspend-pg-in-cr","https://linear.app/render-com/issue/KEYS-638/pgoperator-generates-new-ha-patroni-cert-every-reconcile-no-matter","pg; how2partition; https://renderinc.slack.com/archives/C0319NYCSSG/p1756357545556659?thread_ts=1756357467.613369&cid=C0319NYCSSG","pitr; backup purge cronjob for PL types","pg11 pgbackup doesnt write to envsetting mucked env key","incident io; teach spocbotvr to read slacks","userdb to internal; peer packages can use internal as userdb","fcr; cannot pitr because pgbackrest doesnt know wal spans thus pgexporter and friends cant know pitr works","etcd statefulset of 1 (for no random podname, no conflict, k8s ensures pod replace)\npatroni always\n","maher; https://slab.render.com/posts/hopes-and-dreams-blegf8fx#hdsyt-valkey-bundle","maher; shadow lizhi pm loops","maher; get more interviewers","maher; get concrete career and project plans so i can get promo in 2y; no manager to advocate","read; https://trychroma.com/engineering/wal3","read; https://github.com/renderinc/dashboard/pull/8883","read; https://litestream.io/getting-started/","kr\nto del gcloud old key\nie https://console.cloud.google.com/iam-admin/serviceaccounts/details/104206017956912104938/keys?hl=en&project=render-prod\n",{"subtasks":["","pitr\nhttps://slab.render.com/posts/pitr-as-a-service-health-abvnqx11\nmore aggressive alert autotune backup cores\nmore aggressive alert on MOAR backup cores\ncreate alert autotune archive-push cores\ncreate alert MOAR archive-push cores\n","cr; frontend","cr; cli.git","cr; public-api-schema.git; https://github.com/renderinc/public-api-schema/pull/407 STILL NEED EVENTS","cr; website.git","cr; changelog","ops; pgproxy rate limits 50ps 100burst; https://github.com/renderinc/dbproxy/pull/91","2873; no conn patroni if upgradeInProgressWithoutHA; https://github.com/renderinc/api/pull/26328","2733; only EnvSettings; https://github.com/renderinc/api/pull/25322/files","pg18; after cred rotation works, re enable e2e","655; pg18; pub api sch; https://github.com/renderinc/public-api-schema/pull/421","655; pg18; go generate pub api sch; https://github.com/renderinc/api/pull/26694","663; das; show status in /info; https://github.com/renderinc/dashboard/pull/9616","664; pg18; go gen terraform; https://github.com/renderinc/api/pull/26701","664; pg18; ga; push terraform.git#breel/keys-664-pg18","656; pg18; website; https://github.com/renderinc/website/pull/985/files","663; das; note disk cannot decrease even if autoscaled; https://github.com/renderinc/dashboard/pull/9621","pulsegres; pls let me keep my test emails; https://github.com/renderinc/api/pull/26741","pgup; restore view owner; https://github.com/renderinc/api/pull/26814","pgup; resync if missing resync; https://github.com/renderinc/api/pull/26817","pgup; replicas use $RESYNC; https://github.com/renderinc/api/pull/26878"],"todo":"blocked"},"hi"]}
{"ts":1764636274,"op":"Add","task":{"schedule":"2026-01-01","todo":"not yet due"},"tasks":null}
{"ts":1764721753,"op":"Add","task":"just_add","tasks":null}
{"ts":1764721753,"op":"Snapshot","task":null,"tasks":["read; https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks","pglogical vs ha\n\n# api.git#breel/keys-620-pglogical-always-set-cr/2-user-survives-cr\n$ mise run pulsegres-new ^logical/toggl\n","drive; VERIFY spoc posts daily summary w/ unresolved","drive; VERIFY spoc refreshes summary w/ thread comment contianing 'refresh'","637; reconcile deploy if replicas wrong; https://github.com/renderinc/api/pull/26540/files","https://linear.app/render-com/issue/KEYS-633/add-3-when-max-connections-overridden-for-3-superuser-connections","https://linear.app/render-com/issue/KEYS-637/billing-resume-should-1-unsuspend-pg-in-cloudsql-2-unsuspend-pg-in-cr","https://linear.app/render-com/issue/KEYS-638/pgoperator-generates-new-ha-patroni-cert-every-reconcile-no-matter","pg; how2partition; https://renderinc.slack.com/archives/C0319NYCSSG/p1756357545556659?thread_ts=1756357467.613369&cid=C0319NYCSSG","pitr; backup purge cronjob for PL types","pg11 pgbackup doesnt write to envsetting mucked env key","incident io; teach spocbotvr to read slacks","userdb to internal; peer packages can use internal as userdb","fcr; cannot pitr because pgbackrest doesnt know wal spans thus pgexporter and friends cant know pitr works","etcd statefulset of 1 (for no random podname, no conflict, k8s ensures pod replace)\npatroni always\n","maher; https://slab.render.com/posts/hopes-and-dreams-blegf8fx#hdsyt-valkey-bundle","maher; shadow lizhi pm loops","maher; get more interviewers","maher; get concrete career and project plans so i can get promo in 2y; no manager to advocate","read; https://trychroma.com/engineering/wal3","read; https://github.com/renderinc/dashboard/pull/8883","read; https://litestream.io/getting-started/","kr\nto del gcloud old key\nie https://console.cloud.google.com/iam-admin/serviceaccounts/details/104206017956912104938/keys?hl=en&project=render-prod\n",{"subtasks":["","pitr\nhttps://slab.render.com/posts/pitr-as-a-service-health-abvnqx11\nmore aggressive alert autotune backup cores\nmore aggressive alert on MOAR backup cores\ncreate alert autotune archive-push cores\ncreate alert MOAR archive-push cores\n","cr; frontend","cr; cli.git","cr; public-api-schema.git; https://github.com/renderinc/public-api-schema/pull/407 STILL NEED EVENTS","cr; website.git","cr; changelog","ops; pgproxy rate limits 50ps 100burst; https://github.com/renderinc/dbproxy/pull/91","2873; no conn patroni if upgradeInProgressWithoutHA; https://github.com/renderinc/api/pull/26328","2733; only EnvSettings; https://github.com/renderinc/api/pull/25322/files","pg18; after cred rotation works, re enable e2e","655; pg18; pub api sch; https://github.com/renderinc/public-api-schema/pull/421","655; pg18; go generate pub api sch; https://github.com/renderinc/api/pull/26694","663; das; show status in /info; https://github.com/renderinc/dashboard/pull/9616","664; pg18; go gen terraform; https://github.com/renderinc/api/pull/26701","664; pg18; ga; push terraform.git#breel/keys-664-pg18","656; pg18; website; https://github.com/renderinc/website/pull/985/files","663; das; note disk cannot decrease even if autoscaled; https://github.com/renderinc/dashboard/pull/9621","pulsegres; pls let me keep my test emails; https://github.com/renderinc/api/pull/26741","pgup; restore view owner; https://github.com/renderinc/api/pull/26814","pgup; resync if missing resync; https://github.com/renderinc/api/pull/26817","pgup; replicas use $RESYNC; https://github.com/renderinc/api/pull/26878"],"todo":"blocked"},"hi","just_add"]}
{"ts":1764721753,"op":"Add","task":{"schedule":"2000-01-01","do":"add_past"},"tasks":null}
{"ts":1764721753,"op":"Snapshot","task":null,"tasks":["read; https://topicpartition.io/blog/postgres-pubsub-queue-benchmarks","pglogical vs ha\n\n# api.git#breel/keys-620-pglogical-always-set-cr/2-user-survives-cr\n$ mise run pulsegres-new ^logical/toggl\n","drive; VERIFY spoc posts daily summary w/ unresolved","drive; VERIFY spoc refreshes summary w/ thread comment contianing 'refresh'","637; reconcile deploy if replicas wrong; https://github.com/renderinc/api/pull/26540/files","https://linear.app/render-com/issue/KEYS-633/add-3-when-max-connections-overridden-for-3-superuser-connections","https://linear.app/render-com/issue/KEYS-637/billing-resume-should-1-unsuspend-pg-in-cloudsql-2-unsuspend-pg-in-cr","https://linear.app/render-com/issue/KEYS-638/pgoperator-generates-new-ha-patroni-cert-every-reconcile-no-matter","pg; how2partition; https://renderinc.slack.com/archives/C0319NYCSSG/p1756357545556659?thread_ts=1756357467.613369&cid=C0319NYCSSG","pitr; backup purge cronjob for PL types","pg11 pgbackup doesnt write to envsetting mucked env key","incident io; teach spocbotvr to read slacks","userdb to internal; peer packages can use internal as userdb","fcr; cannot pitr because pgbackrest doesnt know wal spans thus pgexporter and friends cant know pitr works","etcd statefulset of 1 (for no random podname, no conflict, k8s ensures pod replace)\npatroni always\n","maher; https://slab.render.com/posts/hopes-and-dreams-blegf8fx#hdsyt-valkey-bundle","maher; shadow lizhi pm loops","maher; get more interviewers","maher; get concrete career and project plans so i can get promo in 2y; no manager to advocate","read; https://trychroma.com/engineering/wal3","read; https://github.com/renderinc/dashboard/pull/8883","read; https://litestream.io/getting-started/","kr\nto del gcloud old key\nie https://console.cloud.google.com/iam-admin/serviceaccounts/details/104206017956912104938/keys?hl=en&project=render-prod\n",{"subtasks":["","pitr\nhttps://slab.render.com/posts/pitr-as-a-service-health-abvnqx11\nmore aggressive alert autotune backup cores\nmore aggressive alert on MOAR backup cores\ncreate alert autotune archive-push cores\ncreate alert MOAR archive-push cores\n","cr; frontend","cr; cli.git","cr; public-api-schema.git; https://github.com/renderinc/public-api-schema/pull/407 STILL NEED EVENTS","cr; website.git","cr; changelog","ops; pgproxy rate limits 50ps 100burst; https://github.com/renderinc/dbproxy/pull/91","2873; no conn patroni if upgradeInProgressWithoutHA; https://github.com/renderinc/api/pull/26328","2733; only EnvSettings; https://github.com/renderinc/api/pull/25322/files","pg18; after cred rotation works, re enable e2e","655; pg18; pub api sch; https://github.com/renderinc/public-api-schema/pull/421","655; pg18; go generate pub api sch; https://github.com/renderinc/api/pull/26694","663; das; show status in /info; https://github.com/renderinc/dashboard/pull/9616","664; pg18; go gen terraform; https://github.com/renderinc/api/pull/26701","664; pg18; ga; push terraform.git#breel/keys-664-pg18","656; pg18; website; https://github.com/renderinc/website/pull/985/files","663; das; note disk cannot decrease even if autoscaled; https://github.com/renderinc/dashboard/pull/9621","pulsegres; pls let me keep my test emails; https://github.com/renderinc/api/pull/26741","pgup; restore view owner; https://github.com/renderinc/api/pull/26814","pgup; resync if missing resync; https://github.com/renderinc/api/pull/26817","pgup; replicas use $RESYNC; https://github.com/renderinc/api/pull/26878"],"todo":"blocked"},"hi","just_add",{"schedule":"2000-01-01","do":"add_past"}]}
{"ts":2051222400,"op":"Add","task":{"schedule":"2035-01-01","do":"add_future"},"tasks":null}

View File

@ -61,4 +61,7 @@
- pgup; replicas use $RESYNC; https://github.com/renderinc/api/pull/26878
todo: blocked
- hi
- just_add
- schedule: 2000-01-01
do: add_past