diff --git a/cluster.go b/cluster.go index 592db7b..2fb0491 100644 --- a/cluster.go +++ b/cluster.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "time" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/hashtag" "gopkg.in/redis.v4/internal/pool" ) @@ -273,13 +274,13 @@ func (c *ClusterClient) reloadSlots() { client, err := c.randomClient() if err != nil { - Logger.Printf("randomClient failed: %s", err) + internal.Logf("randomClient failed: %s", err) return } slots, err := client.ClusterSlots().Result() if err != nil { - Logger.Printf("ClusterSlots failed: %s", err) + internal.Logf("ClusterSlots failed: %s", err) return } c.setSlots(slots) @@ -306,14 +307,14 @@ func (c *ClusterClient) reaper(frequency time.Duration) { for _, client := range c.getClients() { nn, err := client.connPool.(*pool.ConnPool).ReapStaleConns() if err != nil { - Logger.Printf("ReapStaleConns failed: %s", err) + internal.Logf("ReapStaleConns failed: %s", err) } else { n += nn } } s := c.PoolStats() - Logger.Printf( + internal.Logf( "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, ) diff --git a/commands.go b/commands.go index ee2fe86..96bd137 100644 --- a/commands.go +++ b/commands.go @@ -4,6 +4,8 @@ import ( "io" "strconv" "time" + + "gopkg.in/redis.v4/internal" ) func formatInt(i int64) string { @@ -31,7 +33,7 @@ func usePrecise(dur time.Duration) bool { func formatMs(dur time.Duration) string { if dur > 0 && dur < time.Millisecond { - Logger.Printf( + internal.Logf( "specified duration is %s, but minimal supported value is %s", dur, time.Millisecond, ) @@ -41,7 +43,7 @@ func formatMs(dur time.Duration) string { func formatSec(dur time.Duration) string { if dur > 0 && dur < time.Second { - Logger.Printf( + internal.Logf( "specified duration is %s, but minimal supported value is %s", dur, time.Second, ) diff --git a/export_test.go b/export_test.go index 948cfb1..ff37e81 100644 --- a/export_test.go +++ b/export_test.go @@ -14,6 +14,6 @@ func (c *PubSub) Pool() pool.Pooler { return c.base.connPool } -func SetReceiveMessageTimeout(d time.Duration) { - receiveMessageTimeout = d +func (c *PubSub) ReceiveMessageTimeout(timeout time.Duration) (*Message, error) { + return c.receiveMessage(timeout) } diff --git a/internal/log.go b/internal/log.go new file mode 100644 index 0000000..c1cdbf4 --- /dev/null +++ b/internal/log.go @@ -0,0 +1,22 @@ +package internal + +import ( + "fmt" + "io/ioutil" + "log" +) + +var Debug bool + +var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags) + +func Debugf(s string, args ...interface{}) { + if !Debug { + return + } + Logger.Output(2, fmt.Sprintf(s, args...)) +} + +func Logf(s string, args ...interface{}) { + Logger.Output(2, fmt.Sprintf(s, args...)) +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 302a2f8..330767c 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -3,17 +3,15 @@ package pool import ( "errors" "fmt" - "io/ioutil" - "log" "net" "sync" "sync/atomic" "time" "gopkg.in/bsm/ratelimit.v1" -) -var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags) + "gopkg.in/redis.v4/internal" +) var ( ErrClosed = errors.New("redis: client is closed") @@ -210,7 +208,7 @@ func (p *ConnPool) Put(cn *Conn) error { if cn.Rd.Buffered() != 0 { b, _ := cn.Rd.Peek(cn.Rd.Buffered()) err := fmt.Errorf("connection has unread data: %q", b) - Logger.Print(err) + internal.Logf(err.Error()) return p.Remove(cn, err) } p.freeConnsMu.Lock() @@ -342,11 +340,11 @@ func (p *ConnPool) reaper(frequency time.Duration) { } n, err := p.ReapStaleConns() if err != nil { - Logger.Printf("ReapStaleConns failed: %s", err) + internal.Logf("ReapStaleConns failed: %s", err) continue } s := p.Stats() - Logger.Printf( + internal.Logf( "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)", n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts, ) diff --git a/pubsub.go b/pubsub.go index 595c9d1..aef4a76 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,11 +5,10 @@ import ( "net" "time" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/pool" ) -var receiveMessageTimeout = 5 * time.Second - // Posts a message to the given channel. func (c *Client) Publish(channel, message string) *IntCmd { req := NewIntCmd("PUBLISH", channel, message) @@ -241,9 +240,13 @@ func (c *PubSub) Receive() (interface{}, error) { // messages. It automatically reconnects to Redis Server and resubscribes // to channels in case of network errors. func (c *PubSub) ReceiveMessage() (*Message, error) { + return c.receiveMessage(5 * time.Second) +} + +func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { var errNum uint for { - msgi, err := c.ReceiveTimeout(receiveMessageTimeout) + msgi, err := c.ReceiveTimeout(timeout) if err != nil { if !isNetworkError(err) { return nil, err @@ -254,7 +257,7 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { err := c.Ping("") if err != nil { - Logger.Printf("PubSub.Ping failed: %s", err) + internal.Logf("PubSub.Ping failed: %s", err) } } } else { @@ -294,12 +297,12 @@ func (c *PubSub) resubscribe() { } if len(c.channels) > 0 { if err := c.Subscribe(c.channels...); err != nil { - Logger.Printf("Subscribe failed: %s", err) + internal.Logf("Subscribe failed: %s", err) } } if len(c.patterns) > 0 { if err := c.PSubscribe(c.patterns...); err != nil { - Logger.Printf("PSubscribe failed: %s", err) + internal.Logf("PSubscribe failed: %s", err) } } } diff --git a/pubsub_test.go b/pubsub_test.go index 11b14b4..116099b 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -256,8 +256,7 @@ var _ = Describe("PubSub", func() { }) It("should ReceiveMessage after timeout", func() { - timeout := time.Second - redis.SetReceiveMessageTimeout(timeout) + timeout := 100 * time.Millisecond pubsub, err := client.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) @@ -276,7 +275,7 @@ var _ = Describe("PubSub", func() { Expect(n).To(Equal(int64(1))) }() - msg, err := pubsub.ReceiveMessage() + msg, err := pubsub.ReceiveMessageTimeout(timeout) Expect(err).NotTo(HaveOccurred()) Expect(msg.Channel).To(Equal("mychannel")) Expect(msg.Payload).To(Equal("hello")) diff --git a/redis.go b/redis.go index 3e37a00..be3aa02 100644 --- a/redis.go +++ b/redis.go @@ -2,18 +2,16 @@ package redis // import "gopkg.in/redis.v4" import ( "fmt" - "io/ioutil" "log" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/pool" ) -// Deprecated. Use SetLogger instead. -var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags) +var Logger *log.Logger func SetLogger(logger *log.Logger) { - Logger = logger - pool.Logger = logger + internal.Logger = logger } type baseClient struct { diff --git a/ring.go b/ring.go index 0cd3121..cac25d4 100644 --- a/ring.go +++ b/ring.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/consistenthash" "gopkg.in/redis.v4/internal/hashtag" "gopkg.in/redis.v4/internal/pool" @@ -204,7 +205,7 @@ func (ring *Ring) heartbeat() { for _, shard := range ring.shards { err := shard.Client.Ping().Err() if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { - Logger.Printf("ring shard state changed: %s", shard) + internal.Logf("ring shard state changed: %s", shard) rebalance = true } } diff --git a/sentinel.go b/sentinel.go index cf8e838..53c40a9 100644 --- a/sentinel.go +++ b/sentinel.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/pool" ) @@ -165,11 +166,11 @@ func (d *sentinelFailover) MasterAddr() (string, error) { if d.sentinel != nil { addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() if err != nil { - Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) + internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) d._resetSentinel() } else { addr := net.JoinHostPort(addr[0], addr[1]) - Logger.Printf("sentinel: %q addr is %s", d.masterName, addr) + internal.Logf("sentinel: %q addr is %s", d.masterName, addr) return addr, nil } } @@ -188,7 +189,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) { }) masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() if err != nil { - Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) + internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) sentinel.Close() continue } @@ -198,7 +199,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) { d.setSentinel(sentinel) addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) - Logger.Printf("sentinel: %q addr is %s", d.masterName, addr) + internal.Logf("sentinel: %q addr is %s", d.masterName, addr) return addr, nil } @@ -230,7 +231,7 @@ func (d *sentinelFailover) _resetSentinel() error { func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { sentinels, err := sentinel.Sentinels(d.masterName).Result() if err != nil { - Logger.Printf("sentinel: Sentinels %q failed: %s", d.masterName, err) + internal.Logf("sentinel: Sentinels %q failed: %s", d.masterName, err) return } for _, sentinel := range sentinels { @@ -240,7 +241,7 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { if key == "name" { sentinelAddr := vals[i+1].(string) if !contains(d.sentinelAddrs, sentinelAddr) { - Logger.Printf( + internal.Logf( "sentinel: discovered new %q sentinel: %s", d.masterName, sentinelAddr, ) @@ -268,7 +269,7 @@ func (d *sentinelFailover) closeOldConns(newMaster string) { "sentinel: closing connection to the old master %s", cn.RemoteAddr(), ) - Logger.Print(err) + internal.Logf(err.Error()) d.pool.Remove(cn, err) } else { cnsToPut = append(cnsToPut, cn) @@ -286,7 +287,7 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) { if pubsub == nil { pubsub = sentinel.PubSub() if err := pubsub.Subscribe("+switch-master"); err != nil { - Logger.Printf("sentinel: Subscribe failed: %s", err) + internal.Logf("sentinel: Subscribe failed: %s", err) d.resetSentinel() return } @@ -294,7 +295,7 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) { msg, err := pubsub.ReceiveMessage() if err != nil { - Logger.Printf("sentinel: ReceiveMessage failed: %s", err) + internal.Logf("sentinel: ReceiveMessage failed: %s", err) pubsub.Close() d.resetSentinel() return @@ -304,12 +305,12 @@ func (d *sentinelFailover) listen(sentinel *sentinelClient) { case "+switch-master": parts := strings.Split(msg.Payload, " ") if parts[0] != d.masterName { - Logger.Printf("sentinel: ignore new %s addr", parts[0]) + internal.Logf("sentinel: ignore new %s addr", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - Logger.Printf( + internal.Logf( "sentinel: new %q addr is %s", d.masterName, addr, ) diff --git a/tx.go b/tx.go index 3273e2c..5274af4 100644 --- a/tx.go +++ b/tx.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "gopkg.in/redis.v4/internal" "gopkg.in/redis.v4/internal/pool" ) @@ -58,7 +59,7 @@ func (tx *Tx) process(cmd Cmder) { func (tx *Tx) Close() error { tx.closed = true if err := tx.Unwatch().Err(); err != nil { - Logger.Printf("Unwatch failed: %s", err) + internal.Logf("Unwatch failed: %s", err) } return tx.base.Close() }