2019-11-18 20:33:15 +03:00
/ *
* Copyright ( c ) 2013 IBM Corp .
*
* All rights reserved . This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1 .0
* which accompanies this distribution , and is available at
* http : //www.eclipse.org/legal/epl-v10.html
*
* Contributors :
* Seth Hoenig
* Allan Stockdill - Mander
* Mike Robertson
* /
package mqtt
import (
"errors"
2021-02-04 00:30:55 +03:00
"io"
2020-09-23 02:43:58 +03:00
"sync/atomic"
2019-11-18 20:33:15 +03:00
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
)
2021-02-04 00:30:55 +03:00
// keepalive - Send ping when connection unused for set period
// connection passed in to avoid race condition on shutdown
func keepalive ( c * client , conn io . Writer ) {
2020-09-23 02:43:58 +03:00
defer c . workers . Done ( )
2019-11-18 20:33:15 +03:00
DEBUG . Println ( PNG , "keepalive starting" )
2020-09-23 02:43:58 +03:00
var checkInterval int64
var pingSent time . Time
2019-11-18 20:33:15 +03:00
2020-09-23 02:43:58 +03:00
if c . options . KeepAlive > 10 {
checkInterval = 5
} else {
checkInterval = c . options . KeepAlive / 2
}
2019-11-18 20:33:15 +03:00
2020-09-23 02:43:58 +03:00
intervalTicker := time . NewTicker ( time . Duration ( checkInterval * int64 ( time . Second ) ) )
defer intervalTicker . Stop ( )
2019-11-18 20:33:15 +03:00
for {
select {
case <- c . stop :
DEBUG . Println ( PNG , "keepalive stopped" )
return
2020-09-23 02:43:58 +03:00
case <- intervalTicker . C :
lastSent := c . lastSent . Load ( ) . ( time . Time )
lastReceived := c . lastReceived . Load ( ) . ( time . Time )
DEBUG . Println ( PNG , "ping check" , time . Since ( lastSent ) . Seconds ( ) )
if time . Since ( lastSent ) >= time . Duration ( c . options . KeepAlive * int64 ( time . Second ) ) || time . Since ( lastReceived ) >= time . Duration ( c . options . KeepAlive * int64 ( time . Second ) ) {
if atomic . LoadInt32 ( & c . pingOutstanding ) == 0 {
DEBUG . Println ( PNG , "keepalive sending ping" )
ping := packets . NewControlPacket ( packets . Pingreq ) . ( * packets . PingreqPacket )
2021-02-04 00:30:55 +03:00
// We don't want to wait behind large messages being sent, the Write call
// will block until it it able to send the packet.
2020-09-23 02:43:58 +03:00
atomic . StoreInt32 ( & c . pingOutstanding , 1 )
2021-02-04 00:30:55 +03:00
if err := ping . Write ( conn ) ; err != nil {
ERROR . Println ( PNG , err )
}
2020-09-23 02:43:58 +03:00
c . lastSent . Store ( time . Now ( ) )
pingSent = time . Now ( )
}
}
2021-02-04 00:30:55 +03:00
if atomic . LoadInt32 ( & c . pingOutstanding ) > 0 && time . Since ( pingSent ) >= c . options . PingTimeout {
2020-09-23 02:43:58 +03:00
CRITICAL . Println ( PNG , "pingresp not received, disconnecting" )
2021-02-04 00:30:55 +03:00
c . internalConnLost ( errors . New ( "pingresp not received, disconnecting" ) ) // no harm in calling this if the connection is already down (or shutdown is in progress)
2020-09-23 02:43:58 +03:00
return
}
2019-11-18 20:33:15 +03:00
}
}
}