async slack scrape goes up to ?since

main
Bel LaPointe 2024-04-18 14:56:33 -06:00
parent 14de286415
commit f8861a73b5
10 changed files with 229 additions and 124 deletions

View File

@ -32,6 +32,7 @@ type Config struct {
storage Storage storage Storage
ai AI ai AI
slackToModelPipeline Pipeline slackToModelPipeline Pipeline
slackScrapePipeline Pipeline
modelToPersistencePipeline Pipeline modelToPersistencePipeline Pipeline
} }
@ -146,5 +147,11 @@ func newConfigFromEnv(ctx context.Context, getEnv func(string) string) (Config,
} }
result.modelToPersistencePipeline = modelToPersistencePipeline result.modelToPersistencePipeline = modelToPersistencePipeline
slackScrapePipeline, err := NewSlackScrapePipeline(ctx, result)
if err != nil {
return Config{}, err
}
result.slackScrapePipeline = slackScrapePipeline
return result, nil return result, nil
} }

5
go.mod
View File

@ -6,9 +6,9 @@ require (
github.com/glebarez/go-sqlite v1.21.2 github.com/glebarez/go-sqlite v1.21.2
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
github.com/nikolaydubina/llama2.go v0.7.1
github.com/tmc/langchaingo v0.1.8 github.com/tmc/langchaingo v0.1.8
gotest.tools/v3 v3.5.1 golang.org/x/time v0.5.0
gotest.tools v2.2.0+incompatible
) )
require ( require (
@ -20,7 +20,6 @@ require (
github.com/pkoukk/tiktoken-go v0.1.6 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
modernc.org/libc v1.22.5 // indirect modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect

6
go.sum
View File

@ -16,8 +16,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nikolaydubina/llama2.go v0.7.1 h1:ORmH1XbwFYGIOPHprkjtUPOEovlVXhnmnMjbMckaSyE=
github.com/nikolaydubina/llama2.go v0.7.1/go.mod h1:ggXhXOaDnEAgSSkcYsomqx/RLjInxe5ZAbcJ+/Y2mTM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
@ -34,12 +32,12 @@ github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdF
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=

90
main.go
View File

@ -39,6 +39,7 @@ func run(ctx context.Context, cfg Config) error {
case err := <-processPipelines(ctx, case err := <-processPipelines(ctx,
cfg.slackToModelPipeline, cfg.slackToModelPipeline,
cfg.modelToPersistencePipeline, cfg.modelToPersistencePipeline,
cfg.slackScrapePipeline,
): ):
return err return err
case err := <-listenAndServe(ctx, cfg): case err := <-listenAndServe(ctx, cfg):
@ -111,85 +112,24 @@ func newHandlerPutAPIV1RPCScrapeSlack(cfg Config) http.HandlerFunc {
if !basicAuth(cfg, w, r) { if !basicAuth(cfg, w, r) {
return return
} }
channel := r.Header.Get("slack-channel")
token := r.Header.Get("slack-oauth-token")
urls := []string{"https://slack.com/api/conversations.history?channel=" + channel} since, err := parseSince(r.URL.Query().Get("since"))
if err != nil {
httpc := http.Client{Timeout: time.Second} http.Error(w, err.Error(), http.StatusBadRequest)
get := func(url string) ([]byte, error) { return
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token)
req = req.WithContext(r.Context())
resp, err := httpc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
return io.ReadAll(resp.Body)
} }
n := 0 job, _ := json.Marshal(SlackScrape{
Latest: time.Now().Unix(),
for len(urls) > 0 { Oldest: since.Unix(),
url := urls[0] ThreadTS: "",
urls = urls[1:] Channel: r.Header.Get("slack-channel"),
select { Token: r.Header.Get("slack-oauth-token"),
case <-r.Context().Done(): })
case <-time.After(time.Second): if err := cfg.slackScrapePipeline.reader.Enqueue(r.Context(), job); err != nil {
} http.Error(w, err.Error(), http.StatusInternalServerError)
body, err := get(url) return
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
var page struct {
Messages []json.RawMessage
}
if err := json.Unmarshal(body, &page); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
errs := []error{}
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("rpc/scrapeslack => %s", messageJSON)
}
b, _ := json.Marshal(ChannelWrapper{Channel: channel, V: messageJSON})
if err := cfg.slackToModelPipeline.reader.Enqueue(r.Context(), b); err != nil {
errs = append(errs, err)
} else {
n += 1
}
if !strings.Contains(url, "ts=") {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
urls = append(urls, fmt.Sprintf("https://slack.com/api/conversations.replies?channel=%s&ts=%s", channel, peek.ThreadTS))
}
}
}
if len(errs) > 0 {
http.Error(w, fmt.Sprint(errs), http.StatusInternalServerError)
return
}
} }
json.NewEncoder(w).Encode(map[string]any{"scraped": n})
} }
} }

View File

@ -47,6 +47,7 @@ func TestRun(t *testing.T) {
cfg.SlackToken = "redacted" cfg.SlackToken = "redacted"
cfg.SlackChannels = []string{"C06U1DDBBU4"} cfg.SlackChannels = []string{"C06U1DDBBU4"}
cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg) cfg.slackToModelPipeline, _ = NewSlackToModelPipeline(ctx, cfg)
cfg.slackScrapePipeline, _ = NewSlackScrapePipeline(ctx, cfg)
cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg) cfg.modelToPersistencePipeline, _ = NewModelToPersistencePipeline(ctx, cfg)
go func() { go func() {

View File

@ -30,11 +30,11 @@ func NewModelToPersistencePipeline(ctx context.Context, cfg Config) (Pipeline, e
return Pipeline{ return Pipeline{
writer: writer, writer: writer,
reader: reader, reader: reader,
process: newModelToPersistenceProcess(cfg.storage), process: newModelToPersistenceProcess(cfg, cfg.storage),
}, nil }, nil
} }
func newModelToPersistenceProcess(storage Storage) processFunc { func newModelToPersistenceProcess(cfg Config, storage Storage) processFunc {
return func(ctx context.Context, models []byte) ([]byte, error) { return func(ctx context.Context, models []byte) ([]byte, error) {
var m Models var m Models
if err := json.Unmarshal(models, &m); err != nil { if err := json.Unmarshal(models, &m); err != nil {
@ -56,7 +56,9 @@ func newModelToPersistenceProcess(storage Storage) processFunc {
return nil, fmt.Errorf("failed to persist message: %w", err) return nil, fmt.Errorf("failed to persist message: %w", err)
} }
log.Printf("persisted models") if cfg.Debug {
log.Printf("persisted models")
}
return json.Marshal(ModelIDs{ return json.Marshal(ModelIDs{
Event: m.Event.ID, Event: m.Event.ID,
Thread: m.Thread.ID, Thread: m.Thread.ID,

View File

@ -16,7 +16,7 @@ func TestModelToPersistenceProcessor(t *testing.T) {
d := NewTestDriver(t) d := NewTestDriver(t)
s, _ := NewStorage(ctx, d) s, _ := NewStorage(ctx, d)
process := newModelToPersistenceProcess(s) process := newModelToPersistenceProcess(Config{}, s)
_, _ = ctx, process _, _ = ctx, process

View File

@ -20,14 +20,14 @@ func NewNoopQueue() Queue {
func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) { func NewQueue(ctx context.Context, topic string, driver Driver) (Queue, error) {
if _, err := driver.ExecContext(ctx, ` if _, err := driver.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS queue ( CREATE TABLE IF NOT EXISTS queue (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
topic TEXT NOT NULL, topic TEXT NOT NULL,
updated INTEGER NOT NULL, updated INTEGER NOT NULL,
reservation TEXT, reservation TEXT,
payload TEXT payload TEXT
); );
`); err != nil { `); err != nil {
return Queue{}, fmt.Errorf("failed to create table: %w", err) return Queue{}, fmt.Errorf("failed to create table: %w", err)
} }
return Queue{topic: topic, driver: driver}, nil return Queue{topic: topic, driver: driver}, nil
@ -37,15 +37,23 @@ func (q Queue) Enqueue(ctx context.Context, b []byte) error {
if q.driver.DB == nil { if q.driver.DB == nil {
return nil return nil
} }
_, err := q.driver.ExecContext(ctx, ` result, err := q.driver.ExecContext(ctx, `
INSERT INTO queue (id, topic, updated, payload) VALUES ($4, $1, $2, $3) INSERT INTO queue (id, topic, updated, payload) VALUES ($1, $2, $3, $4)
`, `,
uuid.New().String(),
q.topic, q.topic,
time.Now().Unix(), time.Now().Unix(),
b, b,
uuid.New().String(),
) )
return err if err != nil {
return err
}
if n, err := result.RowsAffected(); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("insert into queue %s affected %v rows", b, n)
}
return nil
} }
func (q Queue) Syn(ctx context.Context) (string, []byte, error) { func (q Queue) Syn(ctx context.Context) (string, []byte, error) {
@ -71,22 +79,22 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
reservation := []byte(uuid.New().String()) reservation := []byte(uuid.New().String())
var payload []byte var payload []byte
if result, err := q.driver.ExecContext(ctx, ` if result, err := q.driver.ExecContext(ctx, `
UPDATE queue UPDATE queue
SET SET
updated = $1, reservation = $2 updated = $1, reservation = $2
WHERE WHERE
id IN ( id IN (
SELECT id SELECT id
FROM queue FROM queue
WHERE WHERE
topic = $3 topic = $3
AND ( AND (
reservation IS NULL reservation IS NULL
OR $4 - updated > 60 OR $4 - updated > 60
) )
LIMIT 1 LIMIT 1
) )
`, now, reservation, q.topic, now); err != nil { `, now, reservation, q.topic, now); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: %w", err) return nil, nil, fmt.Errorf("failed to assign reservation: %w", err)
} else if n, err := result.RowsAffected(); err != nil { } else if n, err := result.RowsAffected(); err != nil {
return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err) return nil, nil, fmt.Errorf("failed to assign reservation: no count: %w", err)
@ -95,11 +103,11 @@ func (q Queue) syn(ctx context.Context) ([]byte, []byte, error) {
} }
row := q.driver.QueryRowContext(ctx, ` row := q.driver.QueryRowContext(ctx, `
SELECT payload SELECT payload
FROM queue FROM queue
WHERE reservation=$1 WHERE reservation=$1
LIMIT 1 LIMIT 1
`, reservation) `, reservation)
if err := row.Err(); err != nil { if err := row.Err(); err != nil {
return nil, nil, fmt.Errorf("failed to query reservation: %w", err) return nil, nil, fmt.Errorf("failed to query reservation: %w", err)
} else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") { } else if err := row.Scan(&payload); err != nil && !strings.Contains(err.Error(), "no rows in result") {
@ -118,9 +126,9 @@ func (q Queue) ack(ctx context.Context, reservation []byte) error {
return nil return nil
} }
result, err := q.driver.ExecContext(ctx, ` result, err := q.driver.ExecContext(ctx, `
DELETE FROM queue DELETE FROM queue
WHERE reservation=$1 WHERE reservation=$1
`, reservation) `, reservation)
if err != nil { if err != nil {
return err return err
} }

View File

@ -77,7 +77,9 @@ func newSlackToModelProcess(cfg Config) processFunc {
thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event) thread = model.NewThread(s.Thread, s.Source, s.TS, s.Channel, s.Event)
} }
log.Printf("parsed slack message into models") if cfg.Debug {
log.Printf("parsed slack message into models")
}
return json.Marshal(Models{ return json.Marshal(Models{
Event: event, Event: event,
Message: message, Message: message,

148
slackscrape.go Normal file
View File

@ -0,0 +1,148 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"time"
"golang.org/x/time/rate"
)
type SlackScrape struct {
Latest int64
Oldest int64
ThreadTS string
Channel string
Token string
}
func NewSlackScrapePipeline(ctx context.Context, cfg Config) (Pipeline, error) {
reader, err := NewQueue(ctx, "slack_channels_to_scrape", cfg.driver)
if err != nil {
return Pipeline{}, err
}
cfg.slackScrapePipeline.reader = reader
writer := NewNoopQueue()
return Pipeline{
writer: writer,
reader: reader,
process: newSlackScrapeProcess(cfg),
}, nil
}
func newSlackScrapeProcess(cfg Config) processFunc {
limiter := rate.NewLimiter(0.5, 1)
return func(ctx context.Context, jobb []byte) ([]byte, error) {
log.Printf("newSlackScrapeProcess(%s)", jobb)
if err := limiter.Wait(ctx); err != nil {
return nil, err
}
var job SlackScrape
if err := json.Unmarshal(jobb, &job); err != nil {
return nil, fmt.Errorf("received non SlackScrape payload: %w", err)
}
u := url.URL{
Scheme: "https",
Host: "slack.com",
Path: "/api/conversations.history",
}
q := url.Values{}
q.Set("channel", job.Channel)
q.Set("latest", strconv.FormatInt(job.Latest, 10))
q.Set("limit", "999")
q.Set("inclusive", "true")
if job.ThreadTS != "" {
u.Path = "/api/conversations.replies"
q.Set("ts", job.ThreadTS)
}
if job.Oldest != 0 {
q.Set("oldest", strconv.FormatInt(job.Oldest, 10))
}
u.RawQuery = q.Encode()
url := u.String()
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+job.Token)
req = req.WithContext(ctx)
httpc := http.Client{Timeout: time.Second}
resp, err := httpc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("(%d) %s", resp.StatusCode, b)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var page struct {
Messages []json.RawMessage
}
if err := json.Unmarshal(body, &page); err != nil {
return nil, err
}
newLatest := float64(job.Latest)
for _, messageJSON := range page.Messages {
if cfg.Debug {
log.Printf("slackScrapePipeline %s => %s", url, messageJSON)
}
b, _ := json.Marshal(ChannelWrapper{Channel: job.Channel, V: messageJSON})
if err := cfg.slackToModelPipeline.reader.Enqueue(ctx, b); err != nil {
return nil, err
}
var peekTS struct {
TS float64 `json:"ts,string"`
}
if err := json.Unmarshal(messageJSON, &peekTS); err == nil && peekTS.TS > 0 && peekTS.TS < newLatest {
newLatest = peekTS.TS
}
if job.ThreadTS == "" {
var peek struct {
ThreadTS string `json:"thread_ts"`
}
json.Unmarshal(messageJSON, &peek)
if peek.ThreadTS != "" {
clone := job
clone.ThreadTS = peek.ThreadTS
clone.Oldest = 0
b, _ := json.Marshal(clone)
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
return nil, err
}
log.Printf("enqueue thread scrape for %s/%s", job.Channel, peek.ThreadTS)
}
}
}
if len(page.Messages) == 999 {
clone := job
clone.Latest = int64(newLatest)
b, _ := json.Marshal(clone)
if err := cfg.slackScrapePipeline.reader.Enqueue(ctx, b); err != nil {
return nil, err
}
log.Printf("enqueue page scrape for %s up to %v", job.Channel, clone.Latest)
}
log.Printf("scraped %v from %s", len(page.Messages), url)
return nil, nil
}
}