diff --git a/examples/chat/conn.go b/examples/chat/conn.go index ae25b75..35f2e68 100644 --- a/examples/chat/conn.go +++ b/examples/chat/conn.go @@ -5,13 +5,15 @@ package main import ( - "github.com/gorilla/websocket" "log" "net/http" "time" + + "github.com/gorilla/websocket" ) const ( + // Time allowed to write a message to the peer. writeWait = 10 * time.Second @@ -23,6 +25,9 @@ const ( // Maximum message size allowed from peer. maxMessageSize = 512 + + // Name of the broadcast channel + BcPrefix = "#all" ) var upgrader = websocket.Upgrader{ @@ -39,21 +44,46 @@ type connection struct { send chan []byte } -// readPump pumps messages from the websocket connection to the hub. -func (c *connection) readPump() { - defer func() { - h.unregister <- c - c.ws.Close() - }() +type msgIn struct { + Chan string + Msg string +} + +func (c *connection) startPumps() { + c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + + //likely slow-start message miss problem here + go c.readPump() + go c.writePump() + +} + +// readPump pumps messages from the websocket connection to the hub. +func (c *connection) readPump() { + for { - _, message, err := c.ws.ReadMessage() + + var msg msgIn + + err := c.ws.ReadJSON(&msg) if err != nil { - break + log.Printf("error reading message:shutting down connection:err:%s:", err) + c.write(websocket.CloseMessage, []byte{}) + h.unregister <- c + return } - h.broadcast <- message + + log.Printf("message:%s:\n", msg) + + if msg.Chan == BcPrefix { + h.broadcast <- []byte(msg.Msg) + } else { + log.Printf("No target channel found in message:msg:%s:", msg) + } + } } @@ -65,42 +95,54 @@ func (c *connection) write(mt int, payload []byte) error { // writePump pumps messages from the hub to the websocket connection. func (c *connection) writePump() { + ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.ws.Close() - }() + for { select { case message, ok := <-c.send: + if !ok { + log.Printf("error reading from send channel:shutting it down:") c.write(websocket.CloseMessage, []byte{}) + h.unregister <- c return } + if err := c.write(websocket.TextMessage, message); err != nil { + log.Printf("error writing to channel:shutting it down:") + h.unregister <- c return + } else { + log.Printf("writing to channel:") } + case <-ticker.C: if err := c.write(websocket.PingMessage, []byte{}); err != nil { + log.Printf("error pinging to channel:shutting it down:") + h.unregister <- c return } } } + } // serveWs handles websocket requests from the peer. func serveWs(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { http.Error(w, "Method not allowed", 405) return } + ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } + c := &connection{send: make(chan []byte, 256), ws: ws} h.register <- c - go c.writePump() - c.readPump() + } diff --git a/examples/chat/home.html b/examples/chat/home.html index 2959922..196d15f 100644 --- a/examples/chat/home.html +++ b/examples/chat/home.html @@ -26,13 +26,14 @@ if (!msg.val()) { return false; } - conn.send(msg.val()); + + conn.send( JSON.stringify({ chan: "{{ .BcPrefix }}", msg: msg.val() }) ); msg.val(""); return false }); if (window["WebSocket"]) { - conn = new WebSocket("ws://{{$}}/ws"); + conn = new WebSocket("ws://{{ .Hostname }}/ws"); conn.onclose = function(evt) { appendLog($("
Connection closed.
")) } diff --git a/examples/chat/hub.go b/examples/chat/hub.go index 449ba75..a40d02e 100644 --- a/examples/chat/hub.go +++ b/examples/chat/hub.go @@ -4,6 +4,10 @@ package main +import ( + "log" +) + // hub maintains the set of active connections and broadcasts messages to the // connections. type hub struct { @@ -31,7 +35,15 @@ func (h *hub) run() { for { select { case c := <-h.register: - h.connections[c] = true + + // start up a single reader/writer for each connection + if _, ok := h.connections[c]; !ok { + h.connections[c] = true + + c.startPumps() + + } + case c := <-h.unregister: if _, ok := h.connections[c]; ok { delete(h.connections, c) @@ -41,9 +53,9 @@ func (h *hub) run() { for c := range h.connections { select { case c.send <- m: + log.Printf("hub sending message:msg:%s:", string(m)) default: - close(c.send) - delete(h.connections, c) + log.Printf("hub got an unknown message") } } } diff --git a/examples/chat/main.go b/examples/chat/main.go index 3c4448d..f977c00 100644 --- a/examples/chat/main.go +++ b/examples/chat/main.go @@ -24,14 +24,23 @@ func serveHome(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "text/html; charset=utf-8") - homeTempl.Execute(w, r.Host) + + homeTempl.Execute(w, struct { + BcPrefix string + Hostname string + }{BcPrefix: BcPrefix, Hostname: r.Host}) } func main() { + flag.Parse() + go h.run() + http.HandleFunc("/", serveHome) + http.HandleFunc("/ws", serveWs) + err := http.ListenAndServe(*addr, nil) if err != nil { log.Fatal("ListenAndServe: ", err)