Compare commits
6 Commits
14e80ac2c3
...
bfbc2b6e7f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bfbc2b6e7f | ||
|
|
6b51a0c0a3 | ||
|
|
137fdf07ed | ||
|
|
64c4d1908a | ||
|
|
aad5959350 | ||
|
|
f7f44d6615 |
@@ -33,11 +33,49 @@ func Record(ctx context.Context) error {
|
||||
return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now())
|
||||
}
|
||||
|
||||
type last struct {
|
||||
T time.Time `json:"checked_at"`
|
||||
Cksum string `json:"cksum"`
|
||||
}
|
||||
|
||||
func checkLast(ctx context.Context, p string) (last, error) {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return last{}, err
|
||||
}
|
||||
|
||||
return db.QueryOne[last](ctx, `
|
||||
SELECT checked_at, cksum
|
||||
FROM "asses.checks"
|
||||
WHERE p=$1
|
||||
`, p)
|
||||
}
|
||||
|
||||
func checked(ctx context.Context, p, cksum string) error {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.Exec(ctx, `
|
||||
INSERT INTO "asses.checks"
|
||||
(p, checked_at, cksum)
|
||||
VALUES ($1, $2, $3)
|
||||
|
||||
ON CONFLICT DO UPDATE
|
||||
SET checked_at=$2, cksum=$3
|
||||
WHERE p=$1
|
||||
`, p, time.Now(), cksum)
|
||||
}
|
||||
|
||||
func initDB(ctx context.Context) error {
|
||||
return db.InitializeSchema(ctx, "asses", []string{
|
||||
`CREATE TABLE "asses.executions" (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
executed_at TIMESTAMP NOT NULL
|
||||
)`,
|
||||
`CREATE TABLE "asses.checks" (
|
||||
p TEXT PRIMARY KEY NOT NULL,
|
||||
checked_at TIMESTAMP NOT NULL,
|
||||
cksum TEXT NOT NULL
|
||||
)`,
|
||||
})
|
||||
}
|
||||
|
||||
29
src/asses/db_integration_test.go
Normal file
29
src/asses/db_integration_test.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package asses_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"show-rss/src/asses"
|
||||
"show-rss/src/db"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNextRecord(t *testing.T) {
|
||||
ctx := db.Test(t, context.Background())
|
||||
|
||||
if v, err := asses.Next(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if zero := v.IsZero(); !zero {
|
||||
t.Fatal(v)
|
||||
}
|
||||
|
||||
if err := asses.Record(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, err := asses.Next(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if since := time.Since(v); since > time.Minute {
|
||||
t.Fatal(since)
|
||||
}
|
||||
}
|
||||
@@ -1,29 +1,29 @@
|
||||
package asses_test
|
||||
package asses
|
||||
|
||||
import (
|
||||
"context"
|
||||
"show-rss/src/asses"
|
||||
"show-rss/src/db"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNextRecord(t *testing.T) {
|
||||
func TestLast(t *testing.T) {
|
||||
ctx := db.Test(t, context.Background())
|
||||
|
||||
if v, err := asses.Next(ctx); err != nil {
|
||||
if last, err := checkLast(ctx, "p"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if zero := v.IsZero(); !zero {
|
||||
t.Fatal(v)
|
||||
} else if !last.T.IsZero() || last.Cksum != "" {
|
||||
t.Fatal(last)
|
||||
}
|
||||
|
||||
if err := asses.Record(ctx); err != nil {
|
||||
if err := checked(ctx, "p", "cksum"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := checked(ctx, "p", "cksum"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, err := asses.Next(ctx); err != nil {
|
||||
if last, err := checkLast(ctx, "p"); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if since := time.Since(v); since > time.Minute {
|
||||
t.Fatal(since)
|
||||
} else if last.T.IsZero() || last.Cksum != "cksum" {
|
||||
t.Fatal(last)
|
||||
}
|
||||
}
|
||||
|
||||
43
src/asses/one.go
Normal file
43
src/asses/one.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package asses
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
func One(ctx context.Context, p string) error {
|
||||
if last, err := checkLast(ctx, p); err != nil {
|
||||
return err
|
||||
} else if last.T.IsZero() {
|
||||
} else if cksum, err := cksum(ctx, p); err != nil {
|
||||
return err
|
||||
} else if cksum != last.Cksum {
|
||||
} else if time.Since(last.T) < 20+time.Duration(rand.Int()%10)*24*time.Hour {
|
||||
return nil
|
||||
}
|
||||
|
||||
return io.EOF
|
||||
|
||||
cksum, err := cksum(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return checked(ctx, p, cksum)
|
||||
}
|
||||
|
||||
func cksum(ctx context.Context, p string) (string, error) {
|
||||
f, err := os.Open(p)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
hasher := md5.New()
|
||||
_, err = io.Copy(hasher, f)
|
||||
return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err
|
||||
}
|
||||
@@ -2,38 +2,51 @@ package asses
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"show-rss/src/asses"
|
||||
"time"
|
||||
"show-rss/src/cron"
|
||||
)
|
||||
|
||||
var rootDs = []string{
|
||||
"/volume1/video/Bel/Anime",
|
||||
"/volume1/video/QT/TV",
|
||||
}
|
||||
|
||||
type CB func(context.Context, string) error
|
||||
|
||||
func Main(ctx context.Context) error {
|
||||
next, err := asses.Next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c := time.NewTicker(3 * time.Hour)
|
||||
defer c.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
case <-time.After(time.Until(next)):
|
||||
}
|
||||
|
||||
if err := One(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
next, err = asses.Next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
return cron.Cron(ctx, asses.Next, One)
|
||||
}
|
||||
|
||||
func One(ctx context.Context) error {
|
||||
return io.EOF
|
||||
return OneWith(ctx, rootDs, asses.One)
|
||||
}
|
||||
|
||||
func OneWith(ctx context.Context, rootds []string, cb CB) error {
|
||||
for _, rootd := range rootds {
|
||||
if err := one(ctx, rootd, cb); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func one(ctx context.Context, rootd string, cb CB) error {
|
||||
return filepath.WalkDir(rootd, func(p string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if path.Ext(p) != ".mkv" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return cb(ctx, p)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,8 +1,28 @@
|
||||
package asses_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path"
|
||||
"show-rss/src/cmd/asses"
|
||||
"show-rss/src/db"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestOne(t *testing.T) {
|
||||
func TestOneWith(t *testing.T) {
|
||||
ctx := db.Test(t, context.Background())
|
||||
|
||||
d := t.TempDir()
|
||||
os.MkdirAll(path.Join(d, "a", "b", "c"), os.ModePerm)
|
||||
os.WriteFile(path.Join(d, "a", "f.mkv"), []byte{}, os.ModePerm)
|
||||
|
||||
if err := asses.OneWith(ctx, []string{d}, func(_ context.Context, p string) error {
|
||||
t.Logf("%q", p)
|
||||
if _, err := os.Stat(p); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"show-rss/src/cron"
|
||||
"show-rss/src/feeds"
|
||||
"show-rss/src/webhooks"
|
||||
"strings"
|
||||
@@ -16,19 +17,7 @@ import (
|
||||
)
|
||||
|
||||
func Main(ctx context.Context) error {
|
||||
c := time.NewTicker(time.Minute)
|
||||
defer c.Stop()
|
||||
for {
|
||||
if err := One(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
return cron.Cron(ctx, feeds.Next, One)
|
||||
}
|
||||
|
||||
func One(ctx context.Context) error {
|
||||
|
||||
31
src/cron/cron.go
Normal file
31
src/cron/cron.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error {
|
||||
n, err := next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c := time.NewTicker(3 * time.Minute)
|
||||
defer c.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-c.C:
|
||||
n, err = next(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case <-time.After(time.Until(n)):
|
||||
if err := do(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package feeds
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"show-rss/src/db"
|
||||
"show-rss/src/server"
|
||||
@@ -42,6 +41,24 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
func Next(ctx context.Context) (time.Time, error) {
|
||||
result := time.Now().Add(3 * time.Minute)
|
||||
err := ForEach(ctx, func(f Feed) error {
|
||||
next, err := f.Next()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if next.After(result) {
|
||||
return nil
|
||||
}
|
||||
|
||||
result = next
|
||||
return nil
|
||||
})
|
||||
return result, err
|
||||
}
|
||||
|
||||
func ForEach(ctx context.Context, cb func(Feed) error) error {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return err
|
||||
@@ -206,14 +223,6 @@ func Update(ctx context.Context, id string, url, cron, pattern, webhookMethod, w
|
||||
)
|
||||
}
|
||||
|
||||
func (feed Feed) Update(ctx context.Context, url, cron, pattern, tag *string) error {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
func (feed Feed) Delete(ctx context.Context) error {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
func getEntry(ctx context.Context, id string) (Entry, error) {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return Entry{}, err
|
||||
|
||||
@@ -31,7 +31,11 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
||||
if !feed.Entry.Deleted.IsZero() {
|
||||
return false, nil
|
||||
}
|
||||
next, err := feed.Next()
|
||||
return time.Now().After(next), err
|
||||
}
|
||||
|
||||
func (feed Feed) Next() (time.Time, error) {
|
||||
schedule, err := cron.NewParser(
|
||||
cron.SecondOptional |
|
||||
cron.Minute |
|
||||
@@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
||||
cron.Descriptor,
|
||||
).Parse(feed.Version.Cron)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
||||
return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
||||
}
|
||||
next := schedule.Next(feed.Execution.Executed)
|
||||
return time.Now().After(next), nil
|
||||
return schedule.Next(feed.Execution.Executed), nil
|
||||
}
|
||||
|
||||
func (feed Feed) Fetch(ctx context.Context) (Items, error) {
|
||||
|
||||
Reference in New Issue
Block a user