Compare commits
3 Commits
0628a678d8
...
5ed296a3d2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ed296a3d2 | ||
|
|
83026a67d4 | ||
|
|
57e77e5d4e |
@@ -65,7 +65,7 @@ func TestOne(t *testing.T) {
|
||||
ctx := db.Test(t, ctx)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
if _, err := feeds.Insert(ctx, fmt.Sprintf("%s?idx=%d", sURL, i), "* * * * *", "matches", "tag"); err != nil {
|
||||
if _, err := feeds.Insert(ctx, fmt.Sprintf("%s?idx=%d", sURL, i), "* * * * *", "matches", http.MethodHead, fmt.Sprintf("%s?idx=1%d", sURL, i), "{{.Title}}"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
40
src/db/schema.go
Normal file
40
src/db/schema.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func InitializeSchema(ctx context.Context, k string, mods []string) error {
|
||||
if err := Exec(ctx, fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "database_version.%s" (v NUMBER, t TIMESTAMP)`, k)); err != nil {
|
||||
return fmt.Errorf(`failed to create "database_version.%s" table: %w`, k, err)
|
||||
}
|
||||
|
||||
type DatabaseVersion struct {
|
||||
V int `json:"v"`
|
||||
T time.Time `json:"t"`
|
||||
}
|
||||
vs, err := Query[DatabaseVersion](ctx, fmt.Sprintf(`SELECT v, t FROM "database_version.%s" ORDER BY v DESC LIMIT 1`, k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var v DatabaseVersion
|
||||
if len(vs) > 0 {
|
||||
v = vs[0]
|
||||
}
|
||||
|
||||
mods = append([]string{""}, mods...)
|
||||
for i := v.V + 1; i < len(mods); i++ {
|
||||
q := mods[i]
|
||||
q = strings.TrimSpace(q)
|
||||
q = strings.TrimSuffix(q, ";")
|
||||
q = fmt.Sprintf(`BEGIN; %s; INSERT INTO "database_version.%s" (v, t) VALUES (?, ?); COMMIT;`, q, k)
|
||||
if err := Exec(ctx, q, i, time.Now()); err != nil {
|
||||
return fmt.Errorf("[%s][%d] failed mod %s: %w", k, i, mods[i], err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
106
src/feeds/db.go
106
src/feeds/db.go
@@ -2,10 +2,8 @@ package feeds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"show-rss/src/db"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -30,7 +28,9 @@ type (
|
||||
URL string
|
||||
Cron string
|
||||
Pattern string
|
||||
Tag string
|
||||
WebhookMethod string
|
||||
WebhookURL string
|
||||
WebhookBody string
|
||||
}
|
||||
|
||||
Execution struct {
|
||||
@@ -98,7 +98,9 @@ func Get(ctx context.Context, id string) (Feed, error) {
|
||||
versions.url AS "Version.URL",
|
||||
versions.cron AS "Version.Cron",
|
||||
versions.pattern AS "Version.Pattern",
|
||||
versions.tag AS "Version.Tag",
|
||||
versions.webhook_method AS "Version.WebhookMethod",
|
||||
versions.webhook_url AS "Version.WebhookURL",
|
||||
versions.webhook_body AS "Version.WebhookBody",
|
||||
(
|
||||
SELECT executed_at
|
||||
FROM "feed.executions"
|
||||
@@ -121,55 +123,6 @@ func Get(ctx context.Context, id string) (Feed, error) {
|
||||
`, id, id)
|
||||
}
|
||||
|
||||
func oldGet(ctx context.Context, id string) (Feed, error) {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return Feed{}, err
|
||||
}
|
||||
|
||||
entry, err := getEntry(ctx, id)
|
||||
if err != nil {
|
||||
return Feed{}, err
|
||||
}
|
||||
|
||||
version, err := db.QueryOne[Version](ctx, `
|
||||
SELECT
|
||||
"feed.current_versions".versions_created_at AS Created,
|
||||
"feed.current_versions".url AS URL,
|
||||
"feed.current_versions".cron AS Cron,
|
||||
"feed.current_versions".pattern AS Pattern,
|
||||
"feed.current_versions".tag AS Tag
|
||||
FROM
|
||||
"feed.current_versions"
|
||||
JOIN
|
||||
"feed.versions" versions_a ON
|
||||
"feed.current_versions".entries_id=versions_a.entries_id
|
||||
JOIN
|
||||
"feed.versions" versions_b ON
|
||||
"feed.current_versions".versions_created_at=versions_b.created_at
|
||||
WHERE
|
||||
"feed.current_versions".entries_id = ?
|
||||
`, id)
|
||||
if err != nil {
|
||||
return Feed{}, err
|
||||
}
|
||||
|
||||
execution, err := db.QueryOne[Execution](ctx, `
|
||||
SELECT
|
||||
"feed.executed_at" AS Executed,
|
||||
"feed.versions_created_at" AS VersionsCreated
|
||||
FROM
|
||||
"feed.executions"
|
||||
WHERE
|
||||
"feed.executions".entries_id = ?
|
||||
ORDER BY "feed.executions".executed_at DESC
|
||||
`, id)
|
||||
if err != nil {
|
||||
return Feed{}, err
|
||||
}
|
||||
|
||||
return Feed{}, fmt.Errorf("%+v, %+v, %+v", entry, version, execution)
|
||||
}
|
||||
|
||||
func (feed Feed) Executed(ctx context.Context) error {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return err
|
||||
@@ -190,7 +143,7 @@ func (feed Feed) Executed(ctx context.Context) error {
|
||||
)
|
||||
}
|
||||
|
||||
func Insert(ctx context.Context, url, cron, pattern, tag string) (string, error) {
|
||||
func Insert(ctx context.Context, url, cron, pattern, webhookMethod, webhookURL, webhookBody string) (string, error) {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -210,12 +163,14 @@ func Insert(ctx context.Context, url, cron, pattern, tag string) (string, error)
|
||||
url,
|
||||
cron,
|
||||
pattern,
|
||||
tag
|
||||
) VALUES ($4, $5, $6, $7, $8, $9);
|
||||
webhook_method,
|
||||
webhook_url,
|
||||
webhook_body
|
||||
) VALUES ($4, $5, $6, $7, $8, $9, $10, $11);
|
||||
COMMIT;
|
||||
`,
|
||||
id, now, now,
|
||||
id, now, url, cron, pattern, tag,
|
||||
id, now, url, cron, pattern, webhookMethod, webhookURL, webhookBody,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -246,24 +201,7 @@ func getEntry(ctx context.Context, id string) (Entry, error) {
|
||||
}
|
||||
|
||||
func initDB(ctx context.Context) error {
|
||||
if err := db.Exec(ctx, `CREATE TABLE IF NOT EXISTS database_version (v NUMBER, t TIMESTAMP)`); err != nil {
|
||||
return fmt.Errorf("failed to create database_version table: %w", err)
|
||||
}
|
||||
|
||||
type DatabaseVersion struct {
|
||||
V int `json:"v"`
|
||||
T time.Time `json:"t"`
|
||||
}
|
||||
vs, err := db.Query[DatabaseVersion](ctx, `SELECT v, t FROM database_version ORDER BY v DESC LIMIT 1`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var v DatabaseVersion
|
||||
if len(vs) > 0 {
|
||||
v = vs[0]
|
||||
}
|
||||
|
||||
mods := []string{
|
||||
return db.InitializeSchema(ctx, "feeds", []string{
|
||||
`CREATE TABLE "feed.entries" (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
@@ -280,7 +218,9 @@ func initDB(ctx context.Context) error {
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN url TEXT NOT NULL`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN cron TEXT NOT NULL DEFAULT '0 0 * * *'`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN pattern TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN tag TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN webhook_method TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN webhook_url TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "feed.versions" ADD COLUMN webhook_body TEXT NOT NULL DEFAULT ''`,
|
||||
|
||||
`CREATE TABLE "feed.executions" (
|
||||
entries_id TEXT,
|
||||
@@ -288,17 +228,5 @@ func initDB(ctx context.Context) error {
|
||||
executed_at TIMESTAMP,
|
||||
FOREIGN KEY (entries_id, versions_created_at) REFERENCES "feed.versions" (entries_id, created_at)
|
||||
)`,
|
||||
}
|
||||
mods = append([]string{""}, mods...)
|
||||
for i := v.V + 1; i < len(mods); i++ {
|
||||
q := mods[i]
|
||||
q = strings.TrimSpace(q)
|
||||
q = strings.TrimSuffix(q, ";")
|
||||
q = fmt.Sprintf("BEGIN; %s; INSERT INTO database_version (v, t) VALUES (?, ?); COMMIT;", q)
|
||||
if err := db.Exec(ctx, q, i, time.Now()); err != nil {
|
||||
return fmt.Errorf("[%d] failed mod %s: %w", i, mods[i], err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ func TestFeeds(t *testing.T) {
|
||||
t.Run("crud", func(t *testing.T) {
|
||||
ctx := db.Test(t, ctx)
|
||||
|
||||
id, err := feeds.Insert(ctx, "url", "cron", "pattern", "tag")
|
||||
id, err := feeds.Insert(ctx, "url", "cron", "pattern", "wmethod", "wurl", "wbody")
|
||||
if err != nil {
|
||||
t.Fatal("cannot insert:", err)
|
||||
}
|
||||
@@ -51,8 +51,14 @@ func TestFeeds(t *testing.T) {
|
||||
if got.Version.Pattern != "pattern" {
|
||||
t.Error("bad version.pattern")
|
||||
}
|
||||
if got.Version.Tag != "tag" {
|
||||
t.Error("bad version.tag")
|
||||
if got.Version.WebhookMethod != "wmethod" {
|
||||
t.Error("bad version.webhookMethod")
|
||||
}
|
||||
if got.Version.WebhookURL != "wurl" {
|
||||
t.Error("bad version.webhookURL")
|
||||
}
|
||||
if got.Version.WebhookBody != "wbody" {
|
||||
t.Error("bad version.webhookBody")
|
||||
}
|
||||
|
||||
if !got.Execution.Executed.IsZero() {
|
||||
|
||||
@@ -100,7 +100,6 @@ func (feed Feed) Fetch(ctx context.Context) (Items, error) {
|
||||
Links: links,
|
||||
Preview: preview,
|
||||
Body: body,
|
||||
Tag: feed.Version.Tag,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -7,5 +7,4 @@ type Item struct {
|
||||
Links []string
|
||||
Preview string
|
||||
Body string
|
||||
Tag string
|
||||
}
|
||||
|
||||
55
src/webhooks/db.go
Normal file
55
src/webhooks/db.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package webhooks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"show-rss/src/db"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func Did(ctx context.Context, method, url, body string) (bool, error) {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
type Did struct {
|
||||
Did int
|
||||
}
|
||||
result, err := db.QueryOne[Did](ctx, `
|
||||
SELECT 1 AS "Did" FROM "webhook.executions" WHERE method=$1 AND url=$2 AND body=$3
|
||||
`, method, url, body)
|
||||
return result.Did > 0 && err == nil, err
|
||||
}
|
||||
|
||||
func Record(ctx context.Context, method, url, body string) error {
|
||||
if err := initDB(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
id := uuid.New().String()
|
||||
return db.Exec(ctx, `
|
||||
INSERT INTO "webhook.executions" (
|
||||
id,
|
||||
executed_at,
|
||||
method,
|
||||
url,
|
||||
body
|
||||
) VALUES ($1, $2, $3, $4, $5)
|
||||
`,
|
||||
id, now, method, url, body,
|
||||
)
|
||||
}
|
||||
|
||||
func initDB(ctx context.Context) error {
|
||||
return db.InitializeSchema(ctx, "webhooks", []string{
|
||||
`CREATE TABLE "webhook.executions" (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
executed_at TIMESTAMP NOT NULL
|
||||
)`,
|
||||
`ALTER TABLE "webhook.executions" ADD COLUMN method TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "webhook.executions" ADD COLUMN url TEXT NOT NULL DEFAULT ''`,
|
||||
`ALTER TABLE "webhook.executions" ADD COLUMN body TEXT NOT NULL DEFAULT ''`,
|
||||
})
|
||||
}
|
||||
34
src/webhooks/db_test.go
Normal file
34
src/webhooks/db_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package webhooks_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"show-rss/src/db"
|
||||
"show-rss/src/webhooks"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWebhooks(t *testing.T) {
|
||||
ctx := db.Test(t, context.Background())
|
||||
|
||||
if did, err := webhooks.Did(ctx, "m", "u", "b"); err != nil {
|
||||
t.Errorf("cannot Did() empty: %v", err)
|
||||
} else if did {
|
||||
t.Errorf("wrong Did() empty: %v", did)
|
||||
}
|
||||
|
||||
if err := webhooks.Record(ctx, "m", "u", "b"); err != nil {
|
||||
t.Errorf("cannot Record() empty: %v", err)
|
||||
}
|
||||
|
||||
if did, err := webhooks.Did(ctx, "m", "u", "b"); err != nil {
|
||||
t.Errorf("cannot Did() one: %v", err)
|
||||
} else if !did {
|
||||
t.Errorf("wrong Did() one: %v", did)
|
||||
}
|
||||
|
||||
if did, err := webhooks.Did(ctx, "m2", "u", "b"); err != nil {
|
||||
t.Errorf("cannot Did() wrong one: %v", err)
|
||||
} else if did {
|
||||
t.Errorf("wrong Did() wrong one: %v", did)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user