video call with imperfect reconnect, persistent id, possibly many clients

master
bel 2020-05-07 19:39:27 -06:00
parent 9ffd54d4bf
commit fa959a2570
7 changed files with 105 additions and 65 deletions

View File

@ -6,9 +6,9 @@ func config() *args.ArgSet {
as := args.NewArgSet() as := args.NewArgSet()
as.Append(args.INT, "p", "port to listen on", "58080") as.Append(args.INT, "p", "port to listen on", "58080")
as.Append(args.STRING, "d", "root dir to serve static", ".") as.Append(args.STRING, "d", "root dir to serve static", "./public")
as.Append(args.STRING, "crt", "path to crt", "./cert.crt") as.Append(args.STRING, "crt", "path to crt", "./testdata/scratch.crt")
as.Append(args.STRING, "key", "path to key", "./cert.key") as.Append(args.STRING, "key", "path to key", "./testdata/scratch.key")
if err := as.Parse(); err != nil { if err := as.Parse(); err != nil {
panic(err) panic(err)

10
main.go
View File

@ -28,7 +28,13 @@ func main() {
c := as.GetString("crt") c := as.GetString("crt")
k := as.GetString("key") k := as.GetString("key")
log.Printf("listening on %q", httpsServer.Addr) log.Printf("listening on %q", httpsServer.Addr)
if err := httpsServer.ListenAndServeTLS(c, k); err != nil { if c == "" && k == "" {
panic(err) if err := httpsServer.ListenAndServe(); err != nil {
panic(err)
}
} else {
if err := httpsServer.ListenAndServeTLS(c, k); err != nil {
panic(err)
}
} }
} }

53
pool.go
View File

@ -1,26 +1,51 @@
package main package main
import "io" import (
"io"
"io/ioutil"
"log"
"sync"
"github.com/gorilla/websocket"
)
type Pool struct { type Pool struct {
writers []io.Writer conns *sync.Map //map[string]*websocket.Conn
r *io.PipeReader
w *io.PipeWriter
} }
func NewPool() *Pool { func NewPool() *Pool {
r, w := io.Pipe()
return &Pool{ return &Pool{
writers: make([]io.Writer, 0), conns: &sync.Map{}, //map[string]*websocket.Conn{},
r: r,
w: w,
} }
} }
func (p *Pool) Read(b []byte) (int, error) { func (p *Pool) Broadcast(mt int, r io.Reader) error {
return p.r.Read(b) b, err := ioutil.ReadAll(r)
} if err != nil {
return err
func (p *Pool) Write(b []byte) (int, error) { }
return p.w.Write(b) n := 1000000
cnt := 0
p.conns.Range(func(k, v interface{}) bool {
k = k.(string)
conn := v.(*websocket.Conn)
cnt += 1
w, err := conn.NextWriter(mt)
if err != nil {
p.conns.Delete(k)
return true
}
defer w.Close()
m, err := w.Write(b)
if err != nil {
p.conns.Delete(k)
return true
}
if m < n {
n = m
}
return true
})
log.Printf("%d writes, %d min size, %d wanted size", cnt, n, len(b))
return nil
} }

View File

@ -5,9 +5,12 @@
<script src="webrtc.js"></script> <script src="webrtc.js"></script>
<style> <style>
video { video {
max-width: 100%; width: 100%;
min-width: 100%; width: 100%;
max-width: 400px;
min-width: 400px;
border: 1px solid black; border: 1px solid black;
display: inline-block;
} }
</style> </style>
</head> </head>

View File

@ -12,13 +12,40 @@ var peerConnectionConfig = {
] ]
}; };
function getCookie(cname) {
var name = cname + "=";
var decodedCookie = decodeURIComponent(document.cookie);
var ca = decodedCookie.split(';');
for(var i = 0; i <ca.length; i++) {
var c = ca[i];
while (c.charAt(0) == ' ') {
c = c.substring(1);
}
if (c.indexOf(name) == 0) {
return c.substring(name.length, c.length);
}
}
return "";
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (1*24*60*60*1000));
var expires = "expires="+ d.toUTCString();
document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}
function pageReady() { function pageReady() {
uuid = createUUID(); uuid = getCookie("uuid");
if (!uuid) {
uuid = createUUID();
setCookie("uuid", uuid);
}
localVideo = document.getElementById('localVideo'); localVideo = document.getElementById('localVideo');
remoteVideo = document.getElementById('remoteVideo'); remoteVideo = document.getElementById('remoteVideo');
serverConnection = new WebSocket('wss://' + window.location.hostname + '/abc'); serverConnection = new WebSocket('wss://' + window.location.hostname + '/abc?uuid=' + uuid);
serverConnection.onmessage = gotMessageFromServer; serverConnection.onmessage = gotMessageFromServer;
var constraints = { var constraints = {

View File

@ -21,11 +21,12 @@ func New() *Server {
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("serving", r.URL) log.Println("ext", path.Ext(r.URL.Path))
if _, err := os.Stat(path.Join(config().GetString("d"), r.URL.Path[1:])); os.IsNotExist(err) { if path.Ext(r.URL.Path) != "" {
s.fs.ServeHTTP(w, r)
} else if _, err := os.Stat(path.Join(config().GetString("d"), r.URL.Path[1:])); os.IsNotExist(err) {
s.ws.ServeHTTP(w, r) s.ws.ServeHTTP(w, r)
} else { } else {
log.Printf("Serving static %q from %q", r.URL.Path, config().GetString("d"))
s.fs.ServeHTTP(w, r) s.fs.ServeHTTP(w, r)
} }
} }

56
ws.go
View File

@ -1,11 +1,11 @@
package main package main
import ( import (
"io"
"log" "log"
"net/http" "net/http"
"sync" "sync"
"github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -17,8 +17,8 @@ type WS struct {
func NewWS() *WS { func NewWS() *WS {
return &WS{ return &WS{
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
ReadBufferSize: 10240, ReadBufferSize: 16384,
WriteBufferSize: 10240, WriteBufferSize: 16384,
CheckOrigin: func(_ *http.Request) bool { return true }, CheckOrigin: func(_ *http.Request) bool { return true },
}, },
pools: &sync.Map{}, pools: &sync.Map{},
@ -33,51 +33,29 @@ func (ws *WS) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func (ws *WS) serveHTTP(w http.ResponseWriter, r *http.Request) error { func (ws *WS) serveHTTP(w http.ResponseWriter, r *http.Request) error {
id := r.URL.Query().Get("uuid")
if len(id) == 0 {
id = uuid.New().String()
}
log.Println("ws serve http", r.URL.Path) log.Println("ws serve http", r.URL.Path)
pooli, ok := ws.pools.Load(r.URL.Path) pooli, ok := ws.pools.Load(r.URL.Path)
if !ok { if !ok {
pooli = NewPool() pooli = NewPool()
ws.pools.Store(r.URL.Path, pooli.(*Pool))
} }
pool := pooli.(*Pool) pool := pooli.(*Pool)
log.Println("ws upgrade")
conn, err := ws.upgrader.Upgrade(w, r, nil) conn, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
return err return err
} }
log.Println("ws next reader") pool.conns.Store(id, conn)
mt, _, err := conn.NextReader() for {
if err != nil { mt, reader, err := conn.NextReader()
return err if err != nil {
return err
}
if err := pool.Broadcast(mt, reader); err != nil {
return err
}
} }
log.Println("funcs")
go func() {
for {
_, r, err := conn.NextReader()
if err != nil {
panic(err)
}
log.Println("copying to pool...")
if _, err := io.Copy(pool, r); err != nil {
panic(err)
}
log.Println("/copying to pool...")
}
}()
func() {
for {
w, err := conn.NextWriter(mt)
if err != nil {
panic(err)
}
log.Println("copying from pool...")
if _, err := io.Copy(w, pool); err != nil { // todo impl broadcast to channel;; sync map to all channels, goes to a forking reader-writer pipe, all listeners to broadcast read from pipe
panic(err)
}
log.Println("/copying from pool...")
if err := w.Close(); err != nil {
panic(err)
}
}
}()
return nil
} }