tile38/controller/endpoint/redis.go

106 lines
1.6 KiB
Go

package endpoint
import (
"bufio"
"errors"
"fmt"
"net"
"sync"
"time"
)
const (
redisExpiresAfter = time.Second * 30
)
type RedisEndpointConn struct {
mu sync.Mutex
ep Endpoint
ex bool
t time.Time
conn net.Conn
rd *bufio.Reader
}
func newRedisEndpointConn(ep Endpoint) *RedisEndpointConn {
return &RedisEndpointConn{
ep: ep,
t: time.Now(),
}
}
func (conn *RedisEndpointConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
if time.Now().Sub(conn.t) > redisExpiresAfter {
if conn.conn != nil {
conn.close()
}
conn.ex = true
}
}
return conn.ex
}
func (conn *RedisEndpointConn) close() {
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
}
conn.rd = nil
}
func (conn *RedisEndpointConn) Send(msg string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ex {
return errors.New("expired")
}
conn.t = time.Now()
if conn.conn == nil {
addr := fmt.Sprintf("%s:%d", conn.ep.Redis.Host, conn.ep.Redis.Port)
var err error
conn.conn, err = net.Dial("tcp", addr)
if err != nil {
return err
}
conn.rd = bufio.NewReader(conn.conn)
}
var args []string
args = append(args, "PUBLISH", conn.ep.Redis.Channel, msg)
cmd := buildRedisCommand(args)
if _, err := conn.conn.Write(cmd); err != nil {
conn.close()
return err
}
c, err := conn.rd.ReadByte()
if err != nil {
conn.close()
return err
}
if c != ':' {
conn.close()
return errors.New("invalid redis reply")
}
ln, err := conn.rd.ReadBytes('\n')
if err != nil {
conn.close()
return err
}
if string(ln[0:1]) != "1" {
conn.close()
return errors.New("invalid redis reply")
}
return nil
}