mirror of https://github.com/go-redis/redis.git
Fix PubSub panic on concurrent Close.
This commit is contained in:
parent
f9e280505c
commit
62ce552959
12
pool.go
12
pool.go
|
@ -415,12 +415,12 @@ func (p *stickyConnPool) put() (err error) {
|
||||||
func (p *stickyConnPool) Put(cn *conn) error {
|
func (p *stickyConnPool) Put(cn *conn) error {
|
||||||
defer p.mx.Unlock()
|
defer p.mx.Unlock()
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
if p.cn != cn {
|
|
||||||
panic("p.cn != cn")
|
|
||||||
}
|
|
||||||
if p.closed {
|
if p.closed {
|
||||||
return errClosed
|
return errClosed
|
||||||
}
|
}
|
||||||
|
if p.cn != cn {
|
||||||
|
panic("p.cn != cn")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,15 +433,15 @@ func (p *stickyConnPool) remove() (err error) {
|
||||||
func (p *stickyConnPool) Remove(cn *conn) error {
|
func (p *stickyConnPool) Remove(cn *conn) error {
|
||||||
defer p.mx.Unlock()
|
defer p.mx.Unlock()
|
||||||
p.mx.Lock()
|
p.mx.Lock()
|
||||||
|
if p.closed {
|
||||||
|
return errClosed
|
||||||
|
}
|
||||||
if p.cn == nil {
|
if p.cn == nil {
|
||||||
panic("p.cn == nil")
|
panic("p.cn == nil")
|
||||||
}
|
}
|
||||||
if cn != nil && p.cn != cn {
|
if cn != nil && p.cn != cn {
|
||||||
panic("p.cn != cn")
|
panic("p.cn != cn")
|
||||||
}
|
}
|
||||||
if p.closed {
|
|
||||||
return errClosed
|
|
||||||
}
|
|
||||||
if cn == nil {
|
if cn == nil {
|
||||||
return p.remove()
|
return p.remove()
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -235,8 +235,11 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
defer pubsub.Close()
|
defer pubsub.Close()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
time.Sleep(readTimeout + 100*time.Millisecond)
|
time.Sleep(readTimeout + 100*time.Millisecond)
|
||||||
n, err := client.Publish("mychannel", "hello").Result()
|
n, err := client.Publish("mychannel", "hello").Result()
|
||||||
|
@ -248,6 +251,8 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msg.Channel).To(Equal("mychannel"))
|
Expect(msg.Channel).To(Equal("mychannel"))
|
||||||
Expect(msg.Payload).To(Equal("hello"))
|
Expect(msg.Payload).To(Equal("hello"))
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should reconnect on ReceiveMessage error", func() {
|
It("should reconnect on ReceiveMessage error", func() {
|
||||||
|
@ -281,4 +286,24 @@ var _ = Describe("PubSub", func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should not panic on Close", func() {
|
||||||
|
pubsub, err := client.Subscribe("mychannel")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
defer pubsub.Close()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
_, err := pubsub.ReceiveMessage()
|
||||||
|
Expect(err).To(MatchError("redis: client is closed"))
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
err = pubsub.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue