Compare commits
10 Commits
02331752fe
...
d5b84da8f5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5b84da8f5 | ||
|
|
b6526b8360 | ||
|
|
c0977a6b7a | ||
|
|
feb48ee4bc | ||
|
|
88f6746c85 | ||
|
|
5817dce70e | ||
|
|
04c7a5c9e1 | ||
|
|
83c8fccb78 | ||
|
|
cb44dfd49d | ||
|
|
6ca1f83727 |
20
config.go
20
config.go
@@ -13,15 +13,17 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
Debug bool
|
Debug bool
|
||||||
InitializeSlack bool
|
InitializeSlack bool
|
||||||
SlackToken string
|
SlackToken string
|
||||||
SlackChannels string
|
SlackChannels string
|
||||||
PostgresConn string
|
PostgresConn string
|
||||||
storage Storage
|
BasicAuthUser string
|
||||||
queue Queue
|
BasicAuthPassword string
|
||||||
driver Driver
|
storage Storage
|
||||||
|
queue Queue
|
||||||
|
driver Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConfig(ctx context.Context) (Config, error) {
|
func newConfig(ctx context.Context) (Config, error) {
|
||||||
|
|||||||
31
driver.go
31
driver.go
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -22,6 +23,33 @@ type Driver interface {
|
|||||||
Set(context.Context, string, string, []byte) error
|
Set(context.Context, string, string, []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func FillWithTestdata(ctx context.Context, driver Driver) error {
|
||||||
|
d := "./testdata/slack_events"
|
||||||
|
entries, err := os.ReadDir(d)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
if entry.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
b, err := os.ReadFile(path.Join(d, entry.Name()))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m, err := ParseSlack(b)
|
||||||
|
if errors.Is(err, ErrIrrelevantMessage) {
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := driver.Set(nil, "m", m.ID, m.Serialize()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type Postgres struct {
|
type Postgres struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
@@ -199,6 +227,9 @@ func (ram RAM) Set(_ context.Context, ns, id string, v []byte) error {
|
|||||||
ram.m[ns] = map[string][]byte{}
|
ram.m[ns] = map[string][]byte{}
|
||||||
}
|
}
|
||||||
ram.m[ns][id] = v
|
ram.m[ns][id] = v
|
||||||
|
if v == nil {
|
||||||
|
delete(ram.m[ns], id)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,24 @@ func TestDriverRAM(t *testing.T) {
|
|||||||
testDriver(t, NewRAM())
|
testDriver(t, NewRAM())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFillTestdata(t *testing.T) {
|
||||||
|
ctx, can := context.WithTimeout(context.Background(), time.Second*15)
|
||||||
|
defer can()
|
||||||
|
|
||||||
|
ram := NewRAM()
|
||||||
|
if err := FillWithTestdata(ctx, ram); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
n := 0
|
||||||
|
if err := ram.ForEach(context.Background(), "m", func(_ string, _ []byte) error {
|
||||||
|
n += 1
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Log(n)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDriverBBolt(t *testing.T) {
|
func TestDriverBBolt(t *testing.T) {
|
||||||
testDriver(t, NewTestDBIn(t.TempDir()))
|
testDriver(t, NewTestDBIn(t.TempDir()))
|
||||||
}
|
}
|
||||||
@@ -76,6 +94,12 @@ func testDriver(t *testing.T, d Driver) {
|
|||||||
t.Error("cannot set from full:", err)
|
t.Error("cannot set from full:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := d.ForEach(ctx, "m", func(string, []byte) error {
|
||||||
|
return errors.New("should have no hits")
|
||||||
|
}); err != nil {
|
||||||
|
t.Error("failed to forEach empty:", err)
|
||||||
|
}
|
||||||
|
|
||||||
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
if b, err := d.Get(ctx, "m", "id"); err != nil {
|
||||||
t.Error("cannot get from deleted:", err)
|
t.Error("cannot get from deleted:", err)
|
||||||
} else if b != nil {
|
} else if b != nil {
|
||||||
|
|||||||
20
main.go
20
main.go
@@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -51,6 +52,7 @@ func listenAndServe(ctx context.Context, cfg Config) chan error {
|
|||||||
errc := make(chan error)
|
errc := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(errc)
|
defer close(errc)
|
||||||
|
log.Printf("listening on %s", s.Addr)
|
||||||
errc <- s.ListenAndServe()
|
errc <- s.ListenAndServe()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -61,6 +63,7 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
mux.Handle("POST /api/v1/events/slack", http.HandlerFunc(newHandlerPostAPIV1EventsSlack(cfg)))
|
||||||
|
mux.Handle("GET /api/v1/messages", http.HandlerFunc(newHandlerGetAPIV1Message(cfg)))
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if cfg.Debug {
|
if cfg.Debug {
|
||||||
@@ -73,6 +76,17 @@ func newHandler(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newHandlerGetAPIV1Message(cfg Config) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if u, p, _ := r.BasicAuth(); u != cfg.BasicAuthUser || p != cfg.BasicAuthPassword {
|
||||||
|
http.Error(w, "shoo", http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
http.Error(w, "not impl", http.StatusNotImplemented)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
func newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
||||||
if cfg.InitializeSlack {
|
if cfg.InitializeSlack {
|
||||||
return handlerPostAPIV1EventsSlackInitialize
|
return handlerPostAPIV1EventsSlackInitialize
|
||||||
@@ -117,14 +131,18 @@ func _newHandlerPostAPIV1EventsSlack(cfg Config) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m, err := ParseSlack(b)
|
m, err := ParseSlack(b)
|
||||||
if err != nil {
|
if errors.Is(err, ErrIrrelevantMessage) {
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
if err := cfg.storage.Upsert(r.Context(), m); err != nil {
|
||||||
|
log.Printf("failed to ingest %+v: %v", m, err)
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.Printf("ingested %+v", m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
34
message.go
34
message.go
@@ -5,6 +5,11 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrIrrelevantMessage = errors.New("message isnt relevant to spoc bot vr")
|
||||||
)
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
@@ -17,12 +22,17 @@ type Message struct {
|
|||||||
Event string
|
Event string
|
||||||
Plaintext string
|
Plaintext string
|
||||||
Asset string
|
Asset string
|
||||||
|
Resolved bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m Message) Empty() bool {
|
func (m Message) Empty() bool {
|
||||||
return m == (Message{})
|
return m == (Message{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m Message) Time() time.Time {
|
||||||
|
return time.Unix(int64(m.TS), 0)
|
||||||
|
}
|
||||||
|
|
||||||
func (m Message) Serialize() []byte {
|
func (m Message) Serialize() []byte {
|
||||||
b, err := json.Marshal(m)
|
b, err := json.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -54,6 +64,9 @@ type (
|
|||||||
slackEvent struct {
|
slackEvent struct {
|
||||||
ID string `json:"event_ts"`
|
ID string `json:"event_ts"`
|
||||||
Channel string
|
Channel string
|
||||||
|
// rewrites
|
||||||
|
Nested *slackEvent `json:"message"`
|
||||||
|
PreviousMessage *slackEvent `json:"previous_message"`
|
||||||
// human
|
// human
|
||||||
ParentID string `json:"thread_ts"`
|
ParentID string `json:"thread_ts"`
|
||||||
Text string
|
Text string
|
||||||
@@ -100,9 +113,9 @@ func ParseSlack(b []byte) (Message, error) {
|
|||||||
|
|
||||||
if s.Event.Bot.Name != "" {
|
if s.Event.Bot.Name != "" {
|
||||||
if len(s.Event.Attachments) == 0 {
|
if len(s.Event.Attachments) == 0 {
|
||||||
return Message{}, errors.New("bot message has no attachments")
|
return Message{}, ErrIrrelevantMessage
|
||||||
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
|
} else if !strings.Contains(s.Event.Attachments[0].Title, ": Firing: ") {
|
||||||
return Message{}, errors.New("bot message attachment is not Firing")
|
return Message{}, ErrIrrelevantMessage
|
||||||
}
|
}
|
||||||
return Message{
|
return Message{
|
||||||
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
|
ID: fmt.Sprintf("%s/%v", s.Event.ID, s.TS),
|
||||||
@@ -114,6 +127,7 @@ func ParseSlack(b []byte) (Message, error) {
|
|||||||
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
|
Event: strings.Split(s.Event.Attachments[0].Title, ":")[0],
|
||||||
Plaintext: s.Event.Attachments[0].Text,
|
Plaintext: s.Event.Attachments[0].Text,
|
||||||
Asset: "TODO",
|
Asset: "TODO",
|
||||||
|
Resolved: !strings.HasPrefix(s.Event.Attachments[0].Color, "F"),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,5 +147,21 @@ func ParseSlack(b []byte) (Message, error) {
|
|||||||
func parseSlack(b []byte) (slackMessage, error) {
|
func parseSlack(b []byte) (slackMessage, error) {
|
||||||
var result slackMessage
|
var result slackMessage
|
||||||
err := json.Unmarshal(b, &result)
|
err := json.Unmarshal(b, &result)
|
||||||
|
if result.Event.Nested != nil && !result.Event.Nested.Empty() {
|
||||||
|
result.Event.Blocks = result.Event.Nested.Blocks
|
||||||
|
result.Event.Bot = result.Event.Nested.Bot
|
||||||
|
result.Event.Attachments = result.Event.Nested.Attachments
|
||||||
|
result.Event.Nested = nil
|
||||||
|
}
|
||||||
|
if result.Event.PreviousMessage != nil {
|
||||||
|
if result.Event.PreviousMessage.ID != "" {
|
||||||
|
result.Event.ID = result.Event.PreviousMessage.ID
|
||||||
|
}
|
||||||
|
result.Event.PreviousMessage = nil
|
||||||
|
}
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this slackEvent) Empty() bool {
|
||||||
|
return fmt.Sprintf("%+v", this) == fmt.Sprintf("%+v", slackEvent{})
|
||||||
|
}
|
||||||
|
|||||||
@@ -79,6 +79,41 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
Asset: "TODO",
|
Asset: "TODO",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"opsgenie_alert_resolved.json": {
|
||||||
|
slackMessage: slackMessage{
|
||||||
|
TS: 1712916339,
|
||||||
|
Event: slackEvent{
|
||||||
|
ID: "1712916339.000300",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Bot: slackBot{
|
||||||
|
Name: "Opsgenie for Alert Management",
|
||||||
|
},
|
||||||
|
Attachments: []slackAttachment{{
|
||||||
|
Color: "2ecc71",
|
||||||
|
Title: "#11069: [Grafana]: Firing: Alertconfig Workflow Failed",
|
||||||
|
Text: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Fields: []slackField{
|
||||||
|
{Value: "P3", Title: "Priority"},
|
||||||
|
{Value: "alertname:Alertconfig Workflow Failed, grafana_folder:Datastores, rule_uid:a7639f7e-6950-41be-850a-b22119f74cbb", Title: "Tags"},
|
||||||
|
{Value: "Datastores Non-Critical", Title: "Routed Teams"},
|
||||||
|
},
|
||||||
|
Actions: []slackAction{},
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: Message{
|
||||||
|
ID: "1712916339.000300/1712916339",
|
||||||
|
TS: 1712916339,
|
||||||
|
Source: "https://renderinc.slack.com/archives/C06U1DDBBU4/p1712916339000300",
|
||||||
|
Channel: "C06U1DDBBU4",
|
||||||
|
Thread: "1712916339.000300",
|
||||||
|
EventName: "Alertconfig Workflow Failed",
|
||||||
|
Event: "#11069",
|
||||||
|
Plaintext: "At least one alertconfig run has failed unexpectedly.\nDashboard: <https://grafana.render.com/d/VLZU83YVk?orgId=1>\nPanel: <https://grafana.render.com/d/VLZU83YVk?orgId=1&viewPanel=17>\nSource: <https://grafana.render.com/alerting/grafana/fa7b06b8-b4d8-4979-bce7-5e1c432edd81/view?orgId=1>",
|
||||||
|
Asset: "TODO",
|
||||||
|
Resolved: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, d := range cases {
|
for name, d := range cases {
|
||||||
@@ -107,6 +142,9 @@ func TestParseSlackTestdata(t *testing.T) {
|
|||||||
if got != want.message {
|
if got != want.message {
|
||||||
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
t.Errorf("wanted \n\t%+v, got\n\t%+v", want.message, got)
|
||||||
}
|
}
|
||||||
|
if time := got.Time(); time.Unix() != int64(got.TS) {
|
||||||
|
t.Error("not unix time", got.TS, time)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,10 +11,7 @@ func TestQueue(t *testing.T) {
|
|||||||
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
ctx, can := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
db := NewTestDBIn(t.TempDir())
|
q := NewQueue(NewRAM())
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
q := NewQueue(db)
|
|
||||||
|
|
||||||
for i := 0; i < 39; i++ {
|
for i := 0; i < 39; i++ {
|
||||||
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
|
if err := q.Push(ctx, Message{ID: strconv.Itoa(i), TS: uint64(i)}); err != nil {
|
||||||
|
|||||||
50
storage.go
50
storage.go
@@ -3,6 +3,8 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -17,12 +19,56 @@ func NewStorage(driver Driver) Storage {
|
|||||||
return Storage{driver: driver}
|
return Storage{driver: driver}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s Storage) Threads(ctx context.Context) ([]string, error) {
|
||||||
|
return s.ThreadsSince(ctx, time.Unix(0, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) ThreadsSince(ctx context.Context, t time.Time) ([]string, error) {
|
||||||
|
messages, err := s.messagesWhere(ctx, func(m Message) bool {
|
||||||
|
return !t.After(m.Time())
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
threads := map[string]struct{}{}
|
||||||
|
for _, m := range messages {
|
||||||
|
threads[m.Thread] = struct{}{}
|
||||||
|
}
|
||||||
|
result := make([]string, 0, len(threads))
|
||||||
|
for k := range threads {
|
||||||
|
result = append(result, k)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) Thread(ctx context.Context, thread string) ([]Message, error) {
|
||||||
|
return s.messagesWhere(ctx, func(m Message) bool {
|
||||||
|
return m.Thread == thread
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Storage) messagesWhere(ctx context.Context, where func(Message) bool) ([]Message, error) {
|
||||||
|
result := make([]Message, 0)
|
||||||
|
err := s.driver.ForEach(ctx, "m", func(_ string, v []byte) error {
|
||||||
|
m := MustDeserialize(v)
|
||||||
|
if !where(m) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result = append(result, m)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
sort.Slice(result, func(i, j int) bool {
|
||||||
|
return result[i].TS < result[j].TS
|
||||||
|
})
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
func (s Storage) Upsert(ctx context.Context, m Message) error {
|
func (s Storage) Upsert(ctx context.Context, m Message) error {
|
||||||
return s.driver.Set(ctx, "storage", m.ID, m.Serialize())
|
return s.driver.Set(ctx, "m", m.ID, m.Serialize())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
|
func (s Storage) Get(ctx context.Context, id string) (Message, error) {
|
||||||
b, err := s.driver.Get(ctx, "storage", id)
|
b, err := s.driver.Get(ctx, "m", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Message{}, err
|
return Message{}, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,35 +2,69 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//func newStorageFromTestdata(t *testing.T) {
|
||||||
|
|
||||||
func TestStorage(t *testing.T) {
|
func TestStorage(t *testing.T) {
|
||||||
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
ctx, can := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
db := NewTestDBIn(t.TempDir())
|
t.Run("Threads", func(t *testing.T) {
|
||||||
defer db.Close()
|
s := NewStorage(NewRAM())
|
||||||
|
|
||||||
s := NewStorage(db)
|
mX1 := Message{ID: "1", Thread: "X", TS: 1}
|
||||||
|
mX2 := Message{ID: "2", Thread: "X", TS: 2}
|
||||||
|
mY1 := Message{ID: "1", Thread: "Y", TS: 3}
|
||||||
|
|
||||||
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
|
for _, m := range []Message{mX1, mX2, mY1} {
|
||||||
t.Error("failed to get 404", err)
|
if err := s.Upsert(ctx, m); err != nil {
|
||||||
}
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
m := Message{
|
if threads, err := s.Threads(ctx); err != nil {
|
||||||
ID: "id",
|
t.Error(err)
|
||||||
TS: 1,
|
} else if len(threads) != 2 {
|
||||||
}
|
t.Error(threads)
|
||||||
|
} else if !slices.Contains(threads, "X") {
|
||||||
|
t.Error(threads, "X")
|
||||||
|
} else if !slices.Contains(threads, "Y") {
|
||||||
|
t.Error(threads, "Y")
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.Upsert(ctx, m); err != nil {
|
if threads, err := s.ThreadsSince(ctx, time.Unix(3, 0)); err != nil {
|
||||||
t.Error("failed to upsert", err)
|
t.Error(err)
|
||||||
}
|
} else if len(threads) != 1 {
|
||||||
|
t.Error(threads)
|
||||||
|
} else if threads[0] != "Y" {
|
||||||
|
t.Error(threads[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
if m2, err := s.Get(ctx, "id"); err != nil {
|
t.Run("Get Upsert", func(t *testing.T) {
|
||||||
t.Error("failed to get", err)
|
s := NewStorage(NewRAM())
|
||||||
} else if m != m2 {
|
|
||||||
t.Error(m2)
|
if _, err := s.Get(ctx, "id"); err != ErrNotFound {
|
||||||
}
|
t.Error("failed to get 404", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m := Message{
|
||||||
|
ID: "id",
|
||||||
|
TS: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Upsert(ctx, m); err != nil {
|
||||||
|
t.Error("failed to upsert", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m2, err := s.Get(ctx, "id"); err != nil {
|
||||||
|
t.Error("failed to get", err)
|
||||||
|
} else if m != m2 {
|
||||||
|
t.Error(m2)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user