diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..399db03 --- /dev/null +++ b/pool.go @@ -0,0 +1,26 @@ +package main + +import "io" + +type Pool struct { + writers []io.Writer + r *io.PipeReader + w *io.PipeWriter +} + +func NewPool() *Pool { + r, w := io.Pipe() + return &Pool{ + writers: make([]io.Writer, 0), + r: r, + w: w, + } +} + +func (p *Pool) Read(b []byte) (int, error) { + return p.r.Read(b) +} + +func (p *Pool) Write(b []byte) (int, error) { + return p.w.Write(b) +} diff --git a/ws.go b/ws.go index 8d01927..570f4f0 100644 --- a/ws.go +++ b/ws.go @@ -4,12 +4,14 @@ import ( "io" "log" "net/http" + "sync" "github.com/gorilla/websocket" ) type WS struct { upgrader websocket.Upgrader + pools *sync.Map } func NewWS() *WS { @@ -19,6 +21,7 @@ func NewWS() *WS { WriteBufferSize: 10240, CheckOrigin: func(_ *http.Request) bool { return true }, }, + pools: &sync.Map{}, } } @@ -30,24 +33,51 @@ func (ws *WS) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (ws *WS) serveHTTP(w http.ResponseWriter, r *http.Request) error { + log.Println("ws serve http", r.URL.Path) + pooli, ok := ws.pools.Load(r.URL.Path) + if !ok { + pooli = NewPool() + } + pool := pooli.(*Pool) + log.Println("ws upgrade") conn, err := ws.upgrader.Upgrade(w, r, nil) if err != nil { return err } - for { - mt, r, err := conn.NextReader() - if err != nil { - return err - } - w, err := conn.NextWriter(mt) - if err != nil { - return err - } - if _, err := io.Copy(w, r); err != nil { // todo impl broadcast to channel;; sync map to all channels, goes to a forking reader-writer pipe, all listeners to broadcast read from pipe - return err - } - if err := w.Close(); err != nil { - return err - } + log.Println("ws next reader") + mt, _, err := conn.NextReader() + if err != nil { + return err } + log.Println("funcs") + go func() { + for { + _, r, err := conn.NextReader() + if err != nil { + panic(err) + } + log.Println("copying to pool...") + if _, err := io.Copy(pool, r); err != nil { + panic(err) + } + log.Println("/copying to pool...") + } + }() + func() { + for { + w, err := conn.NextWriter(mt) + if err != nil { + panic(err) + } + log.Println("copying from pool...") + if _, err := io.Copy(w, pool); err != nil { // todo impl broadcast to channel;; sync map to all channels, goes to a forking reader-writer pipe, all listeners to broadcast read from pipe + panic(err) + } + log.Println("/copying from pool...") + if err := w.Close(); err != nil { + panic(err) + } + } + }() + return nil }