diff --git a/pubsub.go b/pubsub.go index 650a745..d30b296 100644 --- a/pubsub.go +++ b/pubsub.go @@ -38,46 +38,40 @@ func (c *PubSubClient) consumeMessages(conn *Conn) { req := NewIfaceSliceReq() for { - for { - msg := &Message{} + msg := &Message{} - replyIface, err := req.ParseReply(conn.Rd) - if err != nil { - msg.Err = err - c.ch <- msg - return - } - reply, ok := replyIface.([]interface{}) - if !ok { - msg.Err = fmt.Errorf("redis: unexpected reply type %T", replyIface) - c.ch <- msg - return - } - - msgName := reply[0].(string) - switch msgName { - case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": - msg.Name = msgName - msg.Channel = reply[1].(string) - msg.Number = reply[2].(int64) - case "message": - msg.Name = msgName - msg.Channel = reply[1].(string) - msg.Message = reply[2].(string) - case "pmessage": - msg.Name = msgName - msg.ChannelPattern = reply[1].(string) - msg.Channel = reply[2].(string) - msg.Message = reply[3].(string) - default: - msg.Err = fmt.Errorf("Unsupported message name: %q.", msgName) - } + replyIface, err := req.ParseReply(conn.Rd) + if err != nil { + msg.Err = err c.ch <- msg - - if conn.Rd.Buffered() <= 0 { - break - } + return } + reply, ok := replyIface.([]interface{}) + if !ok { + msg.Err = fmt.Errorf("redis: unexpected reply type %T", replyIface) + c.ch <- msg + return + } + + msgName := reply[0].(string) + switch msgName { + case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": + msg.Name = msgName + msg.Channel = reply[1].(string) + msg.Number = reply[2].(int64) + case "message": + msg.Name = msgName + msg.Channel = reply[1].(string) + msg.Message = reply[2].(string) + case "pmessage": + msg.Name = msgName + msg.ChannelPattern = reply[1].(string) + msg.Channel = reply[2].(string) + msg.Message = reply[3].(string) + default: + msg.Err = fmt.Errorf("Unsupported message name: %q.", msgName) + } + c.ch <- msg } }