Compare commits

..

35 Commits

Author SHA1 Message Date
bel
18ac13fd57 should not execute feed if version.url=="" 2025-12-10 08:40:28 -07:00
bel
375fc1000a feeds for each runs all and returns all errs 2025-12-10 08:38:59 -07:00
bel
47c7aa74d3 uncap retries on main 2025-12-10 08:23:54 -07:00
bel
a6aad2820d cap retries 2025-12-10 08:13:45 -07:00
bel
b26afcb325 log 2025-12-02 16:30:10 -07:00
bel
613bfdf96e transcode entrypoint 2025-12-02 16:28:55 -07:00
Bel LaPointe
81507319dd disable by default 2025-12-02 16:25:15 -07:00
bel
50ad3bb3db install_scratch.sh 2025-11-30 09:00:47 -07:00
bel
07992b6636 delete less accidentally clickable 2025-11-30 08:56:37 -07:00
bel
9583234df5 do not list deleted (url == "") 2025-11-30 08:54:28 -07:00
bel
2943362587 if POST /?delete then DELETE 2025-11-30 08:54:18 -07:00
bel
cbd4e32022 DELETE /v1/feeds/abc updates all fields to "" 2025-11-30 08:44:27 -07:00
bel
727b4fdea6 from testdata to public for .html 2025-11-30 08:40:15 -07:00
bel
fd7dcafd4e timeout ass to srt 2025-06-01 10:28:43 -06:00
Bel LaPointe
4fbc96b96f dont fail on err but log all 2025-06-01 10:27:00 -06:00
Bel LaPointe
0afb6535b6 impl -e=best-ass-to-srt 2025-06-01 09:55:01 -06:00
Bel LaPointe
10a40d4a54 nopanik choose best lang 2025-06-01 09:52:18 -06:00
Bel LaPointe
4bdfbd1f06 panik till fixed 2025-05-31 11:15:00 -06:00
Bel LaPointe
44bcc0ba2e bestasstosrt does not also remove all sub streams from mkv 2025-05-31 11:14:43 -06:00
Bel LaPointe
b17801060e refactor for fix the world 2025-05-31 11:06:59 -06:00
bel
99c1061a18 fix asses.Next and do asses.Record 2025-05-31 10:56:04 -06:00
bel
67840f6b28 asses does not fail on individual failure 2025-05-30 06:25:06 -06:00
bel
cb44644475 -e=deport-ass filename 2025-05-26 21:41:39 -06:00
bel
6626077201 log asses.Main new dir processing 2025-05-25 11:49:16 -06:00
bel
9c0129f968 if ffmpeg -i .name.ass fails, then rm .name.ass 2025-05-25 11:46:36 -06:00
Bel LaPointe
8efffd0fe4 no size="..." in .srt 2025-05-22 11:29:14 -06:00
bel
fe02d1624f fix now() InLocation(local), deadline per-one 2025-05-17 21:24:32 -06:00
bel
75b7e21bec asses.Next returns midnight if outside working hours 2025-05-17 21:17:26 -06:00
bel
be148f5de5 assing 12am-8am only 2025-05-17 21:10:44 -06:00
bel
a74f741298 deterministic rand for rescan trheshold per file 2025-05-17 21:05:19 -06:00
bel
fba3d635ea if optimistically assuming cksum unchanged, then do not get cksum, just update checked, modified 2025-05-17 20:57:41 -06:00
bel
8f18cbae3a fix small Read() big rate for slow.Reader 2025-05-17 20:53:15 -06:00
bel
d73b63f43c read fast 2025-05-17 20:41:23 -06:00
bel
f57560ebfc fix WEEKS to recheck and assume empty modified has happy cksum 2025-05-17 20:36:59 -06:00
bel
fc66d26c10 log if no check 2025-05-17 20:25:54 -06:00
19 changed files with 525 additions and 109 deletions

3
install_scratch.sh Normal file
View File

@@ -0,0 +1,3 @@
#! /usr/bin/env bash
CGO_ENABLED=1 CC=x86_64-linux-musl-gcc go install -ldflags="-linkmode external -extldflags '-static'"

View File

@@ -13,6 +13,10 @@ func Next(ctx context.Context) (time.Time, error) {
return time.Time{}, err return time.Time{}, err
} }
if deadline := Deadline(); time.Since(deadline) > time.Minute {
return midnightLastNight().Add(24 * time.Hour), nil
}
type Did struct { type Did struct {
Did time.Time Did time.Time
} }
@@ -22,7 +26,7 @@ func Next(ctx context.Context) (time.Time, error) {
ORDER BY executed_at DESC ORDER BY executed_at DESC
LIMIT 1 LIMIT 1
`) `)
return result.Did, err return result.Did.Add(time.Hour), err
} }
func Record(ctx context.Context) error { func Record(ctx context.Context) error {
@@ -56,6 +60,15 @@ func checked(ctx context.Context, p, cksum string, modified time.Time) error {
return err return err
} }
if cksum != "" {
} else if err := db.Exec(ctx, `
UPDATE "asses.checks"
SET checked_at=$2, modified=$3
WHERE p=$1
`, p, time.Now(), modified); err == nil {
return nil
}
return db.Exec(ctx, ` return db.Exec(ctx, `
INSERT INTO "asses.checks" INSERT INTO "asses.checks"
(p, checked_at, cksum, modified) (p, checked_at, cksum, modified)

View File

@@ -13,7 +13,7 @@ func TestNextRecord(t *testing.T) {
if v, err := asses.Next(ctx); err != nil { if v, err := asses.Next(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} else if zero := v.IsZero(); !zero { } else if zero := v.IsZero(); !zero && time.Now().Hour() < 8 {
t.Fatal(v) t.Fatal(v)
} }
@@ -23,7 +23,7 @@ func TestNextRecord(t *testing.T) {
if v, err := asses.Next(ctx); err != nil { if v, err := asses.Next(ctx); err != nil {
t.Fatal(err) t.Fatal(err)
} else if since := time.Since(v); since > time.Minute { } else if since := time.Since(v); since > time.Minute && time.Now().Hour() < 8 {
t.Fatal(since) t.Fatal(since)
} }
} }

15
src/asses/deadline.go Normal file
View File

@@ -0,0 +1,15 @@
package asses
import "time"
func Deadline() time.Time {
return midnightLastNight().Add(8 * time.Hour) // midnight-8AM
}
func midnightLastNight() time.Time {
t, err := time.ParseInLocation("2006-01-02", time.Now().Format("2006-01-02"), time.Local)
if err != nil {
panic(err)
}
return t
}

View File

@@ -9,10 +9,16 @@ import (
"os/exec" "os/exec"
"path" "path"
"path/filepath" "path/filepath"
"regexp"
"slices" "slices"
"strings" "strings"
"time"
) )
func Entrypoint(ctx context.Context, p string) error {
return deport(ctx, p)
}
func deport(ctx context.Context, p string) error { func deport(ctx context.Context, p string) error {
if os.Getenv("NO_DEPORT") != "" { if os.Getenv("NO_DEPORT") != "" {
log.Printf("would deport %s", p) log.Printf("would deport %s", p)
@@ -41,67 +47,28 @@ func deport(ctx context.Context, p string) error {
} }
} }
asses, err := filepath.Glob(path.Join( if err := BestAssToSRT(ctx, p); err != nil {
path.Dir(p),
fmt.Sprintf(".%s.*.ass", path.Base(p)),
))
if err != nil {
return err return err
} }
for _, ass := range asses {
srt := fmt.Sprintf("%s.srt", strings.TrimSuffix(ass, ".ass"))
if _, err := os.Stat(srt); err == nil {
continue
}
if err := ffmpeg(ctx, "-y", "-i", ass, srt); err != nil { base := path.Base(p)
return err withoutExt := strings.TrimSuffix(base, path.Ext(base))
} p2 := path.Join(path.Dir(p), fmt.Sprintf("%s.subless.mkv", withoutExt))
args := []string{
"-i", p,
"-map", "0",
} }
for _, assStream := range assStreams {
srts, err := filepath.Glob(path.Join( args = append(args, "-map", "-"+assStream.id)
path.Dir(p), }
fmt.Sprintf(".%s.*.srt", path.Base(p)), args = append(args,
)) "-c", "copy",
if err != nil { p2,
)
if err := ffmpeg(ctx, args...); err != nil {
return err
} else if err := os.Rename(p2, p); err != nil {
return err return err
}
slices.SortFunc(srts, func(a, b string) int {
// if skip a { return 1 }
// if skip b { return -1 }
// return -1 * (wc(a) - wc(b))
return strings.Compare(a, b)
})
for i := range srts {
if i == 0 {
base := path.Base(p)
withoutExt := strings.TrimSuffix(base, path.Ext(base))
srt := path.Join(path.Dir(p), fmt.Sprintf("%s.srt", withoutExt))
if err := os.Rename(srts[i], srt); err != nil {
return err
}
p2 := path.Join(path.Dir(p), fmt.Sprintf("%s.subless.mkv", withoutExt))
args := []string{
"-i", p,
"-map", "0",
}
for _, assStreamID := range assStreamIDs {
args = append(args, "-map", "-"+assStreamID)
}
args = append(args,
"-c", "copy",
p2,
)
if err := ffmpeg(ctx, args...); err != nil {
return err
} else if err := os.Rename(p2, p); err != nil {
return err
}
} else {
os.Remove(srts[i])
}
} }
return nil return nil
@@ -166,3 +133,115 @@ func execc(ctx context.Context, bin string, args ...string) (string, error) {
err := cmd.Run() err := cmd.Run()
return string(stdout.Bytes()), err return string(stdout.Bytes()), err
} }
func BestAssToSRT(ctx context.Context, p string) error {
asses, err := filepath.Glob(path.Join(
path.Dir(p),
fmt.Sprintf(".%s.*.ass", path.Base(p)),
))
if err != nil {
return err
}
srts := []string{}
for _, ass := range asses {
srt, err := assToSRT(ctx, ass)
if err != nil {
return err
}
srts = append(srts, srt)
}
srts = SRTsByGoodness(srts)
for i := range srts {
if i == 0 {
base := path.Base(p)
withoutExt := strings.TrimSuffix(base, path.Ext(base))
srt := path.Join(path.Dir(p), fmt.Sprintf("%s.srt", withoutExt))
if err := os.Rename(srts[i], srt); err != nil {
return err
}
} else {
os.Remove(srts[i])
}
}
return nil
}
func assToSRT(ctx context.Context, ass string) (string, error) {
ctx, can := context.WithTimeout(ctx, 30*time.Second)
defer can()
srt := fmt.Sprintf("%s.srt", strings.TrimSuffix(ass, ".ass"))
if _, err := os.Stat(srt); err == nil {
return srt, nil
}
if err := ffmpeg(ctx, "-y", "-i", ass, srt); err != nil {
if ctx.Err() == nil {
log.Printf("ffmpeg failed to process %s; removing", ass)
os.Remove(ass)
}
return srt, err
}
b, err := os.ReadFile(srt)
if err != nil {
return srt, err
}
before := len(b)
b = regexp.MustCompile(`size="[^"]*"`).ReplaceAll(b, []byte{})
if after := len(b); before == after {
} else if err := os.WriteFile(srt, b, os.ModePerm); err != nil {
return srt, err
}
return srt, nil
}
func SRTsByGoodness(srts []string) []string {
skippers := []*regexp.Regexp{
regexp.MustCompile(`(?i)lat.*amer`),
regexp.MustCompile(`(?i)signs`),
regexp.MustCompile(`(?i)rus`),
regexp.MustCompile(`(?i)por`),
regexp.MustCompile(`(?i)ita`),
regexp.MustCompile(`(?i)fre`),
regexp.MustCompile(`(?i)spa`),
regexp.MustCompile(`(?i)ger`),
regexp.MustCompile(`(?i)ara`),
regexp.MustCompile(`(?i)jpn`),
regexp.MustCompile(`(?i)urop`),
regexp.MustCompile(`(?i)razil`),
regexp.MustCompile(`(?i)Deu`),
regexp.MustCompile(`(?i)ara`),
}
keepers := []*regexp.Regexp{
regexp.MustCompile(`(?i)^eng$`),
}
srts = slices.Clone(srts)
slices.SortFunc(srts, func(a, b string) int {
a = strings.ToLower(a)
b = strings.ToLower(b)
for _, skipper := range skippers {
if skipper.MatchString(b) {
return -1
} else if skipper.MatchString(a) {
return 1
}
}
for _, keeper := range keepers {
if keeper.MatchString(a) {
return -1
} else if keeper.MatchString(b) {
return 1
}
}
return strings.Compare(a, b)
})
return srts
}

47
src/asses/deport_test.go Normal file
View File

@@ -0,0 +1,47 @@
package asses_test
import (
"show-rss/src/asses"
"testing"
)
func TestSRTsByGoodness(t *testing.T) {
cases := map[string]struct {
given []string
want string
}{
"eng": {
given: []string{"a", "eng"},
want: "eng",
},
"eng nocap": {
given: []string{"A", "eng"},
want: "eng",
},
".Apothecary_Diaries_S02E19.mkv.0:9.ita.ass": {
given: []string{
".Apothecary_Diaries_S02E19.mkv.0:10.rus.srt",
".Apothecary_Diaries_S02E19.mkv.0:2.eng.srt",
".Apothecary_Diaries_S02E19.mkv.0:3.por.srt",
".Apothecary_Diaries_S02E19.mkv.0:4.spa.srt",
".Apothecary_Diaries_S02E19.mkv.0:5.spa.srt",
".Apothecary_Diaries_S02E19.mkv.0:6.ara.srt",
".Apothecary_Diaries_S02E19.mkv.0:7.fre.srt",
".Apothecary_Diaries_S02E19.mkv.0:8.ger.srt",
".Apothecary_Diaries_S02E19.mkv.0:9.ita.srt",
},
want: ".Apothecary_Diaries_S02E19.mkv.0:2.eng.srt",
},
}
for name, d := range cases {
name := name
c := d
t.Run(name, func(t *testing.T) {
got := asses.SRTsByGoodness(c.given)
if got[0] != c.want {
t.Errorf("expected %s but got %s (%+v)", c.want, got[0], got)
}
})
}
}

View File

@@ -11,9 +11,24 @@ import (
"os" "os"
"path" "path"
"show-rss/src/slow" "show-rss/src/slow"
"strconv"
"time" "time"
"golang.org/x/time/rate"
) )
var EnvCksumBPS = func() int {
s := os.Getenv("CKSUM_BPS")
if s == "" {
return 50_000_000
}
n, err := strconv.Atoi(s)
if err != nil || n < 1 {
panic(err)
}
return n
}()
func One(ctx context.Context, p string) error { func One(ctx context.Context, p string) error {
shortp := path.Join("...", path.Base(path.Dir(p)), path.Base(p)) shortp := path.Join("...", path.Base(path.Dir(p)), path.Base(p))
@@ -22,16 +37,35 @@ func One(ctx context.Context, p string) error {
return err return err
} }
if threshold := 20 + time.Duration(rand.Int()%10)*24*time.Hour; time.Since(last.T) > threshold { threshold := 20 + rand.New(rand.NewSource(func() int64 {
b := md5.New().Sum([]byte(p))
var sum int64
for _, c := range b {
sum += int64(c)
sum *= int64(c)
}
return sum
}())).Int()%10
if daysSince := int(time.Since(last.T).Hours() / 24); daysSince > threshold {
log.Printf("asses.One(%s) // no modified check as %vd since last check", shortp, daysSince)
} else if stat, err := os.Stat(p); err != nil { } else if stat, err := os.Stat(p); err != nil {
return fmt.Errorf("cannot stat %s: %w", p, err) return fmt.Errorf("cannot stat %s: %w", p, err)
} else if stat.ModTime() == last.Modified { } else if stat.ModTime() == last.Modified {
log.Printf("asses.One(%s) // unmodified since %v", shortp, last.T) //log.Printf("asses.One(%s) // unmodified since %v", shortp, last.T)
return nil return nil
} else {
log.Printf("asses.One(%s) // modified (%v) is now %v", shortp, last.Modified, stat.ModTime())
} }
doCksum := true
if err := func() error { if err := func() error {
if len(last.Cksum) > 0 { if len(last.Cksum) > 0 {
if last.Modified.IsZero() {
doCksum = false
log.Printf("asses.One(%s) // assume cksum unchanged given null modified ", shortp)
return nil
}
cksum, err := Cksum(ctx, p) cksum, err := Cksum(ctx, p)
if err != nil { if err != nil {
return err return err
@@ -48,20 +82,34 @@ func One(ctx context.Context, p string) error {
return err return err
} }
log.Printf("asses.transcode(%s)...", shortp)
if err := transcode(ctx, p); err != nil {
return err
}
return nil return nil
}(); err != nil { }(); err != nil {
return err return err
} }
cksum, err := Cksum(ctx, p) var cksum string
if err != nil { if doCksum {
return err var err error
cksum, err = Cksum(ctx, p)
if err != nil {
return err
}
} }
stat, err := os.Stat(p) stat, err := os.Stat(p)
if err != nil { if err != nil {
return err return err
} }
return checked(ctx, p, cksum, stat.ModTime()) if err := checked(ctx, p, cksum, stat.ModTime()); err != nil {
log.Printf("failed to mark %s checked: %v", shortp, err)
return err
}
return nil
} }
func Cksum(ctx context.Context, p string) (string, error) { func Cksum(ctx context.Context, p string) (string, error) {
@@ -72,6 +120,6 @@ func Cksum(ctx context.Context, p string) (string, error) {
defer f.Close() defer f.Close()
hasher := md5.New() hasher := md5.New()
_, err = io.Copy(hasher, slow.NewReader(ctx, 10_000_000, f)) _, err = io.Copy(hasher, slow.NewReader(ctx, rate.Limit(EnvCksumBPS), f))
return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err return base64.StdEncoding.EncodeToString(hasher.Sum(nil)), err
} }

View File

@@ -12,25 +12,31 @@ import (
func TestOne(t *testing.T) { func TestOne(t *testing.T) {
ctx := db.Test(t, context.Background()) ctx := db.Test(t, context.Background())
os.Setenv("DO_TRANSCODE", "true")
d := t.TempDir() d := t.TempDir()
b, _ := os.ReadFile(path.Join("testdata", "survivor_au_S11E12.smoller.mkv")) b, _ := os.ReadFile(path.Join("testdata", "survivor_au_S11E12.smoller.mkv"))
p := path.Join(d, "f.mkv") p := path.Join(d, "f.mkv")
os.WriteFile(p, b, os.ModePerm) os.WriteFile(p, b, os.ModePerm)
t.Logf("initial cksum...")
cksum, _ := asses.Cksum(context.Background(), p) cksum, _ := asses.Cksum(context.Background(), p)
t.Logf("one && one...")
if err := asses.One(ctx, p); err != nil { if err := asses.One(ctx, p); err != nil {
t.Fatal(err) t.Fatal(err)
} else if err := asses.One(ctx, p); err != nil { } else if err := asses.One(ctx, p); err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Logf("test -f...")
if _, err := os.Stat(p); err != nil { if _, err := os.Stat(p); err != nil {
t.Fatalf("lost original mkv: %v", err) t.Fatalf("lost original mkv: %v", err)
} else if _, err := os.Stat(path.Join(d, "f.srt")); err != nil { } else if _, err := os.Stat(path.Join(d, "f.srt")); err != nil {
t.Fatalf("no new srt: %v", err) t.Fatalf("no new srt: %v", err)
} }
t.Logf("final cksum...")
newCksum, _ := asses.Cksum(context.Background(), p) newCksum, _ := asses.Cksum(context.Background(), p)
if cksum == newCksum { if cksum == newCksum {
t.Fatalf("cksum unchanged") t.Fatalf("cksum unchanged")

64
src/asses/transcode.go Normal file
View File

@@ -0,0 +1,64 @@
package asses
import (
"context"
"fmt"
"log"
"os"
"path"
"slices"
"strings"
)
func EntrypointTranscode(ctx context.Context, p string) error {
return transcode(ctx, p)
}
func transcode(ctx context.Context, p string) error {
if os.Getenv("NO_TRANSCODE") != "" || os.Getenv("DO_TRANSCODE") == "" {
log.Printf("would transcode %s but $NO_TRANSCODE=x or $DO_TRANSCODE=", p)
return nil
}
output, err := ffprobe(ctx, "-i", p)
if err != nil {
return err
}
h264 := slices.ContainsFunc(strings.Split(output, "\n"), func(line string) bool {
return strings.Contains(line, "tream #") && strings.Contains(line, "Video: ") && strings.Contains(line, "h264")
})
aac := slices.ContainsFunc(strings.Split(output, "\n"), func(line string) bool {
return strings.Contains(line, "tream #") && strings.Contains(line, "Audio: ") && strings.Contains(line, "aac")
})
if h264 && aac {
return nil
}
p2 := p + ".en" + path.Ext(p)
if err := ffmpeg(ctx, "-y",
"-i", p,
"-vcodec", "libx264",
"-acodec", "aac",
p2,
); err != nil {
return err
}
output2, err := ffprobe(ctx, "-i", p)
if err != nil {
return err
}
df := func(line string) bool {
return !strings.Contains(line, "tream #")
}
originalStreams := slices.DeleteFunc(strings.Split(output, "\n"), df)
newStreams := slices.DeleteFunc(strings.Split(output2, "\n"), df)
if len(originalStreams) != len(newStreams) {
return fmt.Errorf("stream count changed from transcode")
}
return os.Rename(p2, p)
}

View File

@@ -3,6 +3,7 @@ package asses
import ( import (
"context" "context"
"io/fs" "io/fs"
"log"
"path" "path"
"path/filepath" "path/filepath"
"show-rss/src/asses" "show-rss/src/asses"
@@ -21,7 +22,24 @@ func Main(ctx context.Context) error {
} }
func One(ctx context.Context) error { func One(ctx context.Context) error {
return OneWith(ctx, rootDs, asses.One) ctx, can := context.WithDeadline(ctx, asses.Deadline())
defer can()
lastD := ""
if err := OneWith(ctx, rootDs, func(ctx context.Context, p string) error {
if d := path.Dir(p); d != lastD {
log.Printf("asses.One(%s/...)...", d)
lastD = d
}
if err := asses.One(ctx, p); err != nil {
log.Printf("asses.One(.../%s/%s)...: err: %v", path.Base(path.Dir(p)), path.Base(p), err)
}
return nil
}); err != nil {
return err
}
return asses.Record(ctx)
} }
func OneWith(ctx context.Context, rootds []string, cb CB) error { func OneWith(ctx context.Context, rootds []string, cb CB) error {

View File

@@ -10,8 +10,10 @@ import (
) )
type Flags struct { type Flags struct {
DB string DB string
Port int Port int
Entrypoint Entrypoint
Pos []string
} }
func NewFlags(args []string) (Flags, error) { func NewFlags(args []string) (Flags, error) {
@@ -19,26 +21,28 @@ func NewFlags(args []string) (Flags, error) {
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
fs.StringVar(&result.DB, "db", "/tmp/f.db", "path to sqlite.db") fs.StringVar(&result.DB, "db", "/tmp/f.db", "path to sqlite.db")
fs.IntVar(&result.Port, "p", 10_000, "port for http") fs.IntVar(&result.Port, "p", 10000, "port for http")
fs.Var(&result.Entrypoint, "e", "entrypoint")
err := fs.Parse(args) err := fs.Parse(args)
result.Pos = fs.Args()
return result, err return result, err
} }
func Config(ctx context.Context, args []string) (context.Context, func(), error) { func Config(ctx context.Context, args []string) (context.Context, func(), Flags, error) {
flags, err := NewFlags(args) flags, err := NewFlags(args)
if err != nil { if err != nil {
return ctx, nil, err return ctx, nil, flags, err
} }
ctx, err = db.Inject(ctx, flags.DB) ctx, err = db.Inject(ctx, flags.DB)
if err != nil { if err != nil {
return ctx, nil, err return ctx, nil, flags, err
} }
ctx = server.Inject(ctx, flags.Port) ctx = server.Inject(ctx, flags.Port)
return ctx, func() { return ctx, func() {
cleanup.Extract(ctx)() cleanup.Extract(ctx)()
}, nil }, flags, nil
} }

View File

@@ -4,10 +4,12 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
inass "show-rss/src/asses"
"show-rss/src/cmd/asses" "show-rss/src/cmd/asses"
"show-rss/src/cmd/fetch" "show-rss/src/cmd/fetch"
"show-rss/src/cmd/server" "show-rss/src/cmd/server"
"show-rss/src/pool" "show-rss/src/pool"
"strings"
"time" "time"
) )
@@ -15,27 +17,105 @@ func Main(ctx context.Context, args []string) error {
ctx, can := context.WithCancel(ctx) ctx, can := context.WithCancel(ctx)
defer can() defer can()
ctx, can, err := Config(ctx, args) ctx, can, flags, err := Config(ctx, args)
if err != nil { if err != nil {
return fmt.Errorf("failed to inject: %w", err) return fmt.Errorf("failed to inject: %w", err)
} }
defer can() defer can()
foos := map[string]func(context.Context) error{ switch flags.Entrypoint {
"server": server.Main, case Defacto:
"fetch": fetch.Main, foos := map[string]func(context.Context) error{
"asses": asses.Main, "server": server.Main,
} "fetch": fetch.Main,
p := pool.New(len(foos)) "asses": asses.Main,
defer p.Wait(ctx)
for k, foo := range foos {
if err := p.Go(ctx, k, runner(ctx, k, foo)); err != nil {
return fmt.Errorf("failed to go %s: %v", k, err)
} }
} p := pool.New(len(foos))
defer p.Wait(ctx)
return p.Wait(ctx) for k, foo := range foos {
if err := p.Go(ctx, k, runner(ctx, k, foo)); err != nil {
return fmt.Errorf("failed to go %s: %v", k, err)
}
}
return p.Wait(ctx)
case DeportAss:
for _, pos := range flags.Pos {
if err := inass.Entrypoint(ctx, pos); err != nil {
return err
}
}
return nil
case BestAssToSRT:
errs := []string{}
for _, pos := range flags.Pos {
if err := inass.BestAssToSRT(ctx, pos); err != nil {
err = fmt.Errorf("[%s] %w", pos, err)
log.Println(err)
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf("errors: %+v", errs)
}
return nil
case Transcode:
for _, pos := range flags.Pos {
log.Printf("transcoding %q...", pos)
if err := inass.EntrypointTranscode(ctx, pos); err != nil {
return err
}
}
return nil
default:
panic(flags.Entrypoint.String())
}
return nil
}
type Entrypoint int
const (
Defacto Entrypoint = iota
DeportAss
BestAssToSRT
Transcode
)
func (e *Entrypoint) Set(s string) error {
switch s {
case Defacto.String():
*e = Defacto
case DeportAss.String():
*e = DeportAss
case BestAssToSRT.String():
*e = BestAssToSRT
case Transcode.String():
*e = Transcode
default:
return fmt.Errorf("%s nin (%s)", s, strings.Join([]string{
Defacto.String(),
DeportAss.String(),
BestAssToSRT.String(),
Transcode.String(),
}, ", "))
}
return nil
}
func (e Entrypoint) String() string {
switch e {
case Defacto:
return ""
case DeportAss:
return "deport-ass"
case BestAssToSRT:
return "best-ass-to-srt"
case Transcode:
return "transcode"
}
panic("cannot serialize entrypoint")
} }
func runner(ctx context.Context, k string, foo func(context.Context) error) func() error { func runner(ctx context.Context, k string, foo func(context.Context) error) func() error {
@@ -52,6 +132,5 @@ func runner(ctx context.Context, k string, foo func(context.Context) error) func
case <-time.After(time.Second): case <-time.After(time.Second):
} }
} }
return err
} }
} }

View File

@@ -10,12 +10,25 @@ import (
func (h Handler) feeds(w http.ResponseWriter, r *http.Request) error { func (h Handler) feeds(w http.ResponseWriter, r *http.Request) error {
switch r.Method { switch r.Method {
case http.MethodDelete:
if err := r.ParseForm(); err != nil {
return err
}
if err := h.feedsDelete(r.Context(), r.URL.Query().Get("id")); err != nil {
return err
}
case http.MethodPost, http.MethodPut: case http.MethodPost, http.MethodPut:
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return err return err
} }
if err := h.feedsUpsert(r.Context(), r.URL.Query().Get("id"), r.Form); err != nil { if r.URL.Query().Has("delete") {
return err if err := h.feedsDelete(r.Context(), r.URL.Query().Get("id")); err != nil {
return err
}
} else {
if err := h.feedsUpsert(r.Context(), r.URL.Query().Get("id"), r.Form); err != nil {
return err
}
} }
default: default:
http.NotFound(w, r) http.NotFound(w, r)
@@ -29,6 +42,10 @@ func (h Handler) feeds(w http.ResponseWriter, r *http.Request) error {
return nil return nil
} }
func (h Handler) feedsDelete(ctx context.Context, id string) error {
return feeds.Delete(ctx, id)
}
func (h Handler) feedsUpsert(ctx context.Context, id string, form url.Values) error { func (h Handler) feedsUpsert(ctx context.Context, id string, form url.Values) error {
var req feeds.Version var req feeds.Version
for k, v := range map[string]*string{ for k, v := range map[string]*string{

View File

@@ -23,7 +23,13 @@
{{ if eq "" .editing.ID }} {{ if eq "" .editing.ID }}
New New
{{ else }} {{ else }}
Update <code><a target="_blank" href="{{ .editing_url }}">{{ .editing.URL }}</a></code> (<a href="?">clear</a>) Updating <code><a target="_blank" href="{{ .editing_url }}">{{ .editing.URL }}</a></code> (<a href="?">clear</a>)
<br>
<div style="scale: 0.85">
<form method="POST" action="/v1/feeds?id={{ .editing.ID }}&delete">
<button type="submit">DELETE</button>
</form>
</div>
{{ end }} {{ end }}
</h3> </h3>
<form method="POST" action="/v1/feeds?id={{ .editing.ID }}"> <form method="POST" action="/v1/feeds?id={{ .editing.ID }}">

View File

@@ -17,17 +17,17 @@ import (
_ "embed" _ "embed"
) )
//go:embed testdata/index.tmpl //go:embed public/index.tmpl
var embeddedIndexTMPL string var embeddedIndexTMPL string
//go:embed testdata/* //go:embed public/*
var embeddedDir embed.FS var embeddedDir embed.FS
var dir = func() string { var dir = func() string {
if v := os.Getenv("UI_D"); v != "" { if v := os.Getenv("UI_D"); v != "" {
return v return v
} }
return "./src/cmd/server/handler/testdata" return "./src/cmd/server/handler/public"
}() }()
func (h Handler) ui(w http.ResponseWriter, r *http.Request) error { func (h Handler) ui(w http.ResponseWriter, r *http.Request) error {
@@ -37,7 +37,7 @@ func (h Handler) ui(w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Cache-Control", "max-age=2592000") w.Header().Set("Cache-Control", "max-age=2592000")
fs := http.FileServer(http.FS(embeddedDir)) fs := http.FileServer(http.FS(embeddedDir))
r.URL.Path = fmt.Sprintf("/testdata/%s", strings.TrimPrefix(r.URL.Path, "/experimental/ui")) r.URL.Path = fmt.Sprintf("/public/%s", strings.TrimPrefix(r.URL.Path, "/experimental/ui"))
fs.ServeHTTP(w, r) fs.ServeHTTP(w, r)
return nil return nil
} }
@@ -57,7 +57,9 @@ func (h Handler) uiIndex(w http.ResponseWriter, r *http.Request) error {
editing.Version.WebhookURL = "vpntor:///data/completed-rss/TITLE" editing.Version.WebhookURL = "vpntor:///data/completed-rss/TITLE"
all := []feeds.Feed{} all := []feeds.Feed{}
if err := feeds.ForEach(ctx, func(f feeds.Feed) error { if err := feeds.ForEach(ctx, func(f feeds.Feed) error {
all = append(all, f) if deleted := f.Version.URL == ""; !deleted {
all = append(all, f)
}
if f.Entry.ID == r.URL.Query().Get("edit") { if f.Entry.ID == r.URL.Query().Get("edit") {
editing.Entry = f.Entry editing.Entry = f.Entry
editing.Version = f.Version editing.Version = f.Version

View File

@@ -72,18 +72,27 @@ func ForEach(ctx context.Context, cb func(Feed) error) error {
return err return err
} }
var errs []string
for _, id := range ids { for _, id := range ids {
feed, err := Get(ctx, id.ID) feed, err := Get(ctx, id.ID)
if err != nil { if err != nil {
return err return err
} else if err := cb(feed); err != nil { } else if err := cb(feed); err != nil {
return err errs = append(errs, fmt.Sprintf(`failed to fetch %s: %v`, id.ID, err))
} }
} }
if len(errs) > 0 {
return fmt.Errorf("failed some callbacks: %+v", errs)
}
return nil return nil
} }
func Delete(ctx context.Context, id string) error {
return Update(ctx, id, "", "", "", "", "", "")
}
func Get(ctx context.Context, id string) (Feed, error) { func Get(ctx context.Context, id string) (Feed, error) {
if err := initDB(ctx); err != nil { if err := initDB(ctx); err != nil {
return Feed{}, err return Feed{}, err

View File

@@ -28,7 +28,7 @@ var (
) )
func (feed Feed) ShouldExecute() (bool, error) { func (feed Feed) ShouldExecute() (bool, error) {
if !feed.Entry.Deleted.IsZero() { if !feed.Entry.Deleted.IsZero() || feed.Version.URL == "" {
return false, nil return false, nil
} }
next, err := feed.Next() next, err := feed.Next()

View File

@@ -9,30 +9,36 @@ import (
type Reader struct { type Reader struct {
ctx context.Context ctx context.Context
limiter *rate.Limiter limiter rate.Limiter
r io.Reader r io.Reader
} }
var _ io.Reader = Reader{} var _ io.Reader = &Reader{}
func NewReader(ctx context.Context, bps rate.Limit, r io.Reader) Reader { func NewReader(ctx context.Context, bps rate.Limit, r io.Reader) *Reader {
return Reader{ return &Reader{
ctx: ctx, ctx: ctx,
limiter: rate.NewLimiter(bps, 8192), limiter: *rate.NewLimiter(bps, int(bps)),
r: r, r: r,
} }
} }
func (r Reader) Read(b []byte) (int, error) { func (r *Reader) Read(b []byte) (int, error) {
n, err := r.r.Read(b) n, err := r.r.Read(b)
m := 0 m := 0
burst := r.limiter.Burst() burst := r.limiter.Burst()
for m < n { for m < n {
if err := r.limiter.WaitN(r.ctx, burst); err != nil { page := burst
if left := n - m; page > left {
page = left
}
if err := r.limiter.WaitN(r.ctx, page); err != nil {
return n, err return n, err
} }
m += burst
m += page
} }
return n, err return n, err