From f7f44d66152b82586a5bc30b2023ffa1fe4f3dac Mon Sep 17 00:00:00 2001 From: Bel LaPointe <153096461+breel-render@users.noreply.github.com> Date: Thu, 8 May 2025 15:04:04 -0600 Subject: [PATCH] refactor out cronning --- src/cmd/asses/main.go | 27 ++------------------------- src/cmd/fetch/main.go | 15 ++------------- src/cron/cron.go | 31 +++++++++++++++++++++++++++++++ src/feeds/db.go | 18 ++++++++++++++++++ src/feeds/http.go | 9 ++++++--- 5 files changed, 59 insertions(+), 41 deletions(-) create mode 100644 src/cron/cron.go diff --git a/src/cmd/asses/main.go b/src/cmd/asses/main.go index fac5ea2..8a8dd1b 100644 --- a/src/cmd/asses/main.go +++ b/src/cmd/asses/main.go @@ -4,34 +4,11 @@ import ( "context" "io" "show-rss/src/asses" - "time" + "show-rss/src/cron" ) func Main(ctx context.Context) error { - next, err := asses.Next(ctx) - if err != nil { - return err - } - - c := time.NewTicker(3 * time.Hour) - defer c.Stop() - for { - select { - case <-ctx.Done(): - case <-c.C: - case <-time.After(time.Until(next)): - } - - if err := One(ctx); err != nil { - return err - } - - next, err = asses.Next(ctx) - if err != nil { - return err - } - } - return ctx.Err() + return cron.Cron(ctx, asses.Next, One) } func One(ctx context.Context) error { diff --git a/src/cmd/fetch/main.go b/src/cmd/fetch/main.go index ca845f2..84213cd 100644 --- a/src/cmd/fetch/main.go +++ b/src/cmd/fetch/main.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "net/url" + "show-rss/src/cron" "show-rss/src/feeds" "show-rss/src/webhooks" "strings" @@ -16,19 +17,7 @@ import ( ) func Main(ctx context.Context) error { - c := time.NewTicker(time.Minute) - defer c.Stop() - for { - if err := One(ctx); err != nil { - return err - } - - select { - case <-ctx.Done(): - case <-c.C: - } - } - return ctx.Err() + return cron.Cron(ctx, feeds.Next, One) } func One(ctx context.Context) error { diff --git a/src/cron/cron.go b/src/cron/cron.go new file mode 100644 index 0000000..d08d714 --- /dev/null +++ b/src/cron/cron.go @@ -0,0 +1,31 @@ +package cron + +import ( + "context" + "time" +) + +func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error { + n, err := next(ctx) + if err != nil { + return err + } + + c := time.NewTicker(3 * time.Minute) + defer c.Stop() + for { + select { + case <-ctx.Done(): + case <-c.C: + n, err = next(ctx) + if err != nil { + return err + } + case <-time.After(time.Until(n)): + if err := do(ctx); err != nil { + return err + } + } + } + return ctx.Err() +} diff --git a/src/feeds/db.go b/src/feeds/db.go index 573b8ff..9608ce9 100644 --- a/src/feeds/db.go +++ b/src/feeds/db.go @@ -42,6 +42,24 @@ type ( } ) +func Next(ctx context.Context) (time.Time, error) { + result := time.Now().Add(3 * time.Minute) + err := ForEach(ctx, func(f Feed) error { + next, err := f.Next() + if err != nil { + return nil + } + + if next.After(result) { + return nil + } + + result = next + return nil + }) + return result, err +} + func ForEach(ctx context.Context, cb func(Feed) error) error { if err := initDB(ctx); err != nil { return err diff --git a/src/feeds/http.go b/src/feeds/http.go index 5ea40d2..6c17e5b 100644 --- a/src/feeds/http.go +++ b/src/feeds/http.go @@ -31,7 +31,11 @@ func (feed Feed) ShouldExecute() (bool, error) { if !feed.Entry.Deleted.IsZero() { return false, nil } + next, err := feed.Next() + return time.Now().After(next), err +} +func (feed Feed) Next() (time.Time, error) { schedule, err := cron.NewParser( cron.SecondOptional | cron.Minute | @@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) { cron.Descriptor, ).Parse(feed.Version.Cron) if err != nil { - return false, fmt.Errorf("illegal cron %q", feed.Version.Cron) + return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron) } - next := schedule.Next(feed.Execution.Executed) - return time.Now().After(next), nil + return schedule.Next(feed.Execution.Executed), nil } func (feed Feed) Fetch(ctx context.Context) (Items, error) {