Compare commits

...

77 Commits

Author SHA1 Message Date
bel
18ac13fd57 should not execute feed if version.url=="" 2025-12-10 08:40:28 -07:00
bel
375fc1000a feeds for each runs all and returns all errs 2025-12-10 08:38:59 -07:00
bel
47c7aa74d3 uncap retries on main 2025-12-10 08:23:54 -07:00
bel
a6aad2820d cap retries 2025-12-10 08:13:45 -07:00
bel
b26afcb325 log 2025-12-02 16:30:10 -07:00
bel
613bfdf96e transcode entrypoint 2025-12-02 16:28:55 -07:00
Bel LaPointe
81507319dd disable by default 2025-12-02 16:25:15 -07:00
bel
50ad3bb3db install_scratch.sh 2025-11-30 09:00:47 -07:00
bel
07992b6636 delete less accidentally clickable 2025-11-30 08:56:37 -07:00
bel
9583234df5 do not list deleted (url == "") 2025-11-30 08:54:28 -07:00
bel
2943362587 if POST /?delete then DELETE 2025-11-30 08:54:18 -07:00
bel
cbd4e32022 DELETE /v1/feeds/abc updates all fields to "" 2025-11-30 08:44:27 -07:00
bel
727b4fdea6 from testdata to public for .html 2025-11-30 08:40:15 -07:00
bel
fd7dcafd4e timeout ass to srt 2025-06-01 10:28:43 -06:00
Bel LaPointe
4fbc96b96f dont fail on err but log all 2025-06-01 10:27:00 -06:00
Bel LaPointe
0afb6535b6 impl -e=best-ass-to-srt 2025-06-01 09:55:01 -06:00
Bel LaPointe
10a40d4a54 nopanik choose best lang 2025-06-01 09:52:18 -06:00
Bel LaPointe
4bdfbd1f06 panik till fixed 2025-05-31 11:15:00 -06:00
Bel LaPointe
44bcc0ba2e bestasstosrt does not also remove all sub streams from mkv 2025-05-31 11:14:43 -06:00
Bel LaPointe
b17801060e refactor for fix the world 2025-05-31 11:06:59 -06:00
bel
99c1061a18 fix asses.Next and do asses.Record 2025-05-31 10:56:04 -06:00
bel
67840f6b28 asses does not fail on individual failure 2025-05-30 06:25:06 -06:00
bel
cb44644475 -e=deport-ass filename 2025-05-26 21:41:39 -06:00
bel
6626077201 log asses.Main new dir processing 2025-05-25 11:49:16 -06:00
bel
9c0129f968 if ffmpeg -i .name.ass fails, then rm .name.ass 2025-05-25 11:46:36 -06:00
Bel LaPointe
8efffd0fe4 no size="..." in .srt 2025-05-22 11:29:14 -06:00
bel
fe02d1624f fix now() InLocation(local), deadline per-one 2025-05-17 21:24:32 -06:00
bel
75b7e21bec asses.Next returns midnight if outside working hours 2025-05-17 21:17:26 -06:00
bel
be148f5de5 assing 12am-8am only 2025-05-17 21:10:44 -06:00
bel
a74f741298 deterministic rand for rescan trheshold per file 2025-05-17 21:05:19 -06:00
bel
fba3d635ea if optimistically assuming cksum unchanged, then do not get cksum, just update checked, modified 2025-05-17 20:57:41 -06:00
bel
8f18cbae3a fix small Read() big rate for slow.Reader 2025-05-17 20:53:15 -06:00
bel
d73b63f43c read fast 2025-05-17 20:41:23 -06:00
bel
f57560ebfc fix WEEKS to recheck and assume empty modified has happy cksum 2025-05-17 20:36:59 -06:00
bel
fc66d26c10 log if no check 2025-05-17 20:25:54 -06:00
bel
bd5ae006a1 rate limit asses.Cksum to 10MBps 2025-05-17 20:22:41 -06:00
bel
11b215d026 slow.Reader{} 2025-05-17 20:20:03 -06:00
bel
d2f0466aae only cksum if previously cksummed 2025-05-17 20:06:42 -06:00
bel
90887d3f11 asses add modtime column to skip cksumming for dupe work 2025-05-17 20:03:53 -06:00
bel
582e35b237 shorter log lines 2025-05-17 19:47:55 -06:00
bel
8aeab5ada5 more log 2025-05-17 19:44:56 -06:00
bel
7dd5af0681 asses log 2025-05-17 19:44:07 -06:00
bel
2897a55842 ffmpeg -y 2025-05-17 19:37:10 -06:00
Bel LaPointe
8b668af899 accept $NO_DEPORT 2025-05-08 16:58:34 -06:00
Bel LaPointe
1c7dafc78b asses.One(path) seems to deport ass neato 2025-05-08 16:56:58 -06:00
Bel LaPointe
2e8c8d3d39 stubmore 2025-05-08 16:02:02 -06:00
Bel LaPointe
5e6fd81921 io.eof ready 2025-05-08 15:58:05 -06:00
Bel LaPointe
b9036ed950 testdata mkv 2025-05-08 15:54:54 -06:00
Bel LaPointe
bfbc2b6e7f impl asses skips if cksum matches or a lotta time passes 2025-05-08 15:48:08 -06:00
Bel LaPointe
6b51a0c0a3 impl asses.checkLast(), .check() 2025-05-08 15:40:53 -06:00
Bel LaPointe
137fdf07ed stub cmd.asses 2025-05-08 15:30:51 -06:00
Bel LaPointe
64c4d1908a rm unused 2025-05-08 15:04:44 -06:00
Bel LaPointe
aad5959350 rm unused 2025-05-08 15:04:37 -06:00
Bel LaPointe
f7f44d6615 refactor out cronning 2025-05-08 15:04:04 -06:00
Bel LaPointe
14e80ac2c3 stub asses 2025-05-08 14:55:54 -06:00
Bel LaPointe
6259a4f179 from mutex to semaphore chan 2025-05-08 11:35:41 -06:00
Bel LaPointe
3ac7ae63b6 db locks rather than returning dbInUse errs 2025-05-08 11:31:37 -06:00
Bel LaPointe
786ea3ef8f cache css pls 2025-05-08 11:08:12 -06:00
bel
c3bf31894c default nyaa 2025-05-08 11:03:37 -06:00
bel
e8b6396760 autofill empty form 2025-05-08 11:01:24 -06:00
bel
5470576b10 cron logs 2025-05-07 22:32:36 -06:00
bel
8f0c62bd77 runs 2025-05-07 22:27:06 -06:00
bel
a0ddc7f25f embed fs 2025-05-07 22:15:01 -06:00
bel
e6f551913a / == /experimental/ui 2025-05-07 22:05:44 -06:00
Bel LaPointe
c7375949c2 cli can pass -p 10_000 2025-05-07 20:22:59 -06:00
Bel LaPointe
49064f1ea2 cli can pass -db /path/to/sql.db 2025-05-07 19:45:52 -06:00
Bel LaPointe
2f503497e9 update test for new /v1/feeds status code 2025-05-07 19:45:39 -06:00
bel
5acbd00ee0 src/cmd/* tests are failing 2025-05-05 23:07:51 -06:00
bel
b48e596853 /v1/feeds redirects to /experimental/ui 2025-05-05 23:06:40 -06:00
bel
d7f098bea0 ui can edit 2025-05-05 22:51:06 -06:00
bel
bd67eb0dfe impl feeds.Update 2025-05-05 22:41:37 -06:00
bel
7a94c74226 ui title links to home 2025-05-05 22:17:48 -06:00
bel
1eedf8fce8 someday 2025-05-05 22:14:57 -06:00
bel
bf23d6a9cf wish i could preview items but too lazy 2025-05-05 22:10:04 -06:00
bel
0271f84948 ui has multi-page edit vs new even tho they the same page but templates a lil diff 2025-05-05 21:33:31 -06:00
bel
0e3e6c54de fix tests vs running 2025-05-05 21:32:37 -06:00
bel
352eff2691 webhooks can be vpntor:///outdir 2025-05-05 21:26:19 -06:00
43 changed files with 1393 additions and 144 deletions

5
go.mod
View File

@@ -2,7 +2,10 @@ module show-rss
go 1.23.3
require modernc.org/sqlite v1.37.0
require (
golang.org/x/time v0.11.0
modernc.org/sqlite v1.37.0
)
require (
github.com/PuerkitoBio/goquery v1.8.0 // indirect

2
go.sum
View File

@@ -59,6 +59,8 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=

3
install_scratch.sh Normal file
View File

@@ -0,0 +1,3 @@
#! /usr/bin/env bash
# Build with cgo enabled against the musl cross toolchain and force fully
# static linking, producing a self-contained linux binary (no glibc needed).
# NOTE(review): assumes x86_64-linux-musl-gcc is installed (e.g. via
# homebrew musl-cross) — confirm on the build host.
CGO_ENABLED=1 CC=x86_64-linux-musl-gcc go install -ldflags="-linkmode external -extldflags '-static'"

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"os"
"os/signal"
"show-rss/src/cmd"
"syscall"
@@ -17,7 +18,7 @@ func Main(ctx context.Context) error {
ctx, can := signal.NotifyContext(ctx, syscall.SIGINT)
defer can()
if err := cmd.Main(ctx); err != nil && ctx.Err() == nil {
if err := cmd.Main(ctx, os.Args[1:]); err != nil && ctx.Err() == nil {
return err
}

View File

@@ -2,6 +2,8 @@ package main_test
import (
"context"
"os"
"path"
main "show-rss"
"show-rss/src/db"
"testing"
@@ -12,6 +14,8 @@ func TestMain(t *testing.T) {
ctx, can := context.WithTimeout(context.Background(), 2*time.Second)
defer can()
os.Args = []string{os.Args[0], "-db", path.Join(t.TempDir(), "db.db")}
if err := main.Main(db.Test(t, ctx)); err != nil && ctx.Err() == nil {
t.Fatal(err)
}

96
src/asses/db.go Normal file
View File

@@ -0,0 +1,96 @@
package asses
import (
"context"
"show-rss/src/db"
"time"
"github.com/google/uuid"
)
// Next reports when the next scan should run. More than a minute past the
// daily 8AM deadline it schedules for the coming midnight; otherwise it
// returns one hour after the most recent recorded execution.
// NOTE(review): assumes db.QueryOne returns the zero Did (no error) when
// "asses.executions" is empty — TODO confirm against db package semantics.
func Next(ctx context.Context) (time.Time, error) {
// Ensure tables exist before querying.
if err := initDB(ctx); err != nil {
return time.Time{}, err
}
// Outside the working window: next run is tomorrow's midnight.
if deadline := Deadline(); time.Since(deadline) > time.Minute {
return midnightLastNight().Add(24 * time.Hour), nil
}
// Most recent execution timestamp, if any.
type Did struct {
Did time.Time
}
result, err := db.QueryOne[Did](ctx, `
SELECT executed_at AS "Did"
FROM "asses.executions"
ORDER BY executed_at DESC
LIMIT 1
`)
// Re-run one hour after the last recorded execution.
return result.Did.Add(time.Hour), err
}
// Record stores an execution row timestamped now, so Next can schedule the
// following run an hour later.
func Record(ctx context.Context) error {
if err := initDB(ctx); err != nil {
return err
}
// Random uuid primary key; executed_at drives Next's ORDER BY ... LIMIT 1.
return db.Exec(ctx, `INSERT INTO "asses.executions" (id, executed_at) VALUES ($1, $2)`, uuid.New().String(), time.Now())
}
// last is one row of "asses.checks": when path p was last checked, the
// checksum seen then, and the file modtime recorded then.
type last struct {
T time.Time `json:"checked_at"`
Cksum string `json:"cksum"`
Modified time.Time `json:"modified"`
}
// checkLast returns the stored check state for p.
// NOTE(review): presumably returns the zero last (no error) for a
// never-checked path — TestLast relies on that; confirm db.QueryOne.
func checkLast(ctx context.Context, p string) (last, error) {
if err := initDB(ctx); err != nil {
return last{}, err
}
return db.QueryOne[last](ctx, `
SELECT checked_at, cksum, modified
FROM "asses.checks"
WHERE p=$1
`, p)
}
// checked upserts the check state for p. With a non-empty cksum it goes
// straight to the upsert. With an empty cksum (the "optimistically assume
// checksum unchanged" path) it first tries a plain UPDATE of checked_at and
// modified only, preserving the existing cksum; only if that UPDATE errors
// does it fall through to the upsert (which would store the empty cksum).
// NOTE(review): a 0-row UPDATE presumably returns nil, so the empty-cksum
// path creates no row for never-checked files — confirm db.Exec semantics.
func checked(ctx context.Context, p, cksum string, modified time.Time) error {
if err := initDB(ctx); err != nil {
return err
}
// Empty branch on purpose: non-empty cksum skips to the upsert below.
if cksum != "" {
} else if err := db.Exec(ctx, `
UPDATE "asses.checks"
SET checked_at=$2, modified=$3
WHERE p=$1
`, p, time.Now(), modified); err == nil {
return nil
}
// Catch-all upsert (SQLite >= 3.35 accepts ON CONFLICT without a target).
return db.Exec(ctx, `
INSERT INTO "asses.checks"
(p, checked_at, cksum, modified)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO UPDATE
SET checked_at=$2, cksum=$3, modified=$4
WHERE p=$1
`, p, time.Now(), cksum, modified)
}
// initDB creates/migrates this package's tables. The statement list acts as
// an append-only migration log: existing entries must keep their order and
// text, and schema changes are appended (hence the trailing ALTER adding
// "modified" rather than editing the CREATE).
func initDB(ctx context.Context) error {
return db.InitializeSchema(ctx, "asses", []string{
`CREATE TABLE "asses.executions" (
id TEXT PRIMARY KEY NOT NULL,
executed_at TIMESTAMP NOT NULL
)`,
`CREATE TABLE "asses.checks" (
p TEXT PRIMARY KEY NOT NULL,
checked_at TIMESTAMP NOT NULL,
cksum TEXT NOT NULL
)`,
`ALTER TABLE "asses.checks" ADD COLUMN "modified" TIMESTAMP`,
})
}

View File

@@ -0,0 +1,29 @@
package asses_test
import (
"context"
"show-rss/src/asses"
"show-rss/src/db"
"testing"
"time"
)
// TestNextRecord exercises the Next/Record pair: before any recorded run
// Next may return the zero time; after Record the next run is roughly an
// hour out (so time.Since(v) is negative, never above a minute). The
// Hour() < 8 guards limit the assertions to the midnight-8AM working
// window, since outside it Next schedules for tomorrow instead.
func TestNextRecord(t *testing.T) {
ctx := db.Test(t, context.Background())
if v, err := asses.Next(ctx); err != nil {
t.Fatal(err)
} else if zero := v.IsZero(); !zero && time.Now().Hour() < 8 {
t.Fatal(v)
}
if err := asses.Record(ctx); err != nil {
t.Fatal(err)
}
if v, err := asses.Next(ctx); err != nil {
t.Fatal(err)
} else if since := time.Since(v); since > time.Minute && time.Now().Hour() < 8 {
t.Fatal(since)
}
}

34
src/asses/db_test.go Normal file
View File

@@ -0,0 +1,34 @@
package asses
import (
"context"
"math"
"show-rss/src/db"
"testing"
"time"
)
// TestLast round-trips checkLast/checked: a never-checked path yields the
// zero row; checked is idempotent (called twice with identical args); and
// the stored modified time survives the DB round-trip to within a second.
func TestLast(t *testing.T) {
ctx := db.Test(t, context.Background())
if last, err := checkLast(ctx, "p"); err != nil {
t.Fatal(err)
} else if !last.T.IsZero() || last.Cksum != "" {
t.Fatal(last)
}
modtime := time.Now().Add(-5 * time.Minute)
if err := checked(ctx, "p", "cksum", modtime); err != nil {
t.Fatal(err)
} else if err := checked(ctx, "p", "cksum", modtime); err != nil {
t.Fatal(err)
}
if last, err := checkLast(ctx, "p"); err != nil {
t.Fatal(err)
} else if last.T.IsZero() || last.Cksum != "cksum" {
t.Fatal(last)
} else if math.Abs(float64(last.Modified.Sub(modtime))) > float64(time.Second) {
t.Fatalf("modified not uploaded: %v vs. %v (diff of %v)", last.Modified, modtime, last.Modified.Sub(modtime))
}
}

15
src/asses/deadline.go Normal file
View File

@@ -0,0 +1,15 @@
package asses
import "time"
// Deadline returns 8AM local time today — the end of the midnight-8AM
// window in which the asses batch work is allowed to run.
func Deadline() time.Time {
	return midnightLastNight().Add(8 * time.Hour) // midnight-8AM
}

// midnightLastNight returns today's local midnight (00:00:00 local time).
func midnightLastNight() time.Time {
	// Construct midnight directly rather than the old Format/ParseInLocation
	// round-trip; time.Date cannot fail, so no panic path is needed.
	now := time.Now()
	return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
}

247
src/asses/deport.go Normal file
View File

@@ -0,0 +1,247 @@
package asses
import (
"bytes"
"context"
"fmt"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"slices"
"strings"
"time"
)
// Entrypoint is the exported CLI entrypoint for -e=deport-ass: extract ASS
// subtitle streams from the mkv at p, keep the best as .srt, and rewrite
// the mkv without subtitle streams (see deport).
func Entrypoint(ctx context.Context, p string) error {
return deport(ctx, p)
}
// deport extracts every ASS subtitle stream from the mkv at p into hidden
// sibling ".<base>.<id>.<title>.ass" files, converts the best of them to a
// single "<name>.srt" (via BestAssToSRT), then remuxes the mkv without any
// of those subtitle streams and renames the result back over p.
//
// Setting $NO_DEPORT makes this a logged no-op (dry run).
func deport(ctx context.Context, p string) error {
	if os.Getenv("NO_DEPORT") != "" {
		log.Printf("would deport %s", p)
		return nil
	}
	assStreams, err := assStreams(ctx, p)
	if err != nil {
		return err
	}
	// Pull each ASS stream out into its own hidden .ass file next to p.
	// (The original also built an assStreamIDs slice here that was never
	// read — dead code, removed.)
	for _, stream := range assStreams {
		assF := path.Join(
			path.Dir(p),
			fmt.Sprintf(
				".%s.%s.%s.ass",
				path.Base(p),
				stream.id,
				stream.title,
			),
		)
		if err := ffmpeg(ctx, "-y", "-i", p, "-map", stream.id, assF); err != nil {
			return fmt.Errorf("failed to pull %s from %s: %w", stream.id, p, err)
		}
	}
	// Keep only the best extracted subtitle, as "<name>.srt".
	if err := BestAssToSRT(ctx, p); err != nil {
		return err
	}
	// Remux without the extracted subtitle streams ("-map -<id>"), copying
	// all other streams untouched, then replace the original file.
	base := path.Base(p)
	withoutExt := strings.TrimSuffix(base, path.Ext(base))
	p2 := path.Join(path.Dir(p), fmt.Sprintf("%s.subless.mkv", withoutExt))
	args := []string{
		"-i", p,
		"-map", "0",
	}
	for _, assStream := range assStreams {
		args = append(args, "-map", "-"+assStream.id)
	}
	args = append(args,
		"-c", "copy",
		p2,
	)
	if err := ffmpeg(ctx, args...); err != nil {
		return err
	} else if err := os.Rename(p2, p); err != nil {
		return err
	}
	return nil
}
// stream identifies one subtitle stream inside a container: its ffmpeg
// stream specifier (e.g. "0:2") and its parenthesized tag (e.g. "eng").
type stream struct {
	id    string
	title string
}

// assStreams probes p with ffprobe and returns every ASS subtitle stream
// reported in its output.
func assStreams(ctx context.Context, p string) ([]stream, error) {
	output, err := ffprobe(ctx, "-i", p)
	if err != nil {
		return nil, err
	}
	return parseAssStreams(output), nil
}

// parseAssStreams extracts ASS subtitle streams from ffprobe's textual
// output. Matching lines look like:
//
//	Stream #0:2(eng): Subtitle: ass (default)
func parseAssStreams(output string) []stream {
	result := []stream{}
	for _, line := range strings.Split(output, "\n") {
		fields := strings.Fields(line)
		// fields[0..3] are indexed below, so require at least four fields.
		// (The original checked `< 3` and could panic on a 3-field line.)
		if len(fields) < 4 {
			continue
		} else if fields[0] != "Stream" {
			continue
		} else if !strings.Contains(fields[1], "(") {
			continue
		} else if fields[2] != "Subtitle:" {
			continue
		} else if fields[3] != "ass" {
			continue
		}
		// fields[1] is "#0:2(eng):" — split into specifier and language tag.
		field1 := fields[1]
		id := strings.Trim(strings.Split(field1, "(")[0], "#")
		title := strings.Trim(strings.Split(field1, "(")[1], "):")
		result = append(result, stream{
			id:    id,
			title: title,
		})
	}
	return result
}

// ffprobe runs ffprobe with args, returning its combined stdout+stderr.
func ffprobe(ctx context.Context, args ...string) (string, error) {
	return execc(ctx, "ffprobe", args...)
}

// ffmpeg runs ffmpeg with args, folding its output into the error on
// failure (ffmpeg writes diagnostics to stderr).
func ffmpeg(ctx context.Context, args ...string) error {
	std, err := execc(ctx, "ffmpeg", args...)
	if err != nil {
		return fmt.Errorf("(%w) %s", err, std)
	}
	return nil
}

// execc runs bin with args and returns its combined stdout+stderr. The
// command is killed when ctx is cancelled.
func execc(ctx context.Context, bin string, args ...string) (string, error) {
	combined := bytes.NewBuffer(nil)
	cmd := exec.CommandContext(ctx, bin, args...)
	cmd.Stdin = nil
	cmd.Stderr = combined
	cmd.Stdout = combined
	err := cmd.Run()
	return combined.String(), err
}
// BestAssToSRT finds the hidden ".<base>.*.ass" files previously extracted
// next to p, converts each to .srt, keeps only the best-ranked one (renamed
// to "<name>.srt" beside p) and deletes the rest.
func BestAssToSRT(ctx context.Context, p string) error {
asses, err := filepath.Glob(path.Join(
path.Dir(p),
fmt.Sprintf(".%s.*.ass", path.Base(p)),
))
if err != nil {
return err
}
// Convert every extracted .ass to a sibling .srt.
srts := []string{}
for _, ass := range asses {
srt, err := assToSRT(ctx, ass)
if err != nil {
return err
}
srts = append(srts, srt)
}
// Best first; keep srts[0] as the final .srt, delete the others.
srts = SRTsByGoodness(srts)
for i := range srts {
if i == 0 {
base := path.Base(p)
withoutExt := strings.TrimSuffix(base, path.Ext(base))
srt := path.Join(path.Dir(p), fmt.Sprintf("%s.srt", withoutExt))
if err := os.Rename(srts[i], srt); err != nil {
return err
}
} else {
// Best-effort cleanup; removal errors deliberately ignored.
os.Remove(srts[i])
}
}
return nil
}
// sizeAttrRe strips size="..." attributes that ffmpeg carries over from ASS
// styling into the generated SRT. Compiled once at package scope rather
// than on every call (the original recompiled it per file).
var sizeAttrRe = regexp.MustCompile(`size="[^"]*"`)

// assToSRT converts one extracted .ass file to a sibling .srt and returns
// the .srt path. An existing .srt is reused as-is. The conversion is capped
// at 30s; on a non-timeout ffmpeg failure the .ass is removed so a broken
// subtitle file is not retried forever.
func assToSRT(ctx context.Context, ass string) (string, error) {
	ctx, can := context.WithTimeout(ctx, 30*time.Second)
	defer can()
	srt := fmt.Sprintf("%s.srt", strings.TrimSuffix(ass, ".ass"))
	// Already converted on a previous run.
	if _, err := os.Stat(srt); err == nil {
		return srt, nil
	}
	if err := ffmpeg(ctx, "-y", "-i", ass, srt); err != nil {
		if ctx.Err() == nil {
			log.Printf("ffmpeg failed to process %s; removing", ass)
			os.Remove(ass)
		}
		return srt, err
	}
	// Post-process: drop size="..." attributes; rewrite only when changed.
	b, err := os.ReadFile(srt)
	if err != nil {
		return srt, err
	}
	before := len(b)
	b = sizeAttrRe.ReplaceAll(b, []byte{})
	if after := len(b); before == after {
	} else if err := os.WriteFile(srt, b, os.ModePerm); err != nil {
		return srt, err
	}
	return srt, nil
}
// srtSkippers disqualify a subtitle file (non-English languages and
// signs-only tracks); any match pushes it to the back of the ranking.
// Compiled once at package scope; the duplicate `(?i)ara` entry from the
// original list is removed.
var srtSkippers = []*regexp.Regexp{
	regexp.MustCompile(`(?i)lat.*amer`),
	regexp.MustCompile(`(?i)signs`),
	regexp.MustCompile(`(?i)rus`),
	regexp.MustCompile(`(?i)por`),
	regexp.MustCompile(`(?i)ita`),
	regexp.MustCompile(`(?i)fre`),
	regexp.MustCompile(`(?i)spa`),
	regexp.MustCompile(`(?i)ger`),
	regexp.MustCompile(`(?i)ara`),
	regexp.MustCompile(`(?i)jpn`),
	regexp.MustCompile(`(?i)urop`),
	regexp.MustCompile(`(?i)razil`),
	regexp.MustCompile(`(?i)Deu`),
}

// srtKeepers promote a subtitle file to the front of the ranking.
var srtKeepers = []*regexp.Regexp{
	regexp.MustCompile(`(?i)^eng$`),
}

// SRTsByGoodness returns a copy of srts sorted best-first: keeper matches
// first, neutral names next, skipper matches last, ties broken
// alphabetically (case-insensitive). The input slice is not modified.
//
// The original comparator checked skippers on b before a, so two skipped
// names compared as "less" in both directions — not the strict weak
// ordering slices.SortFunc requires, leaving their relative order
// undefined. Ranking each name independently fixes that.
func SRTsByGoodness(srts []string) []string {
	// rank buckets: 0 = keeper, 1 = neutral, 2 = skipper.
	// Skippers are checked first, matching the original's precedence.
	rank := func(s string) int {
		for _, re := range srtSkippers {
			if re.MatchString(s) {
				return 2
			}
		}
		for _, re := range srtKeepers {
			if re.MatchString(s) {
				return 0
			}
		}
		return 1
	}
	srts = slices.Clone(srts)
	slices.SortFunc(srts, func(a, b string) int {
		a = strings.ToLower(a)
		b = strings.ToLower(b)
		if ra, rb := rank(a), rank(b); ra != rb {
			return ra - rb
		}
		return strings.Compare(a, b)
	})
	return srts
}

47
src/asses/deport_test.go Normal file
View File

@@ -0,0 +1,47 @@
package asses_test
import (
"show-rss/src/asses"
"testing"
)
// TestSRTsByGoodness pins the ranking: an exact (case-insensitive) "eng"
// name wins over a neutral name, and in a realistic extracted-subtitle set
// the English .srt sorts first because every other entry matches a
// language skipper pattern.
func TestSRTsByGoodness(t *testing.T) {
cases := map[string]struct {
given []string
want string
}{
"eng": {
given: []string{"a", "eng"},
want: "eng",
},
"eng nocap": {
given: []string{"A", "eng"},
want: "eng",
},
".Apothecary_Diaries_S02E19.mkv.0:9.ita.ass": {
given: []string{
".Apothecary_Diaries_S02E19.mkv.0:10.rus.srt",
".Apothecary_Diaries_S02E19.mkv.0:2.eng.srt",
".Apothecary_Diaries_S02E19.mkv.0:3.por.srt",
".Apothecary_Diaries_S02E19.mkv.0:4.spa.srt",
".Apothecary_Diaries_S02E19.mkv.0:5.spa.srt",
".Apothecary_Diaries_S02E19.mkv.0:6.ara.srt",
".Apothecary_Diaries_S02E19.mkv.0:7.fre.srt",
".Apothecary_Diaries_S02E19.mkv.0:8.ger.srt",
".Apothecary_Diaries_S02E19.mkv.0:9.ita.srt",
},
want: ".Apothecary_Diaries_S02E19.mkv.0:2.eng.srt",
},
}
for name, d := range cases {
// Pre-Go 1.22 style loop-variable copies for the t.Run closure.
name := name
c := d
t.Run(name, func(t *testing.T) {
got := asses.SRTsByGoodness(c.given)
if got[0] != c.want {
t.Errorf("expected %s but got %s (%+v)", c.want, got[0], got)
}
})
}
}

125
src/asses/one.go Normal file
View File

@@ -0,0 +1,125 @@
package asses
import (
"context"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"log"
"math/rand"
"os"
"path"
"show-rss/src/slow"
"strconv"
"time"
"golang.org/x/time/rate"
)
// EnvCksumBPS is the checksum read-rate limit in bytes/sec, taken from
// $CKSUM_BPS and defaulting to 50MB/s. Invalid values panic at startup so
// misconfiguration is caught immediately. (The original panicked with a
// nil error when the value parsed but was < 1 — panic(nil) — losing the
// diagnostic; both failure modes now carry a useful message.)
var EnvCksumBPS = func() int {
	s := os.Getenv("CKSUM_BPS")
	if s == "" {
		return 50_000_000
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		panic(fmt.Errorf("CKSUM_BPS %q: %w", s, err))
	}
	if n < 1 {
		panic(fmt.Errorf("CKSUM_BPS must be >= 1, got %d", n))
	}
	return n
}()
// One brings a single media file up to date: if it looks changed (first by
// modtime, then by rate-limited checksum) it deports subtitles and
// transcodes, then records the new check state. Unchanged files return
// early without touching the file.
func One(ctx context.Context, p string) error {
	shortp := path.Join("...", path.Base(path.Dir(p)), path.Base(p))
	last, err := checkLast(ctx, p)
	if err != nil {
		return err
	}
	// Deterministic per-path full-recheck threshold in [20,30) days: the
	// path bytes are folded into a seed so each file re-verifies on its own
	// stable cadence instead of all files rechecking at once.
	threshold := 20 + rand.New(rand.NewSource(func() int64 {
		b := md5.New().Sum([]byte(p))
		var sum int64
		for _, c := range b {
			sum += int64(c)
			sum *= int64(c)
		}
		return sum
	}())).Int()%10
	if daysSince := int(time.Since(last.T).Hours() / 24); daysSince > threshold {
		// Stale enough that the modtime shortcut is skipped; fall through to
		// the checksum comparison below.
		log.Printf("asses.One(%s) // no modified check as %vd since last check", shortp, daysSince)
	} else if stat, err := os.Stat(p); err != nil {
		return fmt.Errorf("cannot stat %s: %w", p, err)
	} else if stat.ModTime().Equal(last.Modified) {
		// Equal, not ==: the stored time comes back from the DB with a
		// possibly different location/monotonic reading, so == could treat
		// identical instants as different and force needless checksums.
		//log.Printf("asses.One(%s) // unmodified since %v", shortp, last.T)
		return nil
	} else {
		log.Printf("asses.One(%s) // modified (%v) is now %v", shortp, last.Modified, stat.ModTime())
	}
	doCksum := true
	if err := func() error {
		if len(last.Cksum) > 0 {
			if last.Modified.IsZero() {
				// Row predates the "modified" column: optimistically assume
				// content unchanged and skip the expensive checksum entirely.
				doCksum = false
				log.Printf("asses.One(%s) // assume cksum unchanged given null modified ", shortp)
				return nil
			}
			cksum, err := Cksum(ctx, p)
			if err != nil {
				return err
			}
			if cksum == last.Cksum {
				log.Printf("asses.One(%s) // cksum unchanged since %v", shortp, last.T)
				return nil
			}
		}
		// New or changed content: pull subtitles out, then transcode.
		log.Printf("asses.deport(%s)...", shortp)
		if err := deport(ctx, p); err != nil {
			return err
		}
		log.Printf("asses.transcode(%s)...", shortp)
		if err := transcode(ctx, p); err != nil {
			return err
		}
		return nil
	}(); err != nil {
		return err
	}
	// Persist post-processing state so the next run can short-circuit.
	var cksum string
	if doCksum {
		var err error
		cksum, err = Cksum(ctx, p)
		if err != nil {
			return err
		}
	}
	stat, err := os.Stat(p)
	if err != nil {
		return err
	}
	if err := checked(ctx, p, cksum, stat.ModTime()); err != nil {
		log.Printf("failed to mark %s checked: %v", shortp, err)
		return err
	}
	return nil
}
// Cksum returns the base64-encoded MD5 of the file at p, reading through a
// rate limiter capped at EnvCksumBPS bytes/sec (slow.NewReader) so bulk
// checksumming does not saturate the disk. MD5 is used for change
// detection only, not for security.
func Cksum(ctx context.Context, p string) (string, error) {
f, err := os.Open(p)
if err != nil {
return "", err
}
defer f.Close()
hasher := md5.New()
_, err = io.Copy(hasher, slow.NewReader(ctx, rate.Limit(EnvCksumBPS), f))
return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err
}

44
src/asses/one_test.go Normal file
View File

@@ -0,0 +1,44 @@
package asses_test
import (
"context"
"os"
"path"
"show-rss/src/asses"
"show-rss/src/db"
"testing"
)
// TestOne is an end-to-end run over a real testdata mkv (requires ffmpeg
// and ffprobe on PATH): One must be safe to run twice, keep the original
// .mkv in place, produce a sibling .srt, and change the file's checksum
// (subtitle streams removed and, with $DO_TRANSCODE=true, re-encoded).
// ReadFile/Cksum errors are deliberately ignored here; the Stat and
// checksum assertions below catch the failures that matter.
func TestOne(t *testing.T) {
ctx := db.Test(t, context.Background())
os.Setenv("DO_TRANSCODE", "true")
d := t.TempDir()
// Copy the fixture into a temp dir so the test can mutate it freely.
b, _ := os.ReadFile(path.Join("testdata", "survivor_au_S11E12.smoller.mkv"))
p := path.Join(d, "f.mkv")
os.WriteFile(p, b, os.ModePerm)
t.Logf("initial cksum...")
cksum, _ := asses.Cksum(context.Background(), p)
t.Logf("one && one...")
if err := asses.One(ctx, p); err != nil {
t.Fatal(err)
} else if err := asses.One(ctx, p); err != nil {
t.Fatal(err)
}
t.Logf("test -f...")
if _, err := os.Stat(p); err != nil {
t.Fatalf("lost original mkv: %v", err)
} else if _, err := os.Stat(path.Join(d, "f.srt")); err != nil {
t.Fatalf("no new srt: %v", err)
}
t.Logf("final cksum...")
newCksum, _ := asses.Cksum(context.Background(), p)
if cksum == newCksum {
t.Fatalf("cksum unchanged")
}
}

Binary file not shown.

64
src/asses/transcode.go Normal file
View File

@@ -0,0 +1,64 @@
package asses
import (
"context"
"fmt"
"log"
"os"
"path"
"slices"
"strings"
)
// EntrypointTranscode is the exported CLI entrypoint for -e=transcode: it
// re-encodes the file at p to h264/aac in place (see transcode).
func EntrypointTranscode(ctx context.Context, p string) error {
return transcode(ctx, p)
}
// transcode re-encodes the file at p to h264 video + aac audio in place,
// unless it already has both. Transcoding is opt-in: it runs only when
// $DO_TRANSCODE is set and $NO_TRANSCODE is not. As a safety check the
// transcoded file must report the same number of streams as the original
// before it replaces p.
func transcode(ctx context.Context, p string) error {
	if os.Getenv("NO_TRANSCODE") != "" || os.Getenv("DO_TRANSCODE") == "" {
		log.Printf("would transcode %s but $NO_TRANSCODE=x or $DO_TRANSCODE=", p)
		return nil
	}
	output, err := ffprobe(ctx, "-i", p)
	if err != nil {
		return err
	}
	// "tream #" matches both "Stream #" and any case-variant prefix.
	h264 := slices.ContainsFunc(strings.Split(output, "\n"), func(line string) bool {
		return strings.Contains(line, "tream #") && strings.Contains(line, "Video: ") && strings.Contains(line, "h264")
	})
	aac := slices.ContainsFunc(strings.Split(output, "\n"), func(line string) bool {
		return strings.Contains(line, "tream #") && strings.Contains(line, "Audio: ") && strings.Contains(line, "aac")
	})
	if h264 && aac {
		// Already in the target codecs; nothing to do.
		return nil
	}
	p2 := p + ".en" + path.Ext(p)
	if err := ffmpeg(ctx, "-y",
		"-i", p,
		"-vcodec", "libx264",
		"-acodec", "aac",
		p2,
	); err != nil {
		return err
	}
	// BUG FIX: probe the transcoded file p2. The original probed p again,
	// so the stream-count safety check compared the source against itself
	// and could never detect a stream dropped by the transcode.
	output2, err := ffprobe(ctx, "-i", p2)
	if err != nil {
		return err
	}
	df := func(line string) bool {
		return !strings.Contains(line, "tream #")
	}
	originalStreams := slices.DeleteFunc(strings.Split(output, "\n"), df)
	newStreams := slices.DeleteFunc(strings.Split(output2, "\n"), df)
	if len(originalStreams) != len(newStreams) {
		return fmt.Errorf("stream count changed from transcode")
	}
	return os.Rename(p2, p)
}

70
src/cmd/asses/main.go Normal file
View File

@@ -0,0 +1,70 @@
package asses
import (
"context"
"io/fs"
"log"
"path"
"path/filepath"
"show-rss/src/asses"
"show-rss/src/cron"
)
// rootDs are the hard-coded library roots scanned for .mkv files.
// NOTE(review): Synology-style absolute paths baked into the binary —
// consider promoting these to flags alongside -db/-p.
var rootDs = []string{
"/volume1/video/Bel/Anime",
"/volume1/video/QT/TV",
}
// CB is the per-file callback invoked for every .mkv found during a walk.
type CB func(context.Context, string) error
// Main runs the asses batch job forever on the schedule that asses.Next
// computes, via the shared cron runner.
func Main(ctx context.Context) error {
return cron.Cron(ctx, asses.Next, One)
}
// One performs a single full scan of all roots, bounded by the
// working-hours deadline (asses.Deadline). Per-file errors are logged and
// swallowed so one bad file cannot stop the batch; the run is recorded at
// the end so asses.Next can schedule the next one.
func One(ctx context.Context) error {
ctx, can := context.WithDeadline(ctx, asses.Deadline())
defer can()
// Log once per directory to keep output readable on large libraries.
lastD := ""
if err := OneWith(ctx, rootDs, func(ctx context.Context, p string) error {
if d := path.Dir(p); d != lastD {
log.Printf("asses.One(%s/...)...", d)
lastD = d
}
// Deliberate: log-and-continue on per-file errors.
if err := asses.One(ctx, p); err != nil {
log.Printf("asses.One(.../%s/%s)...: err: %v", path.Base(path.Dir(p)), path.Base(p), err)
}
return nil
}); err != nil {
return err
}
return asses.Record(ctx)
}
// OneWith walks every root directory in order, invoking cb for each .mkv
// file found. The first error (walk error, cb error, or context
// cancellation) aborts the remaining roots.
func OneWith(ctx context.Context, rootds []string, cb CB) error {
	for _, rootd := range rootds {
		if err := one(ctx, rootd, cb); err != nil {
			return err
		}
	}
	return nil
}

// one walks rootd, calling cb for every non-directory .mkv entry. The walk
// aborts promptly once ctx is cancelled — necessary because One's callback
// logs and swallows per-file errors, so without this check a passed
// working-hours deadline would still walk the entire tree.
func one(ctx context.Context, rootd string, cb CB) error {
	return filepath.WalkDir(rootd, func(p string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			// Deadline/cancellation: stop the walk instead of continuing.
			return err
		}
		if d.IsDir() {
			return nil
		}
		if path.Ext(p) != ".mkv" {
			return nil
		}
		return cb(ctx, p)
	})
}

View File

@@ -0,0 +1,28 @@
package asses_test
import (
"context"
"os"
"path"
"show-rss/src/cmd/asses"
"show-rss/src/db"
"testing"
)
// TestOneWith checks the directory walk: only .mkv files reach the
// callback (the empty nested a/b/c tree contributes nothing), and every
// path handed to the callback exists on disk.
func TestOneWith(t *testing.T) {
ctx := db.Test(t, context.Background())
d := t.TempDir()
os.MkdirAll(path.Join(d, "a", "b", "c"), os.ModePerm)
os.WriteFile(path.Join(d, "a", "f.mkv"), []byte{}, os.ModePerm)
if err := asses.OneWith(ctx, []string{d}, func(_ context.Context, p string) error {
t.Logf("%q", p)
if _, err := os.Stat(p); err != nil {
return err
}
return nil
}); err != nil {
t.Fatal(err)
}
}

View File

@@ -2,17 +2,47 @@ package cmd
import (
"context"
"flag"
"os"
"show-rss/src/cleanup"
"show-rss/src/db"
"show-rss/src/server"
)
func Config(ctx context.Context) (context.Context, func(), error) {
ctx, err := db.Inject(ctx, "/tmp/f.db")
type Flags struct {
DB string
Port int
Entrypoint Entrypoint
Pos []string
}
func NewFlags(args []string) (Flags, error) {
var result Flags
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
fs.StringVar(&result.DB, "db", "/tmp/f.db", "path to sqlite.db")
fs.IntVar(&result.Port, "p", 10000, "port for http")
fs.Var(&result.Entrypoint, "e", "entrypoint")
err := fs.Parse(args)
result.Pos = fs.Args()
return result, err
}
func Config(ctx context.Context, args []string) (context.Context, func(), Flags, error) {
flags, err := NewFlags(args)
if err != nil {
return ctx, nil, err
return ctx, nil, flags, err
}
ctx, err = db.Inject(ctx, flags.DB)
if err != nil {
return ctx, nil, flags, err
}
ctx = server.Inject(ctx, flags.Port)
return ctx, func() {
cleanup.Extract(ctx)()
}, nil
}, flags, nil
}

View File

@@ -1,12 +1,14 @@
package cron
package fetch
import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"
"show-rss/src/cron"
"show-rss/src/feeds"
"show-rss/src/webhooks"
"strings"
@@ -15,19 +17,7 @@ import (
)
func Main(ctx context.Context) error {
c := time.NewTicker(time.Minute)
defer c.Stop()
for {
if err := One(ctx); err != nil {
return err
}
select {
case <-ctx.Done():
case <-c.C:
}
}
return ctx.Err()
return cron.Cron(ctx, feeds.Next, One)
}
func One(ctx context.Context) error {
@@ -45,11 +35,13 @@ func one(ctx context.Context, feed feeds.Feed) error {
} else if !should {
return nil
}
log.Printf("fetching %s", feed.Version.URL)
items, err := feed.Fetch(ctx)
if err != nil {
return err
}
log.Printf("fetched feed %s: %d items", feed.Version.URL, len(items))
for _, item := range items {
if err := oneItem(ctx, feed, item); err != nil {
@@ -70,17 +62,19 @@ func oneItem(ctx context.Context, feed feeds.Feed, item feeds.Item) error {
Item: item,
}
method, err := render(feed.Version.WebhookMethod, arg)
wmethod, wurl, wbody := feed.Webhook(ctx)
method, err := render(wmethod, arg)
if err != nil {
return err
}
wurl, err := render(feed.Version.WebhookURL, arg)
wurl, err = render(wurl, arg)
if err != nil {
return err
}
body, err := render(feed.Version.WebhookBody, arg)
body, err := render(wbody, arg)
if err != nil {
return err
}
@@ -95,6 +89,7 @@ func oneItem(ctx context.Context, feed feeds.Feed, item feeds.Item) error {
if did, err := webhooks.Did(ctx, method, wurl, body); err != nil || did {
return err
}
log.Printf("webhooking %s %s: %s", method, wurl, body)
req, err := http.NewRequest(method, wurl, strings.NewReader(body))
if err != nil {

View File

@@ -1,4 +1,4 @@
package cron_test
package fetch_test
import (
"bytes"
@@ -7,7 +7,7 @@ import (
"io"
"net/http"
"os"
"show-rss/src/cmd/cron"
"show-rss/src/cmd/fetch"
"show-rss/src/db"
"show-rss/src/feeds"
"strconv"
@@ -29,9 +29,9 @@ func TestOne(t *testing.T) {
},
"feeds": func(t *testing.T) context.Context {
gets := []string{}
sURL := "http://localhost:10000/"
sURL := "http://localhost:10001/"
s := &http.Server{
Addr: ":10000",
Addr: ":10001",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gets = append(gets, r.URL.String())
rb, _ := io.ReadAll(r.Body)
@@ -43,6 +43,7 @@ func TestOne(t *testing.T) {
case "10":
case "11":
default:
t.Logf("%s => 404", r.URL)
http.NotFound(w, r)
}
@@ -84,7 +85,7 @@ func TestOne(t *testing.T) {
ctx := aCtx(t)
for i := 0; i < 2; i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
if err := cron.One(ctx); err != nil && ctx.Err() == nil {
if err := fetch.One(ctx); err != nil && ctx.Err() == nil {
t.Fatalf("failed %d: %v", i, err)
}
})
@@ -95,7 +96,7 @@ func TestOne(t *testing.T) {
for i := 0; i < 2; i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
ctx := aCtx(t)
if err := cron.One(ctx); err != nil && ctx.Err() == nil {
if err := fetch.One(ctx); err != nil && ctx.Err() == nil {
t.Fatalf("failed %d: %v", i, err)
}
})

View File

@@ -4,25 +4,31 @@ import (
"context"
"fmt"
"log"
"show-rss/src/cmd/cron"
inass "show-rss/src/asses"
"show-rss/src/cmd/asses"
"show-rss/src/cmd/fetch"
"show-rss/src/cmd/server"
"show-rss/src/pool"
"strings"
"time"
)
func Main(ctx context.Context) error {
func Main(ctx context.Context, args []string) error {
ctx, can := context.WithCancel(ctx)
defer can()
ctx, can, err := Config(ctx)
ctx, can, flags, err := Config(ctx, args)
if err != nil {
return fmt.Errorf("failed to inject: %w", err)
}
defer can()
switch flags.Entrypoint {
case Defacto:
foos := map[string]func(context.Context) error{
"server": server.Main,
"cron": cron.Main,
"fetch": fetch.Main,
"asses": asses.Main,
}
p := pool.New(len(foos))
defer p.Wait(ctx)
@@ -34,6 +40,82 @@ func Main(ctx context.Context) error {
}
return p.Wait(ctx)
case DeportAss:
for _, pos := range flags.Pos {
if err := inass.Entrypoint(ctx, pos); err != nil {
return err
}
}
return nil
case BestAssToSRT:
errs := []string{}
for _, pos := range flags.Pos {
if err := inass.BestAssToSRT(ctx, pos); err != nil {
err = fmt.Errorf("[%s] %w", pos, err)
log.Println(err)
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf("errors: %+v", errs)
}
return nil
case Transcode:
for _, pos := range flags.Pos {
log.Printf("transcoding %q...", pos)
if err := inass.EntrypointTranscode(ctx, pos); err != nil {
return err
}
}
return nil
default:
panic(flags.Entrypoint.String())
}
return nil
}
type Entrypoint int
const (
Defacto Entrypoint = iota
DeportAss
BestAssToSRT
Transcode
)
func (e *Entrypoint) Set(s string) error {
switch s {
case Defacto.String():
*e = Defacto
case DeportAss.String():
*e = DeportAss
case BestAssToSRT.String():
*e = BestAssToSRT
case Transcode.String():
*e = Transcode
default:
return fmt.Errorf("%s nin (%s)", s, strings.Join([]string{
Defacto.String(),
DeportAss.String(),
BestAssToSRT.String(),
Transcode.String(),
}, ", "))
}
return nil
}
func (e Entrypoint) String() string {
switch e {
case Defacto:
return ""
case DeportAss:
return "deport-ass"
case BestAssToSRT:
return "best-ass-to-srt"
case Transcode:
return "transcode"
}
panic("cannot serialize entrypoint")
}
func runner(ctx context.Context, k string, foo func(context.Context) error) func() error {
@@ -50,6 +132,5 @@ func runner(ctx context.Context, k string, foo func(context.Context) error) func
case <-time.After(time.Second):
}
}
return err
}
}

View File

@@ -10,32 +10,60 @@ import (
func (h Handler) feeds(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case http.MethodPost:
case http.MethodDelete:
if err := r.ParseForm(); err != nil {
return err
}
return h.feedsPost(r.Context(), r.Form)
if err := h.feedsDelete(r.Context(), r.URL.Query().Get("id")); err != nil {
return err
}
case http.MethodPost, http.MethodPut:
if err := r.ParseForm(); err != nil {
return err
}
if r.URL.Query().Has("delete") {
if err := h.feedsDelete(r.Context(), r.URL.Query().Get("id")); err != nil {
return err
}
} else {
if err := h.feedsUpsert(r.Context(), r.URL.Query().Get("id"), r.Form); err != nil {
return err
}
}
default:
http.NotFound(w, r)
return nil
}
u2 := *r.URL
u2.RawQuery = ""
u2.Path = "/"
http.Redirect(w, r, u2.String(), http.StatusSeeOther)
return nil
}
func (h Handler) feedsPost(ctx context.Context, form url.Values) error {
func (h Handler) feedsDelete(ctx context.Context, id string) error {
return feeds.Delete(ctx, id)
}
// feedsUpsert creates (id == "") or updates a feed from the posted
// form. Every Version field is required; a blank field is rejected so
// an upsert can never be mistaken for a soft delete (which blanks all
// fields).
//
// NOTE(review): the diff merge duplicated the "URL" and "WebhookBody"
// map keys (a compile error); deduplicated here.
func (h Handler) feedsUpsert(ctx context.Context, id string, form url.Values) error {
	var req feeds.Version
	for k, v := range map[string]*string{
		"URL":           &req.URL,
		"Cron":          &req.Cron,
		"Pattern":       &req.Pattern,
		"WebhookMethod": &req.WebhookMethod,
		"WebhookURL":    &req.WebhookURL,
		"WebhookBody":   &req.WebhookBody,
	} {
		if *v = form.Get(k); *v == "" {
			return fmt.Errorf("no ?%s in %s", k, form.Encode())
		}
	}
	if id == "" {
		_, err := feeds.Insert(ctx, req.URL, req.Cron, req.Pattern, req.WebhookMethod, req.WebhookURL, req.WebhookBody)
		return err
	}
	return feeds.Update(ctx, id, req.URL, req.Cron, req.Pattern, req.WebhookMethod, req.WebhookURL, req.WebhookBody)
}

View File

@@ -29,7 +29,7 @@ func TestFeeds(t *testing.T) {
r = r.WithContext(ctx)
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
h.ServeHTTP(w, r)
if w.Code != http.StatusOK {
if w.Code != http.StatusSeeOther {
t.Errorf("(%d) %s", w.Code, w.Body.Bytes())
}
found := false

View File

@@ -28,7 +28,7 @@ func (h Handler) serveHTTP(w http.ResponseWriter, r *http.Request) error {
return h.vpntor(r.Context(), r.Body)
} else if strings.HasPrefix(r.URL.Path, "/v1/feeds") {
return h.feeds(w, r)
} else if strings.HasPrefix(r.URL.Path, "/experimental/ui") {
} else if strings.HasPrefix(r.URL.Path, "/experimental/ui") || r.URL.Path == "/" {
return h.ui(w, r)
} else if strings.HasPrefix(r.URL.Path, "/experimental/echo") {
b, _ := io.ReadAll(r.Body)

View File

@@ -0,0 +1,56 @@
<html>
<header>
<link rel="stylesheet" href="/experimental/ui/dark.css">
</header>
<body>
<h2><a href="?">Feeds</a></h2>
<div>
{{ range feeds }}
<div>
<h3><code><a href="?edit={{.Entry.ID}}">{{ .Version.URL }}</a></code></h3>
<div>@<code>{{ .Version.Cron }}</code> ~<code>{{ .Version.Pattern }}</code></div>
<div><code>{{ .Version.WebhookMethod }} {{ .Version.WebhookURL }} | {{ .Version.WebhookBody }}</code></div>
<div>(last run {{ ago .Execution.Executed }} ago)</div>
</div>
{{ end }}
</div>
<br><hr><br>
<div>
<h3>
{{ if eq "" .editing.ID }}
New
{{ else }}
Updating <code><a target="_blank" href="{{ .editing_url }}">{{ .editing.URL }}</a></code> (<a href="?">clear</a>)
<br>
<div style="scale: 0.85">
<form method="POST" action="/v1/feeds?id={{ .editing.ID }}&delete">
<button type="submit">DELETE</button>
</form>
</div>
{{ end }}
</h3>
<form method="POST" action="/v1/feeds?id={{ .editing.ID }}">
{{ range $k, $v := .editing }}
{{ if not (in $k "Created" "Deleted" "Updated" "ID") }}
<div>
<label for="{{ $k }}">
{{ $k }}
{{- if eq $k "URL" }}
(hint: nyaa://?q=show)
{{ else if eq $k "WebhookURL" }}
(hint: vpntor:///outdir)
{{ end }}
</label>
<input name="{{ $k }}" type="text" value="{{ $v }}"/>
</div>
{{ end }}
{{ end }}
<button type="submit">Submit</button>
<code>Preview someday</code>
</form>
</div>
</body>
</html>

View File

@@ -1,41 +0,0 @@
<html>
<header>
<link rel="stylesheet" href="/experimental/ui/dark.css">
<script>
fill(elem) {
}
</script>
</header>
<body>
<h2>Feeds</h2>
<div>
{{ range feeds }}
<div>
<h4>(<button onclick="fill(this)">{{ namespan "entry.id" .Entry.ID}}</button>) {{ namespan "version.url" .Version.URL }}</h4>
<div>{{ .Version.Created }} (last {{ .Execution.Executed }})</div>
<div>@{{ .Version.Cron }} ~"{{ .Version.Pattern }}"</div>
<div>{{ .Version.WebhookMethod }} {{ .Version.WebhookURL }} | {{ .Version.WebhookBody }}</div>
</div>
{{ end }}
</div>
<br>
<div>
<h3>New</h3>
<form method="POST" action="/v1/feeds">
{{ range feedsVersionFields }}
{{ if ne . "Created" }}
<div>
<label for="{{ . }}">{{ . }}</label>
<input name="{{ . }}" type="text" />
</div>
{{ end }}
{{ end }}
<button type="submit">Submit</button>
</form>
</div>
</body>
</html>

View File

@@ -3,43 +3,80 @@ package handler
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"os"
"path"
"show-rss/src/feeds"
"slices"
"strings"
"text/template"
"time"
"embed"
_ "embed"
)
//go:embed public/index.tmpl
var embeddedIndexTMPL string
//go:embed public/*
var embeddedDir embed.FS
var dir = func() string {
if v := os.Getenv("UI_D"); v != "" {
return v
}
return "./src/cmd/server/handler/testdata"
return "./src/cmd/server/handler/public"
}()
func (h Handler) ui(w http.ResponseWriter, r *http.Request) error {
fs := http.FileServer(http.Dir(dir))
if path.Base(r.URL.Path) == "ui" {
if path.Base(r.URL.Path) == "ui" || r.URL.Path == "/" {
return h.uiIndex(w, r)
}
http.StripPrefix("/experimental/ui", fs).ServeHTTP(w, r)
w.Header().Set("Cache-Control", "max-age=2592000")
fs := http.FileServer(http.FS(embeddedDir))
r.URL.Path = fmt.Sprintf("/public/%s", strings.TrimPrefix(r.URL.Path, "/experimental/ui"))
fs.ServeHTTP(w, r)
return nil
}
func (h Handler) uiIndex(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
var editing struct {
feeds.Entry `json:",inline"`
feeds.Version `json:",inline"`
}
editing.Version.Cron = fmt.Sprintf("%d */12 * * *", rand.Int()%60)
editing.Version.URL = "nyaa://?q=SOME_SHOW dual web-dl"
editing.Version.Pattern = ".*"
editing.Version.WebhookBody = "{{ .Item.Link }}"
editing.Version.WebhookMethod = "POST"
editing.Version.WebhookURL = "vpntor:///data/completed-rss/TITLE"
all := []feeds.Feed{}
if err := feeds.ForEach(ctx, func(f feeds.Feed) error {
if deleted := f.Version.URL == ""; !deleted {
all = append(all, f)
}
if f.Entry.ID == r.URL.Query().Get("edit") {
editing.Entry = f.Entry
editing.Version = f.Version
}
return ctx.Err()
}); err != nil {
return err
}
b, _ := os.ReadFile(path.Join(dir, "index.tmpl"))
if len(b) == 0 {
b = []byte(embeddedIndexTMPL)
}
tmpl := template.New(r.URL.Path).Funcs(template.FuncMap{
"feeds": func() ([]feeds.Feed, error) {
all := []feeds.Feed{}
err := feeds.ForEach(ctx, func(f feeds.Feed) error {
all = append(all, f)
return ctx.Err()
})
return all, err
"feeds": func() []feeds.Feed {
return all
},
"feedsVersionFields": func() []string {
b, _ := json.Marshal(feeds.Version{})
@@ -52,8 +89,11 @@ func (h Handler) uiIndex(w http.ResponseWriter, r *http.Request) error {
slices.Sort(ks)
return ks
},
"namespan": func(k string, v any) string {
return fmt.Sprintf("<span name=%q>%s</span>", k, v)
"in": func(k string, v ...string) bool {
return slices.Contains(v, k)
},
"ago": func(t time.Time) time.Duration {
return time.Since(t)
},
})
@@ -62,5 +102,16 @@ func (h Handler) uiIndex(w http.ResponseWriter, r *http.Request) error {
return err
}
return tmpl.Execute(w, nil)
args := map[string]any{}
{
b, _ := json.Marshal(editing)
var m map[string]any
json.Unmarshal(b, &m)
args["editing"] = m
args["editing_url"], _ = editing.FetchURL()
}
return tmpl.Execute(w, args)
}

View File

@@ -5,18 +5,12 @@ import (
"fmt"
"net"
"net/http"
"os"
"show-rss/src/cmd/server/handler"
"strconv"
"show-rss/src/server"
)
func Main(ctx context.Context) error {
port, _ := strconv.Atoi(os.Getenv("PORT"))
if port == 0 {
port = 10000
}
return Run(ctx, fmt.Sprintf(":%d", port))
return Run(ctx, fmt.Sprintf(":%d", server.Extract(ctx)))
}
func Run(ctx context.Context, listen string) error {

31
src/cron/cron.go Normal file
View File

@@ -0,0 +1,31 @@
package cron
import (
"context"
"time"
)
func Cron(ctx context.Context, next func(context.Context) (time.Time, error), do func(ctx context.Context) error) error {
n, err := next(ctx)
if err != nil {
return err
}
c := time.NewTicker(3 * time.Minute)
defer c.Stop()
for {
select {
case <-ctx.Done():
case <-c.C:
n, err = next(ctx)
if err != nil {
return err
}
case <-time.After(time.Until(n)):
if err := do(ctx); err != nil {
return err
}
}
}
return ctx.Err()
}

View File

@@ -25,7 +25,7 @@ func Test(t *testing.T, ctx context.Context) context.Context {
t.Fatalf("failed to inject db %s: %v", p, err)
}
t.Cleanup(func() {
db, err := extract(ctx)
db, _, err := extract(ctx)
if err != nil {
return
}
@@ -35,7 +35,7 @@ func Test(t *testing.T, ctx context.Context) context.Context {
}
func Inject(ctx context.Context, conn string) (context.Context, error) {
if _, err := extract(ctx); err == nil {
if _, _, err := extract(ctx); err == nil {
return ctx, nil
}
@@ -69,13 +69,13 @@ func Inject(ctx context.Context, conn string) (context.Context, error) {
return ctx, err
}
return context.WithValue(ctx, ctxKey, db), ctx.Err()
return context.WithValue(context.WithValue(ctx, ctxKey+"_lock", newSemaphore()), ctxKey, db), ctx.Err()
}
func extract(ctx context.Context) (*sql.DB, error) {
func extract(ctx context.Context) (*sql.DB, semaphore, error) {
db := ctx.Value(ctxKey)
if db == nil {
return nil, fmt.Errorf("db not injected")
return nil, nil, fmt.Errorf("db not injected")
}
return db.(*sql.DB), nil
return db.(*sql.DB), ctx.Value(ctxKey + "_lock").(semaphore), nil
}

View File

@@ -129,9 +129,11 @@ func Exec(ctx context.Context, q string, args ...any) error {
}
// with resolves the injected database and runs foo while holding the
// context's db semaphore, serializing database access across the
// process.
//
// NOTE(review): the diff merge left both the old `db, err :=` and new
// `db, sem, err :=` lines; this is the reconstructed post-change
// version.
func with(ctx context.Context, foo func(*sql.DB) error) error {
	db, sem, err := extract(ctx)
	if err != nil {
		return err
	}
	return sem.With(ctx, func() error {
		return foo(db)
	})
}

21
src/db/sem.go Normal file
View File

@@ -0,0 +1,21 @@
package db
import "context"
type semaphore chan struct{}
func newSemaphore() semaphore {
return make(semaphore, 1)
}
func (semaphore semaphore) With(ctx context.Context, cb func() error) error {
select {
case semaphore <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
defer func() {
<-semaphore
}()
return cb()
}

View File

@@ -2,8 +2,10 @@ package feeds
import (
"context"
"io"
"fmt"
"net/url"
"show-rss/src/db"
"show-rss/src/server"
"time"
"github.com/google/uuid"
@@ -39,6 +41,24 @@ type (
}
)
// Next reports when the soonest feed is due, capped at 3 minutes from
// now. Feeds whose schedule cannot be computed are skipped rather than
// failing the scan; any error from ForEach itself is returned.
func Next(ctx context.Context) (time.Time, error) {
	earliest := time.Now().Add(3 * time.Minute)
	err := ForEach(ctx, func(f Feed) error {
		if due, ferr := f.Next(); ferr == nil && !due.After(earliest) {
			earliest = due
		}
		return nil
	})
	return earliest, err
}
func ForEach(ctx context.Context, cb func(Feed) error) error {
if err := initDB(ctx); err != nil {
return err
@@ -52,18 +72,27 @@ func ForEach(ctx context.Context, cb func(Feed) error) error {
return err
}
var errs []string
for _, id := range ids {
feed, err := Get(ctx, id.ID)
if err != nil {
return err
} else if err := cb(feed); err != nil {
return err
errs = append(errs, fmt.Sprintf(`failed to fetch %s: %v`, id.ID, err))
}
}
if len(errs) > 0 {
return fmt.Errorf("failed some callbacks: %+v", errs)
}
return nil
}
// Delete soft-deletes feed id: it records a new version with every
// field blanked rather than removing rows. Elsewhere in this package a
// blank Version.URL is treated as deleted (listings hide it and
// ShouldExecute skips it), so history is preserved.
func Delete(ctx context.Context, id string) error {
	return Update(ctx, id, "", "", "", "", "", "")
}
func Get(ctx context.Context, id string) (Feed, error) {
if err := initDB(ctx); err != nil {
return Feed{}, err
@@ -116,10 +145,9 @@ func Get(ctx context.Context, id string) (Feed, error) {
LIMIT 1
) AS "Execution.Version"
FROM entry
JOIN "feed.versions" version_entries_id ON
version_entries_id.entries_id=entry.ID
JOIN "feed.versions" versions ON
versions.created_at=entry.Updated
WHERE versions.entries_id=entry.ID
`, id, id)
}
@@ -174,12 +202,34 @@ func Insert(ctx context.Context, url, cron, pattern, webhookMethod, webhookURL,
)
}
func (feed Feed) Update(ctx context.Context, url, cron, pattern, tag *string) error {
return io.EOF
}
func Update(ctx context.Context, id string, url, cron, pattern, webhookMethod, webhookURL, webhookBody string) error {
if err := initDB(ctx); err != nil {
return err
}
func (feed Feed) Delete(ctx context.Context) error {
return io.EOF
if _, err := Get(ctx, id); err != nil {
return err
}
now := time.Now()
return db.Exec(ctx, `
BEGIN;
UPDATE "feed.entries" SET updated_at=$1 WHERE id=$2;
INSERT INTO "feed.versions" (
entries_id,
created_at,
url,
cron,
pattern,
webhook_method,
webhook_url,
webhook_body
) VALUES ($3, $4, $5, $6, $7, $8, $9, $10);
COMMIT;
`,
now, id,
id, now, url, cron, pattern, webhookMethod, webhookURL, webhookBody,
)
}
func getEntry(ctx context.Context, id string) (Entry, error) {
@@ -230,3 +280,17 @@ func initDB(ctx context.Context) error {
)`,
})
}
// Webhook resolves the (method, url, body) to use when a feed item
// matches. vpntor:// webhook URLs are rewritten to this server's own
// /v1/vpntor endpoint with a templated magnet payload (the URL's path
// becomes the download directory); anything else passes through as
// configured.
func (feed Feed) Webhook(ctx context.Context) (string, string, string) {
	// BUG FIX: the original discarded url.Parse's error and then
	// dereferenced u.Scheme — a nil-pointer panic on an unparsable
	// WebhookURL. Fall through to the configured values instead.
	u, err := url.Parse(feed.Version.WebhookURL)
	if err == nil && u.Scheme == "vpntor" {
		return "POST", fmt.Sprintf("http://localhost:%d/v1/vpntor", server.Extract(ctx)), fmt.Sprintf(`{
"Magnet": "{{ .Item.Link }}",
"Dir": %q,
"URL": "https://vpntor.int.bel.blue/transmission/rpc"
}`, u.Path)
	}
	return feed.Version.WebhookMethod, feed.Version.WebhookURL, feed.Version.WebhookBody
}

View File

@@ -112,5 +112,31 @@ func TestFeeds(t *testing.T) {
} else if n == 0 {
t.Errorf("for each didnt hit known get")
}
if err := feeds.Update(ctx, id, "url2", "cron2", "pattern2", "wmethod2", "wurl2", "wbody2"); err != nil {
t.Fatal("cannot update:", err)
}
got, err = feeds.Get(ctx, id)
if err != nil {
t.Fatal("cannot get updated:", err)
}
if v := got.Version.URL; v != "url2" {
t.Error(v)
}
if v := got.Version.Cron; v != "cron2" {
t.Error(v)
}
if v := got.Version.Pattern; v != "pattern2" {
t.Error(v)
}
if v := got.Version.WebhookMethod; v != "wmethod2" {
t.Error(v)
}
if v := got.Version.WebhookURL; v != "wurl2" {
t.Error(v)
}
if v := got.Version.WebhookBody; v != "wbody2" {
t.Error(v)
}
})
}

View File

@@ -28,10 +28,14 @@ var (
)
func (feed Feed) ShouldExecute() (bool, error) {
if !feed.Entry.Deleted.IsZero() {
if !feed.Entry.Deleted.IsZero() || feed.Version.URL == "" {
return false, nil
}
next, err := feed.Next()
return time.Now().After(next), err
}
func (feed Feed) Next() (time.Time, error) {
schedule, err := cron.NewParser(
cron.SecondOptional |
cron.Minute |
@@ -42,10 +46,9 @@ func (feed Feed) ShouldExecute() (bool, error) {
cron.Descriptor,
).Parse(feed.Version.Cron)
if err != nil {
return false, fmt.Errorf("illegal cron %q", feed.Version.Cron)
return time.Time{}, fmt.Errorf("illegal cron %q", feed.Version.Cron)
}
next := schedule.Next(feed.Execution.Executed)
return time.Now().After(next), nil
return schedule.Next(feed.Execution.Executed), nil
}
func (feed Feed) Fetch(ctx context.Context) (Items, error) {
@@ -100,8 +103,14 @@ func (feed Feed) Fetch(ctx context.Context) (Items, error) {
slices.Sort(links)
links = slices.Compact(links)
var link string
if len(links) > 0 {
link = links[0]
}
result = append(result, Item{
Title: gitem.Title,
Link: link,
Links: links,
Preview: preview,
Body: body,
@@ -132,14 +141,18 @@ func proxyFetch(ctx context.Context, u string) (string, error) {
b, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed fetch: (%d) %s", resp.StatusCode, b)
return "", fmt.Errorf("failed fetch %s %s: (%d) %s", req.Method, req.URL.String(), resp.StatusCode, b)
}
return string(b), nil
}
func (feed Feed) FetchURL() (*url.URL, error) {
u, err := url.Parse(feed.Version.URL)
return feed.Version.FetchURL()
}
func (version Version) FetchURL() (*url.URL, error) {
u, err := url.Parse(version.URL)
if err != nil {
return nil, err
}
@@ -148,7 +161,7 @@ func (feed Feed) FetchURL() (*url.URL, error) {
case "nyaa": // `nyaa://?q=A B` to `https://nyaa.si/?page=rss&q=A%20B&c=0_0&f=0`
q := u.Query()
if q.Get("q") == "" {
return nil, fmt.Errorf("invalid nyaa:// (%s): no ?q", feed.Version.URL)
return nil, fmt.Errorf("invalid nyaa:// (%s): no ?q", version.URL)
}
q.Set("page", "rss")

View File

@@ -67,6 +67,7 @@ func TestFeedFetch(t *testing.T) {
expect := feeds.Item{
Title: `Cheap 'Transforming' Electric Truck Announced by Jeff Bezos-Backed Startup`,
Link: `https://tech.slashdot.org/story/25/04/26/0425259/cheap-transforming-electric-truck-announced-by-jeff-bezos-backed-startup?utm_source=rss1.0mainlinkanon&utm_medium=feed`,
Links: []string{`https://tech.slashdot.org/story/25/04/26/0425259/cheap-transforming-electric-truck-announced-by-jeff-bezos-backed-startup?utm_source=rss1.0mainlinkanon&utm_medium=feed`},
Preview: `It's a pickup truck "that can change into whatever...`,
}

View File

@@ -4,6 +4,7 @@ type Items []Item
type Item struct {
Title string
Link string
Links []string
Preview string
Body string

18
src/server/config.go Normal file
View File

@@ -0,0 +1,18 @@
package server
import (
"context"
)
func Inject(ctx context.Context, port int) context.Context {
return context.WithValue(ctx, "server.port", port)
}
func Extract(ctx context.Context) int {
v := ctx.Value("server.port")
port, ok := v.(int)
if !ok {
return 10_000
}
return port
}

45
src/slow/reader.go Normal file
View File

@@ -0,0 +1,45 @@
package slow
import (
"context"
"io"
"golang.org/x/time/rate"
)
// Reader wraps an io.Reader and throttles delivery through a
// token-bucket limiter (one token per byte). Cancelling ctx aborts any
// in-progress wait in Read.
type Reader struct {
	// NOTE(review): storing a context in a struct is unidiomatic, but
	// here it is scoped to this reader's lifetime and consulted only by
	// the limiter's WaitN calls.
	ctx context.Context
	// limiter is held by value; it is written once in NewReader and
	// only its pointer-receiver methods are used afterwards.
	limiter rate.Limiter
	r io.Reader
}

// Compile-time check that *Reader satisfies io.Reader.
var _ io.Reader = &Reader{}
// NewReader wraps r so that reads are limited to roughly bps bytes per
// second. The burst is set to int(bps), i.e. one second's worth of
// budget, which bounds the chunk size Read waits for at a time.
func NewReader(ctx context.Context, bps rate.Limit, r io.Reader) *Reader {
	limiter := rate.NewLimiter(bps, int(bps))
	reader := &Reader{
		ctx:     ctx,
		limiter: *limiter,
		r:       r,
	}
	return reader
}
// Read performs the underlying read first, then blocks until the
// limiter has issued one token per byte delivered, paging the wait in
// burst-sized chunks because WaitN rejects requests larger than the
// burst. The underlying read's error (e.g. io.EOF) is returned with
// the byte count once the wait completes.
func (r *Reader) Read(b []byte) (int, error) {
	n, err := r.r.Read(b)
	burst := r.limiter.Burst()
	// BUG FIX: with bps < 1 the limiter's burst is 0, so page stayed 0
	// and WaitN(ctx, 0) returned immediately — the original loop spun
	// forever without advancing. Pass the read through unthrottled
	// instead, since a non-positive burst can never issue tokens.
	if burst <= 0 {
		return n, err
	}
	m := 0
	for m < n {
		page := burst
		if left := n - m; page > left {
			page = left
		}
		if werr := r.limiter.WaitN(r.ctx, page); werr != nil {
			return n, werr
		}
		m += page
	}
	return n, err
}

20
src/slow/reader_test.go Normal file
View File

@@ -0,0 +1,20 @@
package slow_test
import "show-rss/src/slow"
import "testing"
import "context"
import "bytes"
import "io"
// TestReader streams a 256 KB payload through a reader limited to
// 300 KB/s; the limiter's burst covers the whole payload, so the copy
// completes quickly while still exercising the throttling path.
func TestReader(t *testing.T) {
	payload := bytes.Repeat([]byte("1"), 256_000)
	throttled := slow.NewReader(context.Background(), 300_000, bytes.NewReader(payload))
	var sink bytes.Buffer
	n, err := io.Copy(&sink, throttled)
	if err != nil {
		t.Fatal(err)
	}
	if n != 256_000 {
		t.Fatal(n)
	}
}