tile38/pkg/endpoint/disque.go

126 lines
2.4 KiB
Go
Raw Normal View History

2016-09-12 05:01:24 +03:00
package endpoint
import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
)
// disqueExpiresAfter is how long a connection may go without a Send before
// Expired reports it as expired.
const disqueExpiresAfter = 30 * time.Second
2018-04-19 19:25:39 +03:00
// DisqueConn is an endpoint connection
type DisqueConn struct {
2016-09-12 05:01:24 +03:00
mu sync.Mutex
ep Endpoint
ex bool
t time.Time
conn net.Conn
rd *bufio.Reader
}
2018-04-19 19:25:39 +03:00
func newDisqueConn(ep Endpoint) *DisqueConn {
return &DisqueConn{
2016-09-12 05:01:24 +03:00
ep: ep,
t: time.Now(),
}
}
2018-04-19 19:25:39 +03:00
// Expired returns true if the connection has expired
func (conn *DisqueConn) Expired() bool {
2016-09-12 05:01:24 +03:00
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
2017-01-11 09:06:19 +03:00
if time.Now().Sub(conn.t) > disqueExpiresAfter {
2016-09-12 05:01:24 +03:00
if conn.conn != nil {
conn.close()
}
conn.ex = true
}
}
return conn.ex
}
2018-04-19 19:25:39 +03:00
func (conn *DisqueConn) close() {
2016-09-12 05:01:24 +03:00
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
}
conn.rd = nil
}
2018-04-19 19:25:39 +03:00
// Send sends a message
func (conn *DisqueConn) Send(msg string) error {
2016-09-12 05:01:24 +03:00
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ex {
2017-02-10 15:27:02 +03:00
return errExpired
2016-09-12 05:01:24 +03:00
}
conn.t = time.Now()
if conn.conn == nil {
addr := fmt.Sprintf("%s:%d", conn.ep.Disque.Host, conn.ep.Disque.Port)
var err error
2016-09-12 07:09:02 +03:00
conn.conn, err = net.Dial("tcp", addr)
2016-09-12 05:01:24 +03:00
if err != nil {
return err
}
conn.rd = bufio.NewReader(conn.conn)
}
var args []string
args = append(args, "ADDJOB", conn.ep.Disque.QueueName, msg, "0")
if conn.ep.Disque.Options.Replicate > 0 {
args = append(args, "REPLICATE", strconv.FormatInt(int64(conn.ep.Disque.Options.Replicate), 10))
}
cmd := buildRedisCommand(args)
if _, err := conn.conn.Write(cmd); err != nil {
conn.close()
return err
}
c, err := conn.rd.ReadByte()
if err != nil {
conn.close()
return err
}
if c != '-' && c != '+' {
conn.close()
return errors.New("invalid disque reply")
}
ln, err := conn.rd.ReadBytes('\n')
if err != nil {
conn.close()
return err
}
if len(ln) < 2 || ln[len(ln)-2] != '\r' {
conn.close()
return errors.New("invalid disque reply")
}
id := string(ln[:len(ln)-2])
p := strings.Split(id, "-")
if len(p) != 4 {
conn.close()
return errors.New("invalid disque reply")
}
return nil
}
// buildRedisCommand encodes args as one request in the Redis wire protocol:
// an array header "*<count>\r\n" followed, for each argument, by a bulk
// string "$<byte length>\r\n<arg>\r\n". Lengths are byte counts, so
// arguments may contain any bytes, including CR/LF. With no arguments the
// result is "*0\r\n".
func buildRedisCommand(args []string) []byte {
	// Reserve room up front: each argument needs its bytes plus a short
	// "$<len>\r\n...\r\n" frame, and the header needs a few bytes more.
	size := 16
	for _, arg := range args {
		size += len(arg) + 16
	}
	cmd := make([]byte, 0, size)

	cmd = append(cmd, '*')
	cmd = strconv.AppendInt(cmd, int64(len(args)), 10)
	cmd = append(cmd, '\r', '\n')
	for _, arg := range args {
		cmd = append(cmd, '$')
		cmd = strconv.AppendInt(cmd, int64(len(arg)), 10)
		cmd = append(cmd, '\r', '\n')
		cmd = append(cmd, arg...)
		cmd = append(cmd, '\r', '\n')
	}
	return cmd
}