Compare commits

...

6 Commits

Author SHA1 Message Date
Bel LaPointe
bfbc2b6e7f impl asses skips if cksum matches or a lotta time passes 2025-05-08 15:48:08 -06:00
Bel LaPointe
6b51a0c0a3 impl asses.checkLast(), .check() 2025-05-08 15:40:53 -06:00
Bel LaPointe
137fdf07ed stub cmd.asses 2025-05-08 15:30:51 -06:00
Bel LaPointe
64c4d1908a rm unused 2025-05-08 15:04:44 -06:00
Bel LaPointe
aad5959350 rm unused 2025-05-08 15:04:37 -06:00
Bel LaPointe
f7f44d6615 refactor out cronning 2025-05-08 15:04:04 -06:00
10 changed files with 242 additions and 67 deletions

View File

@@ -18,9 +18,9 @@ func Next(ctx context.Context) (time.Time, error) {
} }
result, err := db.QueryOne[Did](ctx, ` result, err := db.QueryOne[Did](ctx, `
SELECT executed_at AS "Did" SELECT executed_at AS "Did"
FROM "asses.executions" FROM "asses.executions"
ORDER BY executed_at DESC ORDER BY executed_at DESC
LIMIT 1 LIMIT 1
`) `)
return result.Did, err return result.Did, err
} }
@@ -33,11 +33,49 @@ func Record(ctx context.Context) error {
return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now()) return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now())
} }
type last struct {
T time.Time `json:"checked_at"`
Cksum string `json:"cksum"`
}
func checkLast(ctx context.Context, p string) (last, error) {
if err := initDB(ctx); err != nil {
return last{}, err
}
return db.QueryOne[last](ctx, `
SELECT checked_at, cksum
FROM "asses.checks"
WHERE p=$1
`, p)
}
func checked(ctx context.Context, p, cksum string) error {
if err := initDB(ctx); err != nil {
return err
}
return db.Exec(ctx, `
INSERT INTO "asses.checks"
(p, checked_at, cksum)
VALUES ($1, $2, $3)
ON CONFLICT DO UPDATE
SET checked_at=$2, cksum=$3
WHERE p=$1
`, p, time.Now(), cksum)
}
func initDB(ctx context.Context) error { func initDB(ctx context.Context) error {
return db.InitializeSchema(ctx, "asses", []string{ return db.InitializeSchema(ctx, "asses", []string{
`CREATE TABLE "asses.executions" ( `CREATE TABLE "asses.executions" (
id TEXT PRIMARY KEY NOT NULL, id TEXT PRIMARY KEY NOT NULL,
executed_at TIMESTAMP NOT NULL executed_at TIMESTAMP NOT NULL
)`, )`,
`CREATE TABLE "asses.checks" (
p TEXT PRIMARY KEY NOT NULL,
checked_at TIMESTAMP NOT NULL,
cksum TEXT NOT NULL
)`,
}) })
} }

View File

@@ -0,0 +1,29 @@
package asses_test
import (
"context"
"show-rss/src/asses"
"show-rss/src/db"
"testing"
"time"
)
func TestNextRecord(t *testing.T) {
ctx := db.Test(t, context.Background())
if v, err := asses.Next(ctx); err != nil {
t.Fatal(err)
} else if zero := v.IsZero(); !zero {
t.Fatal(v)
}
if err := asses.Record(ctx); err != nil {
t.Fatal(err)
}
if v, err := asses.Next(ctx); err != nil {
t.Fatal(err)
} else if since := time.Since(v); since > time.Minute {
t.Fatal(since)
}
}

View File

@@ -1,29 +1,29 @@
package asses_test package asses
import ( import (
"context" "context"
"show-rss/src/asses"
"show-rss/src/db" "show-rss/src/db"
"testing" "testing"
"time"
) )
func TestNextRecord(t *testing.T) { func TestLast(t *testing.T) {
ctx := db.Test(t, context.Background()) ctx := db.Test(t, context.Background())
if v, err := asses.Next(ctx); err != nil { if last, err := checkLast(ctx, "p"); err != nil {
t.Fatal(err) t.Fatal(err)
} else if zero := v.IsZero(); !zero { } else if !last.T.IsZero() || last.Cksum != "" {
t.Fatal(v) t.Fatal(last)
} }
if err := asses.Record(ctx); err != nil { if err := checked(ctx, "p", "cksum"); err != nil {
t.Fatal(err)
} else if err := checked(ctx, "p", "cksum"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if v, err := asses.Next(ctx); err != nil { if last, err := checkLast(ctx, "p"); err != nil {
t.Fatal(err) t.Fatal(err)
} else if since := time.Since(v); since > time.Minute { } else if last.T.IsZero() || last.Cksum != "cksum" {
t.Fatal(since) t.Fatal(last)
} }
} }

43
src/asses/one.go Normal file
View File

@@ -0,0 +1,43 @@
package asses
import (
"context"
"crypto/md5"
"encoding/base64"
"io"
"math/rand"
"os"
"time"
)
func One(ctx context.Context, p string) error {
if last, err := checkLast(ctx, p); err != nil {
return err
} else if last.T.IsZero() {
} else if cksum, err := cksum(ctx, p); err != nil {
return err
} else if cksum != last.Cksum {
} else if time.Since(last.T) < 20+time.Duration(rand.Int()%10)*24*time.Hour {
return nil
}
return io.EOF
cksum, err := cksum(ctx, p)
if err != nil {
return err
}
return checked(ctx, p, cksum)
}
func cksum(ctx context.Context, p string) (string, error) {
f, err := os.Open(p)
if err != nil {
return "", err
}
defer f.Close()
hasher := md5.New()
_, err = io.Copy(hasher, f)
return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err
}

View File

@@ -2,38 +2,51 @@ package asses
import ( import (
"context" "context"
"io" "io/fs"
"path"
"path/filepath"
"show-rss/src/asses" "show-rss/src/asses"
"time" "show-rss/src/cron"
) )
var rootDs = []string{
"/volume1/video/Bel/Anime",
"/volume1/video/QT/TV",
}
type CB func(context.Context, string) error
func Main(ctx context.Context) error { func Main(ctx context.Context) error {
next, err := asses.Next(ctx) return cron.Cron(ctx, asses.Next, One)
if err != nil {
return err
}
c := time.NewTicker(3 * time.Hour)
defer c.Stop()
for {
select {
case <-ctx.Done():
case <-c.C:
case <-time.After(time.Until(next)):
}
if err := One(ctx); err != nil {
return err
}
next, err = asses.Next(ctx)
if err != nil {
return err
}
}
return ctx.Err()
} }
func One(ctx context.Context) error { func One(ctx context.Context) error {
return io.EOF return OneWith(ctx, rootDs, asses.One)
}
func OneWith(ctx context.Context, rootds []string, cb CB) error {
for _, rootd := range rootds {
if err := one(ctx, rootd, cb); err != nil {
return err
}
}
return nil
}
func one(ctx context.Context, rootd string, cb CB) error {
return filepath.WalkDir(rootd, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if path.Ext(p) != ".mkv" {
return nil
}
return cb(ctx, p)
})
} }

View File

@@ -1,8 +1,28 @@
package asses_test package asses_test
import ( import (
"context"
"os"
"path"
"show-rss/src/cmd/asses"
"show-rss/src/db"
"testing" "testing"
) )
func TestOne(t *testing.T) { func TestOneWith(t *testing.T) {
ctx := db.Test(t, context.Background())
d := t.TempDir()
os.MkdirAll(path.Join(d, "a", "b", "c"), os.ModePerm)
os.WriteFile(path.Join(d, "a", "f.mkv"), []byte{}, os.ModePerm)
if err := asses.OneWith(ctx, []string{d}, func(_ context.Context, p string) error {
t.Logf("%q", p)
if _, err := os.Stat(p); err != nil {
return err
}
return nil
}); err != nil {
t.Fatal(err)
}
} }

View File

@@ -8,6 +8,7 @@ import (
"log" "log"
"net/http" "net/http"
"net/url" "net/url"
"show-rss/src/cron"
"show-rss/src/feeds" "show-rss/src/feeds"
"show-rss/src/webhooks" "show-rss/src/webhooks"
"strings" "strings"
@@ -16,19 +17,7 @@ import (
) )
func Main(ctx context.Context) error { func Main(ctx context.Context) error {
c := time.NewTicker(time.Minute) return cron.Cron(ctx, feeds.Next, One)
defer c.Stop()
for {
if err := One(ctx); err != nil {
return err
}
select {
case <-ctx.Done():
case <-c.C:
}
}
return ctx.Err()
} }
func One(ctx context.Context) error { func One(ctx context.Context) error {

31
src/cron/cron.go Normal file
View File

@@ -0,0 +1,31 @@
package cron
import (
"context"
"time"
)
func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error {
n, err := next(ctx)
if err != nil {
return err
}
c := time.NewTicker(3 * time.Minute)
defer c.Stop()
for {
select {
case <-ctx.Done():
case <-c.C:
n, err = next(ctx)
if err != nil {
return err
}
case <-time.After(time.Until(n)):
if err := do(ctx); err != nil {
return err
}
}
}
return ctx.Err()
}

View File

@@ -3,7 +3,6 @@ package feeds
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"net/url" "net/url"
"show-rss/src/db" "show-rss/src/db"
"show-rss/src/server" "show-rss/src/server"
@@ -42,6 +41,24 @@ type (
} }
) )
func Next(ctx context.Context) (time.Time, error) {
result := time.Now().Add(3 * time.Minute)
err := ForEach(ctx, func(f Feed) error {
next, err := f.Next()
if err != nil {
return nil
}
if next.After(result) {
return nil
}
result = next
return nil
})
return result, err
}
func ForEach(ctx context.Context, cb func(Feed) error) error { func ForEach(ctx context.Context, cb func(Feed) error) error {
if err := initDB(ctx); err != nil { if err := initDB(ctx); err != nil {
return err return err
@@ -206,14 +223,6 @@ func Update(ctx context.Context, id string, url, cron, pattern, webhookMethod, w
) )
} }
func (feed Feed) Update(ctx context.Context, url, cron, pattern, tag *string) error {
return io.EOF
}
func (feed Feed) Delete(ctx context.Context) error {
return io.EOF
}
func getEntry(ctx context.Context, id string) (Entry, error) { func getEntry(ctx context.Context, id string) (Entry, error) {
if err := initDB(ctx); err != nil { if err := initDB(ctx); err != nil {
return Entry{}, err return Entry{}, err

View File

@@ -31,7 +31,11 @@ func (feed Feed) ShouldExecute() (bool, error) {
if !feed.Entry.Deleted.IsZero() { if !feed.Entry.Deleted.IsZero() {
return false, nil return false, nil
} }
next, err := feed.Next()
return time.Now().After(next), err
}
func (feed Feed) Next() (time.Time, error) {
schedule, err := cron.NewParser( schedule, err := cron.NewParser(
cron.SecondOptional | cron.SecondOptional |
cron.Minute | cron.Minute |
@@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) {
cron.Descriptor, cron.Descriptor,
).Parse(feed.Version.Cron) ).Parse(feed.Version.Cron)
if err != nil { if err != nil {
return false, fmt.Errorf("illegal cron %q", feed.Version.Cron) return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron)
} }
next := schedule.Next(feed.Execution.Executed) return schedule.Next(feed.Execution.Executed), nil
return time.Now().After(next), nil
} }
func (feed Feed) Fetch(ctx context.Context) (Items, error) { func (feed Feed) Fetch(ctx context.Context) (Items, error) {