refactor out cronning
parent
14e80ac2c3
commit
f7f44d6615
|
|
@ -4,34 +4,11 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"show-rss/src/asses"
|
||||
"time"
|
||||
"show-rss/src/cron"
|
||||
)
|
||||
|
||||
func Main(ctx context.Context) error {
|
||||
next, err := asses.Next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c := time.NewTicker(3 * time.Hour)
|
||||
defer c.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
case <-time.After(time.Until(next)):
|
||||
}
|
||||
|
||||
if err := One(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
next, err = asses.Next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
return cron.Cron(ctx, asses.Next, One)
|
||||
}
|
||||
|
||||
func One(ctx context.Context) error {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"show-rss/src/cron"
|
||||
"show-rss/src/feeds"
|
||||
"show-rss/src/webhooks"
|
||||
"strings"
|
||||
|
|
@ -16,19 +17,7 @@ import (
|
|||
)
|
||||
|
||||
func Main(ctx context.Context) error {
|
||||
c := time.NewTicker(time.Minute)
|
||||
defer c.Stop()
|
||||
for {
|
||||
if err := One(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
return cron.Cron(ctx, feeds.Next, One)
|
||||
}
|
||||
|
||||
func One(ctx context.Context) error {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error {
|
||||
n, err := next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c := time.NewTicker(3 * time.Minute)
|
||||
defer c.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
n, err = next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case <-time.After(time.Until(n)):
|
||||
if err := do(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
|
@ -42,6 +42,24 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func Next(ctx context.Context) (time.Time, error) {
|
||||
result := time.Now().Add(3 * time.Minute)
|
||||
err := ForEach(ctx, func(f Feed) error {
|
||||
next, err := f.Next()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if next.After(result) {
|
||||
return nil
|
||||
}
|
||||
|
||||
result = next
|
||||
return nil
|
||||
})
|
||||
return result, err
|
||||
}
|
||||
|
||||
func ForEach(ctx context.Context, cb func(Feed) error) error {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -31,7 +31,11 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
|||
if !feed.Entry.Deleted.IsZero() {
|
||||
return false, nil
|
||||
}
|
||||
next, err := feed.Next()
|
||||
return time.Now().After(next), err
|
||||
}
|
||||
|
||||
func (feed Feed) Next() (time.Time, error) {
|
||||
schedule, err := cron.NewParser(
|
||||
cron.SecondOptional |
|
||||
cron.Minute |
|
||||
|
|
@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
|||
cron.Descriptor,
|
||||
).Parse(feed.Version.Cron)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
||||
return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
||||
}
|
||||
next := schedule.Next(feed.Execution.Executed)
|
||||
return time.Now().After(next), nil
|
||||
return schedule.Next(feed.Execution.Executed), nil
|
||||
}
|
||||
|
||||
func (feed Feed) Fetch(ctx context.Context) (Items, error) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue