Compare commits
3 Commits
fc5b26d74d
...
3f1921b023
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3f1921b023 | ||
|
|
253aeb6e65 | ||
|
|
9525032c24 |
18
go.mod
18
go.mod
@@ -1,3 +1,21 @@
|
|||||||
module gitea/json-adapter
|
module gitea/json-adapter
|
||||||
|
|
||||||
go 1.22.3
|
go 1.22.3
|
||||||
|
|
||||||
|
require github.com/glebarez/sqlite v1.11.0
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
|
github.com/glebarez/go-sqlite v1.21.2 // indirect
|
||||||
|
github.com/google/uuid v1.3.0 // indirect
|
||||||
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
|
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
|
golang.org/x/sys v0.7.0 // indirect
|
||||||
|
gorm.io/gorm v1.25.7 // indirect
|
||||||
|
modernc.org/libc v1.22.5 // indirect
|
||||||
|
modernc.org/mathutil v1.5.0 // indirect
|
||||||
|
modernc.org/memory v1.5.0 // indirect
|
||||||
|
modernc.org/sqlite v1.23.1 // indirect
|
||||||
|
)
|
||||||
|
|||||||
32
go.sum
Normal file
32
go.sum
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
|
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
|
||||||
|
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
|
||||||
|
github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw=
|
||||||
|
github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ=
|
||||||
|
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
|
||||||
|
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
|
||||||
|
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||||
|
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||||
|
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||||
|
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||||
|
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
|
||||||
|
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||||
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
|
||||||
|
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
|
||||||
|
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
|
||||||
|
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
|
||||||
|
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
|
||||||
|
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
|
||||||
|
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||||
|
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
|
||||||
|
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
|
||||||
|
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
|
||||||
|
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=
|
||||||
BIN
json-adapter
Executable file
BIN
json-adapter
Executable file
Binary file not shown.
221
main.go
221
main.go
@@ -2,6 +2,8 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"io"
|
"io"
|
||||||
@@ -13,87 +15,164 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/glebarez/sqlite"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var httpc = &http.Client{
|
||||||
|
Timeout: time.Minute,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DisableKeepAlives: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
tmpl *template.Template
|
||||||
|
target *url.URL
|
||||||
|
idempotency *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if err := h.serveHTTP(w, r); err != nil {
|
||||||
|
log.Println("!", err)
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h Handler) serveHTTP(w http.ResponseWriter, r *http.Request) error {
|
||||||
|
b, err := adapt(r.Body, h.tmpl)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var duplicate int
|
||||||
|
if err := h.idempotency.QueryRowContext(r.Context(), `SELECT 1 FROM payloads WHERE payload=$1;`, b).Scan(&duplicate); err != sql.ErrNoRows && err != nil {
|
||||||
|
log.Println("!", err)
|
||||||
|
} else if duplicate > 0 {
|
||||||
|
log.Println("+")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.prune(time.Now().Add(-1 * time.Hour * 24 * 7)); err != nil {
|
||||||
|
log.Println("!", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := proxy(h.target, r, io.NopCloser(bytes.NewReader(b)))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if err := forward(w, resp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.idempotency.ExecContext(r.Context(), `INSERT INTO payloads (ts, payload) VALUES ($1, $2);`, time.Now(), b); err != nil {
|
||||||
|
log.Println("!", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
fs := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||||
p := fs.Int("p", 28080, "port")
|
p := fs.Int("p", 28080, "port")
|
||||||
t := fs.String("t", "{{ . }}", "template")
|
t := fs.String("t", "{{ . }}", "template")
|
||||||
y := fs.String("y", "http://localhost:41912", "target")
|
y := fs.String("y", "http://localhost:41912", "target")
|
||||||
|
db := fs.String("db", "/tmp/json-adapter.db", "sqlite db for idempotency")
|
||||||
if err := fs.Parse(os.Args[1:]); err != nil {
|
if err := fs.Parse(os.Args[1:]); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
if err := run(*p, *t, *y, *db); err != nil {
|
||||||
tmpl, err := template.New("").Parse(*t)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
u, err := url.Parse(*y)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c := &http.Client{
|
|
||||||
Timeout: time.Minute,
|
|
||||||
Transport: &http.Transport{
|
|
||||||
DisableKeepAlives: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
handle := func(w http.ResponseWriter, r *http.Request) error {
|
|
||||||
b, _ := io.ReadAll(io.LimitReader(r.Body, 1024*1024))
|
|
||||||
|
|
||||||
var v interface{}
|
|
||||||
if err := json.Unmarshal(b, &v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
buff := bytes.NewBuffer(nil)
|
|
||||||
if err := tmpl.Execute(buff, v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("%s => %s => %s", b, buff.Bytes(), u.String())
|
|
||||||
|
|
||||||
req, err := http.NewRequest(r.Method, u.String(), io.NopCloser(buff))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req = req.WithContext(r.Context())
|
|
||||||
for k, v := range r.Header {
|
|
||||||
switch strings.ToLower(k) {
|
|
||||||
case "content-length":
|
|
||||||
default:
|
|
||||||
req.Header.Set(k, v[0])
|
|
||||||
for _, v := range v[1:] {
|
|
||||||
req.Header.Add(k, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resp, err := c.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
for k, v := range resp.Header {
|
|
||||||
w.Header().Set(k, v[0])
|
|
||||||
for _, v := range v[1:] {
|
|
||||||
w.Header().Add(k, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
w.WriteHeader(resp.StatusCode)
|
|
||||||
io.Copy(w, resp.Body)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
log.Println("listening on", *p)
|
|
||||||
if err := http.ListenAndServe(":"+strconv.Itoa(*p), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if err := handle(w, r); err != nil {
|
|
||||||
log.Println("!", err)
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
}
|
|
||||||
})); err != nil {
|
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func run(p int, t string, y string, db string) error {
|
||||||
|
idempotency, err := sql.Open("sqlite", db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer idempotency.Close()
|
||||||
|
if err := idempotency.PingContext(context.Background()); err != nil {
|
||||||
|
return err
|
||||||
|
} else if _, err := idempotency.ExecContext(context.Background(), `CREATE TABLE IF NOT EXISTS payloads (payload TEXT, ts TIMESTAMP NOT NULL)`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpl, err := template.New("").Parse(t)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(y)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h := Handler{tmpl: tmpl, target: u, idempotency: idempotency}
|
||||||
|
log.Println("listening on", p)
|
||||||
|
return http.ListenAndServe(":"+strconv.Itoa(p), h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func adapt(r io.Reader, tmpl *template.Template) ([]byte, error) {
|
||||||
|
b, _ := io.ReadAll(io.LimitReader(r, 1024*1024))
|
||||||
|
var v interface{}
|
||||||
|
buff := bytes.NewBuffer(nil)
|
||||||
|
if len(b) == 0 {
|
||||||
|
} else if err := json.Unmarshal(b, &v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := tmpl.Execute(buff, v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
log.Printf("%s => %s", b, buff.Bytes())
|
||||||
|
return buff.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func proxy(u *url.URL, r *http.Request, rc io.ReadCloser) (*http.Response, error) {
|
||||||
|
req, err := http.NewRequest(r.Method, u.String(), rc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req = req.WithContext(r.Context())
|
||||||
|
for k, v := range r.Header {
|
||||||
|
switch strings.ToLower(k) {
|
||||||
|
case "content-length":
|
||||||
|
default:
|
||||||
|
req.Header.Set(k, v[0])
|
||||||
|
for _, v := range v[1:] {
|
||||||
|
req.Header.Add(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return httpc.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func forward(w http.ResponseWriter, resp *http.Response) error {
|
||||||
|
for k, v := range resp.Header {
|
||||||
|
w.Header().Set(k, v[0])
|
||||||
|
for _, v := range v[1:] {
|
||||||
|
w.Header().Add(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.WriteHeader(resp.StatusCode)
|
||||||
|
io.Copy(w, resp.Body)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h Handler) prune(ts time.Time) error {
|
||||||
|
result, err := h.idempotency.ExecContext(context.Background(), `DELETE FROM payloads WHERE ts < $1;`, ts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, _ := result.RowsAffected()
|
||||||
|
if rows > 1 {
|
||||||
|
log.Println("-", rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
80
main_test.go
Normal file
80
main_test.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRun(t *testing.T) {
|
||||||
|
targetCalls := 0
|
||||||
|
target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
targetCalls += 1
|
||||||
|
}))
|
||||||
|
defer target.Close()
|
||||||
|
|
||||||
|
p := func() int {
|
||||||
|
s := httptest.NewServer(http.HandlerFunc(http.NotFound))
|
||||||
|
s.Close()
|
||||||
|
u := s.URL
|
||||||
|
ps := strings.Split(u, ":")[2]
|
||||||
|
n, err := strconv.Atoi(ps)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}()
|
||||||
|
tmpl := `{{ . }}`
|
||||||
|
x := fmt.Sprintf(`http://localhost:%d`, p)
|
||||||
|
y := target.URL
|
||||||
|
db := path.Join(t.TempDir(), "db")
|
||||||
|
|
||||||
|
do := func(method, body string) bool {
|
||||||
|
req, _ := http.NewRequest(method, x, strings.NewReader(body))
|
||||||
|
resp, err := httpc.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := run(p, tmpl, y, db); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for !do(http.MethodGet, "") {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
if targetCalls != 1 {
|
||||||
|
t.Error("empty req body no called target")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !do(http.MethodGet, "") {
|
||||||
|
t.Error("couldnt get a second time with no body")
|
||||||
|
} else if targetCalls != 1 {
|
||||||
|
t.Error("no dedupe no body")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !do(http.MethodPost, "1") {
|
||||||
|
t.Error("couldnt get a third time with new body")
|
||||||
|
} else if targetCalls != 2 {
|
||||||
|
t.Error("deduped new body")
|
||||||
|
} else if !do(http.MethodPost, "1") {
|
||||||
|
t.Error("couldnt get a fourth time with new body again")
|
||||||
|
} else if targetCalls != 2 {
|
||||||
|
t.Error("no deduped new body again")
|
||||||
|
} else if !do(http.MethodPost, "2") {
|
||||||
|
t.Error("couldnt get a fifth time with new new body")
|
||||||
|
} else if targetCalls != 3 {
|
||||||
|
t.Error("deduped new new body")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user