revision of chat example that uses single goroutine per connection

This commit is contained in:
mcqueen 2015-10-23 18:19:40 -07:00
parent 5ed2f4547d
commit 7b763760a1
4 changed files with 86 additions and 22 deletions

View File

@ -5,13 +5,15 @@
package main package main
import ( import (
"github.com/gorilla/websocket"
"log" "log"
"net/http" "net/http"
"time" "time"
"github.com/gorilla/websocket"
) )
const ( const (
// Time allowed to write a message to the peer. // Time allowed to write a message to the peer.
writeWait = 10 * time.Second writeWait = 10 * time.Second
@ -23,6 +25,9 @@ const (
// Maximum message size allowed from peer. // Maximum message size allowed from peer.
maxMessageSize = 512 maxMessageSize = 512
// Name of the broadcast channel
BcPrefix = "#all"
) )
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
@ -39,21 +44,46 @@ type connection struct {
send chan []byte send chan []byte
} }
// readPump pumps messages from the websocket connection to the hub. type msgIn struct {
func (c *connection) readPump() { Chan string
defer func() { Msg string
h.unregister <- c }
c.ws.Close()
}() func (c *connection) startPumps() {
c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil }) c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
//likely slow-start message miss problem here
go c.readPump()
go c.writePump()
}
// readPump pumps messages from the websocket connection to the hub.
func (c *connection) readPump() {
for { for {
_, message, err := c.ws.ReadMessage()
var msg msgIn
err := c.ws.ReadJSON(&msg)
if err != nil { if err != nil {
break log.Printf("error reading message:shutting down connection:err:%s:", err)
c.write(websocket.CloseMessage, []byte{})
h.unregister <- c
return
} }
h.broadcast <- message
log.Printf("message:%s:\n", msg)
if msg.Chan == BcPrefix {
h.broadcast <- []byte(msg.Msg)
} else {
log.Printf("No target channel found in message:msg:%s:", msg)
}
} }
} }
@ -65,42 +95,54 @@ func (c *connection) write(mt int, payload []byte) error {
// writePump pumps messages from the hub to the websocket connection. // writePump pumps messages from the hub to the websocket connection.
func (c *connection) writePump() { func (c *connection) writePump() {
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.ws.Close()
}()
for { for {
select { select {
case message, ok := <-c.send: case message, ok := <-c.send:
if !ok { if !ok {
log.Printf("error reading from send channel:shutting it down:")
c.write(websocket.CloseMessage, []byte{}) c.write(websocket.CloseMessage, []byte{})
h.unregister <- c
return return
} }
if err := c.write(websocket.TextMessage, message); err != nil { if err := c.write(websocket.TextMessage, message); err != nil {
log.Printf("error writing to channel:shutting it down:")
h.unregister <- c
return return
} else {
log.Printf("writing to channel:")
} }
case <-ticker.C: case <-ticker.C:
if err := c.write(websocket.PingMessage, []byte{}); err != nil { if err := c.write(websocket.PingMessage, []byte{}); err != nil {
log.Printf("error pinging to channel:shutting it down:")
h.unregister <- c
return return
} }
} }
} }
} }
// serveWs handles websocket requests from the peer. // serveWs handles websocket requests from the peer.
func serveWs(w http.ResponseWriter, r *http.Request) { func serveWs(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" { if r.Method != "GET" {
http.Error(w, "Method not allowed", 405) http.Error(w, "Method not allowed", 405)
return return
} }
ws, err := upgrader.Upgrade(w, r, nil) ws, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
c := &connection{send: make(chan []byte, 256), ws: ws} c := &connection{send: make(chan []byte, 256), ws: ws}
h.register <- c h.register <- c
go c.writePump()
c.readPump()
} }

View File

@ -26,13 +26,14 @@
if (!msg.val()) { if (!msg.val()) {
return false; return false;
} }
conn.send(msg.val());
conn.send( JSON.stringify({ chan: "{{ .BcPrefix }}", msg: msg.val() }) );
msg.val(""); msg.val("");
return false return false
}); });
if (window["WebSocket"]) { if (window["WebSocket"]) {
conn = new WebSocket("ws://{{$}}/ws"); conn = new WebSocket("ws://{{ .Hostname }}/ws");
conn.onclose = function(evt) { conn.onclose = function(evt) {
appendLog($("<div><b>Connection closed.</b></div>")) appendLog($("<div><b>Connection closed.</b></div>"))
} }

View File

@ -4,6 +4,10 @@
package main package main
import (
"log"
)
// hub maintains the set of active connections and broadcasts messages to the // hub maintains the set of active connections and broadcasts messages to the
// connections. // connections.
type hub struct { type hub struct {
@ -31,7 +35,15 @@ func (h *hub) run() {
for { for {
select { select {
case c := <-h.register: case c := <-h.register:
h.connections[c] = true
// start up a single reader/writer for each connection
if _, ok := h.connections[c]; !ok {
h.connections[c] = true
c.startPumps()
}
case c := <-h.unregister: case c := <-h.unregister:
if _, ok := h.connections[c]; ok { if _, ok := h.connections[c]; ok {
delete(h.connections, c) delete(h.connections, c)
@ -41,9 +53,9 @@ func (h *hub) run() {
for c := range h.connections { for c := range h.connections {
select { select {
case c.send <- m: case c.send <- m:
log.Printf("hub sending message:msg:%s:", string(m))
default: default:
close(c.send) log.Printf("hub got an unknown message")
delete(h.connections, c)
} }
} }
} }

View File

@ -24,14 +24,23 @@ func serveHome(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
homeTempl.Execute(w, r.Host)
homeTempl.Execute(w, struct {
BcPrefix string
Hostname string
}{BcPrefix: BcPrefix, Hostname: r.Host})
} }
func main() { func main() {
flag.Parse() flag.Parse()
go h.run() go h.run()
http.HandleFunc("/", serveHome) http.HandleFunc("/", serveHome)
http.HandleFunc("/ws", serveWs) http.HandleFunc("/ws", serveWs)
err := http.ListenAndServe(*addr, nil) err := http.ListenAndServe(*addr, nil)
if err != nil { if err != nil {
log.Fatal("ListenAndServe: ", err) log.Fatal("ListenAndServe: ", err)