confirmed it is a web socket, need to impl pool

master
bel 2020-05-03 23:25:29 -06:00
parent 2388723fa5
commit 9ffd54d4bf
2 changed files with 71 additions and 15 deletions

26
pool.go Normal file
View File

@ -0,0 +1,26 @@
package main
import "io"
type Pool struct {
writers []io.Writer
r *io.PipeReader
w *io.PipeWriter
}
func NewPool() *Pool {
r, w := io.Pipe()
return &Pool{
writers: make([]io.Writer, 0),
r: r,
w: w,
}
}
func (p *Pool) Read(b []byte) (int, error) {
return p.r.Read(b)
}
func (p *Pool) Write(b []byte) (int, error) {
return p.w.Write(b)
}

60
ws.go
View File

@ -4,12 +4,14 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"sync"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type WS struct { type WS struct {
upgrader websocket.Upgrader upgrader websocket.Upgrader
pools *sync.Map
} }
func NewWS() *WS { func NewWS() *WS {
@ -19,6 +21,7 @@ func NewWS() *WS {
WriteBufferSize: 10240, WriteBufferSize: 10240,
CheckOrigin: func(_ *http.Request) bool { return true }, CheckOrigin: func(_ *http.Request) bool { return true },
}, },
pools: &sync.Map{},
} }
} }
@ -30,24 +33,51 @@ func (ws *WS) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func (ws *WS) serveHTTP(w http.ResponseWriter, r *http.Request) error { func (ws *WS) serveHTTP(w http.ResponseWriter, r *http.Request) error {
log.Println("ws serve http", r.URL.Path)
pooli, ok := ws.pools.Load(r.URL.Path)
if !ok {
pooli = NewPool()
}
pool := pooli.(*Pool)
log.Println("ws upgrade")
conn, err := ws.upgrader.Upgrade(w, r, nil) conn, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
return err return err
} }
for { log.Println("ws next reader")
mt, r, err := conn.NextReader() mt, _, err := conn.NextReader()
if err != nil { if err != nil {
return err return err
}
w, err := conn.NextWriter(mt)
if err != nil {
return err
}
if _, err := io.Copy(w, r); err != nil { // todo impl broadcast to channel;; sync map to all channels, goes to a forking reader-writer pipe, all listeners to broadcast read from pipe
return err
}
if err := w.Close(); err != nil {
return err
}
} }
log.Println("funcs")
go func() {
for {
_, r, err := conn.NextReader()
if err != nil {
panic(err)
}
log.Println("copying to pool...")
if _, err := io.Copy(pool, r); err != nil {
panic(err)
}
log.Println("/copying to pool...")
}
}()
func() {
for {
w, err := conn.NextWriter(mt)
if err != nil {
panic(err)
}
log.Println("copying from pool...")
if _, err := io.Copy(w, pool); err != nil { // todo impl broadcast to channel;; sync map to all channels, goes to a forking reader-writer pipe, all listeners to broadcast read from pipe
panic(err)
}
log.Println("/copying from pool...")
if err := w.Close(); err != nil {
panic(err)
}
}
}()
return nil
} }