diff --git a/multi.go b/multi.go index 091989e..cece9ec 100644 --- a/multi.go +++ b/multi.go @@ -41,27 +41,24 @@ func (c *MultiClient) Unwatch(keys ...string) *StatusReq { } func (c *MultiClient) Discard() { - c.mtx.Lock() + c.reqsMtx.Lock() c.reqs = []Req{NewStatusReq("MULTI")} - c.mtx.Unlock() + c.reqsMtx.Unlock() } func (c *MultiClient) Exec(do func()) ([]Req, error) { c.Discard() - do() + c.Queue(NewIfaceSliceReq("EXEC")) - c.mtx.Lock() - c.reqs = append(c.reqs, NewIfaceSliceReq("EXEC")) - + c.reqsMtx.Lock() reqs := c.reqs c.reqs = nil + c.reqsMtx.Unlock() if len(reqs) == 2 { - c.mtx.Unlock() return []Req{}, nil } - c.mtx.Unlock() conn, err := c.conn() if err != nil { diff --git a/pipeline.go b/pipeline.go index d7ed0d1..de0017b 100644 --- a/pipeline.go +++ b/pipeline.go @@ -22,7 +22,9 @@ func (c *Client) Pipelined(do func(*PipelineClient)) ([]Req, error) { return nil, err } defer pc.Close() + do(pc) + return pc.RunQueued() } @@ -31,20 +33,20 @@ func (c *PipelineClient) Close() error { } func (c *PipelineClient) DiscardQueued() { - c.mtx.Lock() + c.reqsMtx.Lock() c.reqs = c.reqs[:0] - c.mtx.Unlock() + c.reqsMtx.Unlock() } func (c *PipelineClient) RunQueued() ([]Req, error) { - c.mtx.Lock() - if len(c.reqs) == 0 { - c.mtx.Unlock() - return c.reqs, nil - } + c.reqsMtx.Lock() reqs := c.reqs c.reqs = make([]Req, 0) - c.mtx.Unlock() + c.reqsMtx.Unlock() + + if len(reqs) == 0 { + return []Req{}, nil + } conn, err := c.conn() if err != nil { diff --git a/redis.go b/redis.go index 0c73909..56cae7e 100644 --- a/redis.go +++ b/redis.go @@ -54,10 +54,10 @@ func AuthSelectFunc(password string, db int64) InitConnFunc { //------------------------------------------------------------------------------ type BaseClient struct { - mtx sync.Mutex ConnPool ConnPool InitConn InitConnFunc reqs []Req + reqsMtx sync.Mutex } func (c *BaseClient) WriteReq(conn *Conn, reqs ...Req) error { @@ -138,10 +138,11 @@ func (c *BaseClient) Run(req Req) { req.SetVal(val) } +// Queues request to be executed later. func (c *BaseClient) Queue(req Req) { - c.mtx.Lock() + c.reqsMtx.Lock() c.reqs = append(c.reqs, req) - c.mtx.Unlock() + c.reqsMtx.Unlock() } func (c *BaseClient) Close() error {