create torrent handler

Former-commit-id: dc7a0cbdbbf2ddc985f5581e740ba59a9c18aff0
master
Bel LaPointe 2019-06-25 09:49:50 -06:00
parent 8d353b69f4
commit 8f3d0c2077
6 changed files with 480 additions and 33 deletions

View File

@ -41,12 +41,5 @@ func (h *Handler) Job(key string) error {
if err := f.Pull(); err != nil {
return err
}
for _, tag := range f.Tags {
if foo := ByTag(tag); foo != nil {
if err := foo(key); err != nil {
return err
}
}
}
return nil
}

View File

@ -1,12 +0,0 @@
package handlers
func ByTag(tag string) func(string) error {
var foo func(string) error
switch tag {
case "torrent":
foo = torrent
case "podcast":
foo = podcast
}
return foo
}

View File

@ -1,7 +0,0 @@
package handlers
import "errors"
func podcast(key string) error {
return errors.New("not impl")
}

View File

@ -1,7 +0,0 @@
package handlers
import "errors"
func torrent(key string) error {
return errors.New("not impl")
}

230
handlers/torrent/main.go Normal file
View File

@ -0,0 +1,230 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"local/args"
"local/storage"
"log"
"net/http"
"regexp"
"strings"
"time"
"github.com/mmcdole/gofeed"
)
const sessionHeader = "X-Transmission-Session-Id"
type Config struct {
url string
vpntor string
outdir string
interval time.Duration
db storage.DB
ctx context.Context
can context.CancelFunc
}
func main() {
config, err := config()
if err != nil {
panic(err)
}
for {
if err := mainLoop(config); err != nil {
panic(err)
}
}
}
func mainLoop(config Config) error {
select {
case <-time.After(config.interval):
if err := pull(config.db, config.vpntor, config.outdir, config.url); err != nil {
log.Println(err)
}
case <-config.ctx.Done():
if err := config.ctx.Err(); err != nil {
return err
}
}
return nil
}
func config() (Config, error) {
as := args.NewArgSet()
as.Append(args.STRING, "url", "url of rss feed", "http://192.168.0.86:33419/api/tag/torrent")
as.Append(args.STRING, "vpntor", "url of vpntor", "http://192.168.0.86:9091/transmission/rpc")
as.Append(args.DURATION, "interval", "interval to check feed", "30m")
as.Append(args.STRING, "outdir", "save dir", "/data/completed-rss")
as.Append(args.STRING, "db", "db type", "map")
as.Append(args.STRING, "addr", "db addr", "")
as.Append(args.STRING, "user", "db user", "")
as.Append(args.STRING, "pass", "db pass", "")
if err := as.Parse(); err != nil {
return Config{}, err
}
db, err := storage.New(
storage.TypeFromString(as.Get("db").GetString()),
as.Get("addr").GetString(),
as.Get("user").GetString(),
as.Get("pass").GetString(),
)
if err != nil {
panic(err)
}
ctx, can := context.WithCancel(context.Background())
return Config{
url: as.Get("url").GetString(),
vpntor: as.Get("vpntor").GetString(),
interval: as.Get("interval").GetDuration(),
outdir: as.Get("outdir").GetString(),
db: db,
ctx: ctx,
can: can,
}, nil
}
func pull(db storage.DB, vpntor, outdir, url string) error {
gofeed, err := getGoFeed(url)
if err != nil {
return err
}
for _, item := range gofeed.Items {
if ok, err := isDone(db, item.Link); err != nil {
return err
} else if ok {
continue
}
s, err := getItemContent(item)
if err != nil {
return err
}
if err := handle(vpntor, outdir, s); err != nil {
return err
}
if err := db.Set(item.Link, []byte{}); err != nil {
return err
}
}
return nil
}
func getGoFeed(url string) (*gofeed.Feed, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return gofeed.NewParser().Parse(resp.Body)
}
func getItemContent(item *gofeed.Item) (string, error) {
s := item.Description
if s == "" {
s = item.Content
}
if s == "" {
resp, err := http.Get(item.Link)
if err != nil {
return s, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return s, err
}
s = string(b)
}
return s, nil
}
func isDone(db storage.DB, url string) (bool, error) {
_, err := db.Get(url)
if err == storage.ErrNotFound {
return false, nil
}
return true, err
}
func handle(vpntor, outdir, content string) error {
for _, magnet := range findMagnets(content) {
resp, err := submit(vpntor, outdir, magnet)
if err != nil {
return err
}
if err := succeeded(resp.Body); err != nil {
return err
}
}
return nil
}
func findMagnets(s string) []string {
magnetRegexp := regexp.MustCompile(`magnet:.xt[^ $"]*`)
return magnetRegexp.FindAllString(s, -1)
}
func submit(vpntor, outdir, magnet string) (*http.Response, error) {
session, err := getSessionID(vpntor)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", vpntor, buildReqBody(outdir, magnet))
if err != nil {
return nil, err
}
req.Header.Add(sessionHeader, session)
return (&http.Client{}).Do(req)
}
func succeeded(body io.ReadCloser) error {
defer body.Close()
b, err := ioutil.ReadAll(body)
if err != nil {
return err
}
var result struct {
Result string `json:"result"`
}
if err := json.Unmarshal(b, &result); err != nil {
return err
}
if result.Result != "success" {
return fmt.Errorf("denied: %s", b)
}
return nil
}
func buildReqBody(outdir, magnet string) io.Reader {
return strings.NewReader(fmt.Sprintf(`
{
"method": "torrent-add",
"arguments": {
"filename": %q,
"download-dir": %q
}
}
`, magnet, outdir))
}
func getSessionID(vpntor string) (string, error) {
resp, err := http.Get(vpntor)
if err != nil {
return "", err
}
defer resp.Body.Close()
id := resp.Header.Get(sessionHeader)
if id == "" {
err = errors.New("session id header not found")
}
return id, err
}

View File

@ -0,0 +1,250 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"local/storage"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
"github.com/mmcdole/gofeed"
)
type stringReaderCloser struct {
io.Reader
}
func mockReadClose(s string) io.ReadCloser {
reader := strings.NewReader(s)
return stringReaderCloser{Reader: reader}
}
func (src stringReaderCloser) Close() error {
return nil
}
func fakeRSSServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`
<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<item>
<title>Item Title</title>
<link>https://roosterteeth.com/episode/rooster-teeth-podcast-2019-549</link>
<pubDate>Tue, 18 Jun 2019 19:00:00 +0000</pubDate>
<description>Gavin Free discuss raditation, toilet paper face, Chris's continued haircuts, and more on this week's RT Podcast! magnet:-xt1 magnet:-xt2 <a href="magnet:-xt3">link</a></description>
<enclosure url="http://www.podtrac.com/pts/redirect.mp3/traffic.libsyn.com/roosterteethpodcast/Rooster_Teeth_Podcast_549.mp3" type="audio/mpeg" />
</item>
</channel>
</rss>
`))
}))
}
func TestMainLoopCtx(t *testing.T) {
ctx, can := context.WithCancel(context.Background())
can()
c := Config{
interval: time.Hour,
ctx: ctx,
}
if err := mainLoop(c); err == nil || !strings.Contains(err.Error(), "cancel") {
t.Fatal(err)
}
}
func TestConfig(t *testing.T) {
was := os.Args[:]
defer func() {
os.Args = was
}()
os.Args = []string{"a"}
if _, err := config(); err != nil {
t.Fatal(err)
}
os.Args = []string{"a", "-interval", "not a duration"}
stderr := os.Stderr
f, _ := os.Open("/dev/null")
os.Stderr = f
defer func() {
os.Stderr = stderr
}()
if _, err := config(); err == nil {
t.Fatal(err)
}
}
func TestGetGoFeed(t *testing.T) {
s := fakeRSSServer()
defer s.Close()
f, err := getGoFeed(s.URL)
if err != nil {
t.Fatal(err)
}
if len(f.Items) != 1 {
t.Fatal(len(f.Items))
}
}
func TestGetItemContent(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`Hello`))
}))
defer s.Close()
cases := []struct {
item gofeed.Item
body string
err error
}{
{
item: gofeed.Item{
Description: "hi",
Content: "hi2",
},
body: "hi",
err: nil,
},
{
item: gofeed.Item{
Content: "hi2",
},
body: "hi2",
err: nil,
},
{
item: gofeed.Item{
Link: s.URL,
},
body: "Hello",
err: nil,
},
}
for i, c := range cases {
body, err := getItemContent(&c.item)
cerrS := fmt.Sprintf("%v", c.err)
errS := fmt.Sprintf("%v", err)
if cerrS != errS {
t.Errorf("[%d] unexpected err %v, want %v", i, err, c.err)
}
if body != c.body {
t.Errorf("[%d] unexpected body %v, want %v", i, body, c.body)
}
}
}
func TestIsDone(t *testing.T) {
db, _ := storage.New(storage.MAP)
db.Set("a", []byte("hi"))
if ok, err := isDone(db, "a"); err != nil {
t.Fatal(err)
} else if !ok {
t.Fatal(ok)
}
if ok, err := isDone(db, "b"); err != nil {
t.Fatal(err)
} else if ok {
t.Fatal(ok)
}
}
func TestGetSessionID(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add(sessionHeader, "id")
w.Write([]byte(`Hello`))
}))
defer s.Close()
session, err := getSessionID(s.URL)
if err != nil {
t.Fatal(err)
}
if session != "id" {
t.Fatal(session)
}
}
func TestBuildReqBody(t *testing.T) {
var want struct {
Method string `json:"method"`
Arguments struct {
Filename string `json:"filename"`
DownloadDir string `json:"download-dir"`
} `json:"arguments"`
}
b := buildReqBody("out", "mag")
if err := json.NewDecoder(b).Decode(&want); err != nil {
t.Fatal(err)
}
if want.Method != "torrent-add" {
t.Fatal(want.Method)
}
if want.Arguments.Filename != "mag" {
t.Fatal(want.Arguments.Filename)
}
if want.Arguments.DownloadDir != "out" {
t.Fatal(want.Arguments.DownloadDir)
}
}
func TestSucceeded(t *testing.T) {
cases := []struct {
s string
err error
}{
{
s: `{"result":"success"}`,
err: nil,
},
{
s: `this isnt json`,
err: errors.New("invalid character 'h' in literal true (expecting 'r')"),
},
{
s: `{"result":"failure"}`,
err: errors.New(`denied: {"result":"failure"}`),
},
}
for i, c := range cases {
err := succeeded(mockReadClose(c.s))
cerrS := fmt.Sprintf("%v", c.err)
errS := fmt.Sprintf("%v", err)
if cerrS != errS {
t.Errorf("[%d] unexpected err %v, want %v", i, err, c.err)
}
}
}
func TestFindMagnets(t *testing.T) {
cases := []struct {
s string
l int
}{
{
s: `here is some magnet:-xt1 and magnet:-xt2 another one <a href="magnet:-xt3">link</a>`,
l: 3,
},
}
for i, c := range cases {
out := findMagnets(c.s)
if len(out) != c.l {
t.Errorf("[%d] found %v magnets, want %v", i, len(out), c.l)
}
}
}