diff --git a/pkg/endpoint/kafka.go b/pkg/endpoint/kafka.go index 7c1ce596..e450fe0d 100644 --- a/pkg/endpoint/kafka.go +++ b/pkg/endpoint/kafka.go @@ -60,6 +60,9 @@ func (conn *KafkaConn) Send(msg string) error { cfg.Net.DialTimeout = time.Second cfg.Net.ReadTimeout = time.Second * 5 cfg.Net.WriteTimeout = time.Second * 5 + // Fix #333 : fix backward incompatibility introduced by sarama library + cfg.Producer.Return.Successes = true + c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { return err