diff --git a/pubsub.go b/pubsub.go index efc2354a..b43ed524 100644 --- a/pubsub.go +++ b/pubsub.go @@ -470,6 +470,12 @@ func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan in type ChannelOption func(c *channel) +func WithLogger(logger internal.Logging) ChannelOption { + return func(c *channel) { + c.logger = logger + } +} + // WithChannelSize specifies the Go chan size that is used to buffer incoming messages. // // The default is 100 messages. @@ -510,6 +516,8 @@ type channel struct { chanSize int chanSendTimeout time.Duration checkInterval time.Duration + + logger internal.Logging } func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { @@ -519,6 +527,7 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { chanSize: 100, chanSendTimeout: time.Minute, checkInterval: 3 * time.Second, + logger: internal.Logger, } for _, opt := range opts { opt(c) @@ -602,12 +611,12 @@ func (c *channel) initMsgChan() { <-timer.C } case <-timer.C: - internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", + c.logger.Printf( + ctx, "redis: %+v channel is full for %s (message is dropped)", c, c.chanSendTimeout) } default: - internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) + c.logger.Printf(ctx, "redis: unknown message type: %T", msg) } } }() @@ -656,12 +665,12 @@ func (c *channel) initAllChan() { <-timer.C } case <-timer.C: - internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", + c.logger.Printf( + ctx, "redis: %+v channel is full for %s (message is dropped)", c, c.chanSendTimeout) } default: - internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) + c.logger.Printf(ctx, "redis: unknown message type: %T", msg) } } }()