mirror of https://github.com/tidwall/tile38.git
Use WaitTimeout for MQTT
This commit is contained in:
parent
6b2025ab0c
commit
b092cea0d2
|
@ -3,9 +3,9 @@ package endpoint
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"math/rand"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
mqttExpiresAfter = time.Second * 30
|
mqttExpiresAfter = time.Second * 30
|
||||||
|
mqttPublishTimeout = time.Second * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
// MQTTConn is an endpoint connection
|
// MQTTConn is an endpoint connection
|
||||||
|
@ -90,7 +91,7 @@ func (conn *MQTTConn) Send(msg string) error {
|
||||||
_, err := rand.Read(b)
|
_, err := rand.Read(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Failed to generate guid for the mqtt client. The endpoint will not work")
|
log.Debugf("Failed to generate guid for the mqtt client. The endpoint will not work")
|
||||||
return err;
|
return err
|
||||||
}
|
}
|
||||||
uuid := fmt.Sprintf("tile38-%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
|
uuid := fmt.Sprintf("tile38-%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
|
||||||
|
|
||||||
|
@ -106,9 +107,8 @@ func (conn *MQTTConn) Send(msg string) error {
|
||||||
|
|
||||||
t := conn.conn.Publish(conn.ep.MQTT.QueueName, conn.ep.MQTT.Qos,
|
t := conn.conn.Publish(conn.ep.MQTT.QueueName, conn.ep.MQTT.Qos,
|
||||||
conn.ep.MQTT.Retained, msg)
|
conn.ep.MQTT.Retained, msg)
|
||||||
t.Wait()
|
|
||||||
|
|
||||||
if t.Error() != nil {
|
if !t.WaitTimeout(mqttPublishTimeout) || t.Error() != nil {
|
||||||
conn.close()
|
conn.close()
|
||||||
return t.Error()
|
return t.Error()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue