forked from mirror/redis
redis: Rename mtx to reqsMtx.
This commit is contained in:
parent
ee844aaf1d
commit
dbcfb0984e
13
multi.go
13
multi.go
|
@ -41,27 +41,24 @@ func (c *MultiClient) Unwatch(keys ...string) *StatusReq {
|
|||
}
|
||||
|
||||
func (c *MultiClient) Discard() {
|
||||
c.mtx.Lock()
|
||||
c.reqsMtx.Lock()
|
||||
c.reqs = []Req{NewStatusReq("MULTI")}
|
||||
c.mtx.Unlock()
|
||||
c.reqsMtx.Unlock()
|
||||
}
|
||||
|
||||
func (c *MultiClient) Exec(do func()) ([]Req, error) {
|
||||
c.Discard()
|
||||
|
||||
do()
|
||||
c.Queue(NewIfaceSliceReq("EXEC"))
|
||||
|
||||
c.mtx.Lock()
|
||||
c.reqs = append(c.reqs, NewIfaceSliceReq("EXEC"))
|
||||
|
||||
c.reqsMtx.Lock()
|
||||
reqs := c.reqs
|
||||
c.reqs = nil
|
||||
c.reqsMtx.Unlock()
|
||||
|
||||
if len(reqs) == 2 {
|
||||
c.mtx.Unlock()
|
||||
return []Req{}, nil
|
||||
}
|
||||
c.mtx.Unlock()
|
||||
|
||||
conn, err := c.conn()
|
||||
if err != nil {
|
||||
|
|
18
pipeline.go
18
pipeline.go
|
@ -22,7 +22,9 @@ func (c *Client) Pipelined(do func(*PipelineClient)) ([]Req, error) {
|
|||
return nil, err
|
||||
}
|
||||
defer pc.Close()
|
||||
|
||||
do(pc)
|
||||
|
||||
return pc.RunQueued()
|
||||
}
|
||||
|
||||
|
@ -31,20 +33,20 @@ func (c *PipelineClient) Close() error {
|
|||
}
|
||||
|
||||
func (c *PipelineClient) DiscardQueued() {
|
||||
c.mtx.Lock()
|
||||
c.reqsMtx.Lock()
|
||||
c.reqs = c.reqs[:0]
|
||||
c.mtx.Unlock()
|
||||
c.reqsMtx.Unlock()
|
||||
}
|
||||
|
||||
func (c *PipelineClient) RunQueued() ([]Req, error) {
|
||||
c.mtx.Lock()
|
||||
if len(c.reqs) == 0 {
|
||||
c.mtx.Unlock()
|
||||
return c.reqs, nil
|
||||
}
|
||||
c.reqsMtx.Lock()
|
||||
reqs := c.reqs
|
||||
c.reqs = make([]Req, 0)
|
||||
c.mtx.Unlock()
|
||||
c.reqsMtx.Unlock()
|
||||
|
||||
if len(reqs) == 0 {
|
||||
return []Req{}, nil
|
||||
}
|
||||
|
||||
conn, err := c.conn()
|
||||
if err != nil {
|
||||
|
|
7
redis.go
7
redis.go
|
@ -54,10 +54,10 @@ func AuthSelectFunc(password string, db int64) InitConnFunc {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type BaseClient struct {
|
||||
mtx sync.Mutex
|
||||
ConnPool ConnPool
|
||||
InitConn InitConnFunc
|
||||
reqs []Req
|
||||
reqsMtx sync.Mutex
|
||||
}
|
||||
|
||||
func (c *BaseClient) WriteReq(conn *Conn, reqs ...Req) error {
|
||||
|
@ -138,10 +138,11 @@ func (c *BaseClient) Run(req Req) {
|
|||
req.SetVal(val)
|
||||
}
|
||||
|
||||
// Queues request to be executed later.
|
||||
func (c *BaseClient) Queue(req Req) {
|
||||
c.mtx.Lock()
|
||||
c.reqsMtx.Lock()
|
||||
c.reqs = append(c.reqs, req)
|
||||
c.mtx.Unlock()
|
||||
c.reqsMtx.Unlock()
|
||||
}
|
||||
|
||||
func (c *BaseClient) Close() error {
|
||||
|
|
Loading…
Reference in New Issue