Compare commits
6 Commits
14e80ac2c3
...
bfbc2b6e7f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bfbc2b6e7f | ||
|
|
6b51a0c0a3 | ||
|
|
137fdf07ed | ||
|
|
64c4d1908a | ||
|
|
aad5959350 | ||
|
|
f7f44d6615 |
@@ -18,9 +18,9 @@ func Next(ctx context.Context) (time.Time, error) {
|
|||||||
}
|
}
|
||||||
result, err := db.QueryOne[Did](ctx, `
|
result, err := db.QueryOne[Did](ctx, `
|
||||||
SELECT executed_at AS "Did"
|
SELECT executed_at AS "Did"
|
||||||
FROM "asses.executions"
|
FROM "asses.executions"
|
||||||
ORDER BY executed_at DESC
|
ORDER BY executed_at DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`)
|
`)
|
||||||
return result.Did, err
|
return result.Did, err
|
||||||
}
|
}
|
||||||
@@ -33,11 +33,49 @@ func Record(ctx context.Context) error {
|
|||||||
return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now())
|
return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type last struct {
|
||||||
|
T time.Time `json:"checked_at"`
|
||||||
|
Cksum string `json:"cksum"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkLast(ctx context.Context, p string) (last, error) {
|
||||||
|
if err := initDB(ctx); err != nil {
|
||||||
|
return last{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.QueryOne[last](ctx, `
|
||||||
|
SELECT checked_at, cksum
|
||||||
|
FROM "asses.checks"
|
||||||
|
WHERE p=$1
|
||||||
|
`, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checked(ctx context.Context, p, cksum string) error {
|
||||||
|
if err := initDB(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.Exec(ctx, `
|
||||||
|
INSERT INTO "asses.checks"
|
||||||
|
(p, checked_at, cksum)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
|
||||||
|
ON CONFLICT DO UPDATE
|
||||||
|
SET checked_at=$2, cksum=$3
|
||||||
|
WHERE p=$1
|
||||||
|
`, p, time.Now(), cksum)
|
||||||
|
}
|
||||||
|
|
||||||
func initDB(ctx context.Context) error {
|
func initDB(ctx context.Context) error {
|
||||||
return db.InitializeSchema(ctx, "asses", []string{
|
return db.InitializeSchema(ctx, "asses", []string{
|
||||||
`CREATE TABLE "asses.executions" (
|
`CREATE TABLE "asses.executions" (
|
||||||
id TEXT PRIMARY KEY NOT NULL,
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
executed_at TIMESTAMP NOT NULL
|
executed_at TIMESTAMP NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
`CREATE TABLE "asses.checks" (
|
||||||
|
p TEXT PRIMARY KEY NOT NULL,
|
||||||
|
checked_at TIMESTAMP NOT NULL,
|
||||||
|
cksum TEXT NOT NULL
|
||||||
|
)`,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
29
src/asses/db_integration_test.go
Normal file
29
src/asses/db_integration_test.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package asses_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"show-rss/src/asses"
|
||||||
|
"show-rss/src/db"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNextRecord(t *testing.T) {
|
||||||
|
ctx := db.Test(t, context.Background())
|
||||||
|
|
||||||
|
if v, err := asses.Next(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if zero := v.IsZero(); !zero {
|
||||||
|
t.Fatal(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := asses.Record(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := asses.Next(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if since := time.Since(v); since > time.Minute {
|
||||||
|
t.Fatal(since)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,29 +1,29 @@
|
|||||||
package asses_test
|
package asses
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"show-rss/src/asses"
|
|
||||||
"show-rss/src/db"
|
"show-rss/src/db"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNextRecord(t *testing.T) {
|
func TestLast(t *testing.T) {
|
||||||
ctx := db.Test(t, context.Background())
|
ctx := db.Test(t, context.Background())
|
||||||
|
|
||||||
if v, err := asses.Next(ctx); err != nil {
|
if last, err := checkLast(ctx, "p"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if zero := v.IsZero(); !zero {
|
} else if !last.T.IsZero() || last.Cksum != "" {
|
||||||
t.Fatal(v)
|
t.Fatal(last)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := asses.Record(ctx); err != nil {
|
if err := checked(ctx, "p", "cksum"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if err := checked(ctx, "p", "cksum"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v, err := asses.Next(ctx); err != nil {
|
if last, err := checkLast(ctx, "p"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if since := time.Since(v); since > time.Minute {
|
} else if last.T.IsZero() || last.Cksum != "cksum" {
|
||||||
t.Fatal(since)
|
t.Fatal(last)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
43
src/asses/one.go
Normal file
43
src/asses/one.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package asses
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/base64"
|
||||||
|
"io"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func One(ctx context.Context, p string) error {
|
||||||
|
if last, err := checkLast(ctx, p); err != nil {
|
||||||
|
return err
|
||||||
|
} else if last.T.IsZero() {
|
||||||
|
} else if cksum, err := cksum(ctx, p); err != nil {
|
||||||
|
return err
|
||||||
|
} else if cksum != last.Cksum {
|
||||||
|
} else if time.Since(last.T) < 20+time.Duration(rand.Int()%10)*24*time.Hour {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.EOF
|
||||||
|
|
||||||
|
cksum, err := cksum(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return checked(ctx, p, cksum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func cksum(ctx context.Context, p string) (string, error) {
|
||||||
|
f, err := os.Open(p)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
hasher := md5.New()
|
||||||
|
_, err = io.Copy(hasher, f)
|
||||||
|
return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err
|
||||||
|
}
|
||||||
@@ -2,38 +2,51 @@ package asses
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io/fs"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"show-rss/src/asses"
|
"show-rss/src/asses"
|
||||||
"time"
|
"show-rss/src/cron"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var rootDs = []string{
|
||||||
|
"/volume1/video/Bel/Anime",
|
||||||
|
"/volume1/video/QT/TV",
|
||||||
|
}
|
||||||
|
|
||||||
|
type CB func(context.Context, string) error
|
||||||
|
|
||||||
func Main(ctx context.Context) error {
|
func Main(ctx context.Context) error {
|
||||||
next, err := asses.Next(ctx)
|
return cron.Cron(ctx, asses.Next, One)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c := time.NewTicker(3 * time.Hour)
|
|
||||||
defer c.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case <-c.C:
|
|
||||||
case <-time.After(time.Until(next)):
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := One(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
next, err = asses.Next(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func One(ctx context.Context) error {
|
func One(ctx context.Context) error {
|
||||||
return io.EOF
|
return OneWith(ctx, rootDs, asses.One)
|
||||||
|
}
|
||||||
|
|
||||||
|
func OneWith(ctx context.Context, rootds []string, cb CB) error {
|
||||||
|
for _, rootd := range rootds {
|
||||||
|
if err := one(ctx, rootd, cb); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func one(ctx context.Context, rootd string, cb CB) error {
|
||||||
|
return filepath.WalkDir(rootd, func(p string, d fs.DirEntry, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if path.Ext(p) != ".mkv" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return cb(ctx, p)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,28 @@
|
|||||||
package asses_test
|
package asses_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"show-rss/src/cmd/asses"
|
||||||
|
"show-rss/src/db"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestOne(t *testing.T) {
|
func TestOneWith(t *testing.T) {
|
||||||
|
ctx := db.Test(t, context.Background())
|
||||||
|
|
||||||
|
d := t.TempDir()
|
||||||
|
os.MkdirAll(path.Join(d, "a", "b", "c"), os.ModePerm)
|
||||||
|
os.WriteFile(path.Join(d, "a", "f.mkv"), []byte{}, os.ModePerm)
|
||||||
|
|
||||||
|
if err := asses.OneWith(ctx, []string{d}, func(_ context.Context, p string) error {
|
||||||
|
t.Logf("%q", p)
|
||||||
|
if _, err := os.Stat(p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"show-rss/src/cron"
|
||||||
"show-rss/src/feeds"
|
"show-rss/src/feeds"
|
||||||
"show-rss/src/webhooks"
|
"show-rss/src/webhooks"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -16,19 +17,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Main(ctx context.Context) error {
|
func Main(ctx context.Context) error {
|
||||||
c := time.NewTicker(time.Minute)
|
return cron.Cron(ctx, feeds.Next, One)
|
||||||
defer c.Stop()
|
|
||||||
for {
|
|
||||||
if err := One(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case <-c.C:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func One(ctx context.Context) error {
|
func One(ctx context.Context) error {
|
||||||
|
|||||||
31
src/cron/cron.go
Normal file
31
src/cron/cron.go
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
package cron
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error {
|
||||||
|
n, err := next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c := time.NewTicker(3 * time.Minute)
|
||||||
|
defer c.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-c.C:
|
||||||
|
n, err = next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case <-time.After(time.Until(n)):
|
||||||
|
if err := do(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
@@ -3,7 +3,6 @@ package feeds
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"show-rss/src/db"
|
"show-rss/src/db"
|
||||||
"show-rss/src/server"
|
"show-rss/src/server"
|
||||||
@@ -42,6 +41,24 @@ type (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func Next(ctx context.Context) (time.Time, error) {
|
||||||
|
result := time.Now().Add(3 * time.Minute)
|
||||||
|
err := ForEach(ctx, func(f Feed) error {
|
||||||
|
next, err := f.Next()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if next.After(result) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result = next
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
func ForEach(ctx context.Context, cb func(Feed) error) error {
|
func ForEach(ctx context.Context, cb func(Feed) error) error {
|
||||||
if err := initDB(ctx); err != nil {
|
if err := initDB(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -206,14 +223,6 @@ func Update(ctx context.Context, id string, url, cron, pattern, webhookMethod, w
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (feed Feed) Update(ctx context.Context, url, cron, pattern, tag *string) error {
|
|
||||||
return io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
func (feed Feed) Delete(ctx context.Context) error {
|
|
||||||
return io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
func getEntry(ctx context.Context, id string) (Entry, error) {
|
func getEntry(ctx context.Context, id string) (Entry, error) {
|
||||||
if err := initDB(ctx); err != nil {
|
if err := initDB(ctx); err != nil {
|
||||||
return Entry{}, err
|
return Entry{}, err
|
||||||
|
|||||||
@@ -31,7 +31,11 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
|||||||
if !feed.Entry.Deleted.IsZero() {
|
if !feed.Entry.Deleted.IsZero() {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
next, err := feed.Next()
|
||||||
|
return time.Now().After(next), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (feed Feed) Next() (time.Time, error) {
|
||||||
schedule, err := cron.NewParser(
|
schedule, err := cron.NewParser(
|
||||||
cron.SecondOptional |
|
cron.SecondOptional |
|
||||||
cron.Minute |
|
cron.Minute |
|
||||||
@@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) {
|
|||||||
cron.Descriptor,
|
cron.Descriptor,
|
||||||
).Parse(feed.Version.Cron)
|
).Parse(feed.Version.Cron)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron)
|
||||||
}
|
}
|
||||||
next := schedule.Next(feed.Execution.Executed)
|
return schedule.Next(feed.Execution.Executed), nil
|
||||||
return time.Now().After(next), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (feed Feed) Fetch(ctx context.Context) (Items, error) {
|
func (feed Feed) Fetch(ctx context.Context) (Items, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user