tile38/vendor/github.com/eclipse/paho.mqtt.golang/fvt_client_test.go

1043 lines
26 KiB
Go

/*
* Copyright (c) 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/
package mqtt
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"testing"
"time"
)
func Test_Start(t *testing.T) {
ops := NewClientOptions().SetClientID("Start").AddBroker(FVTTCP)
c := NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.Disconnect(250)
}
/* uncomment this if you have connection policy disallowing FailClientID
func Test_InvalidConnRc(t *testing.T) {
ops := NewClientOptions().SetClientID("FailClientID").
AddBroker("tcp://" + FVT_IP + ":17003").
SetStore(NewFileStore("/tmp/fvt/InvalidConnRc"))
c := NewClient(ops)
_, err := c.Connect()
if err != ErrNotAuthorized {
t.Fatalf("Did not receive error as expected, got %v", err)
}
c.Disconnect(250)
}
*/
// Helper function for Test_Start_Ssl
func NewTLSConfig() *tls.Config {
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile("samples/samplecerts/CAfile.pem")
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}
cert, err := tls.LoadX509KeyPair("samples/samplecerts/client-crt.pem", "samples/samplecerts/client-key.pem")
if err != nil {
panic(err)
}
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
}
}
/* uncomment this if you have ssl setup
func Test_Start_Ssl(t *testing.T) {
tlsconfig := NewTlsConfig()
ops := NewClientOptions().SetClientID("StartSsl").
AddBroker(FVT_SSL).
SetStore(NewFileStore("/tmp/fvt/Start_Ssl")).
SetTlsConfig(tlsconfig)
c := NewClient(ops)
_, err := c.Connect()
if err != nil {
t.Fatalf("Error on Client.Connect(): %v", err)
}
c.Disconnect(250)
}
*/
func Test_Publish_1(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("Publish_1")
c := NewClient(ops)
token := c.Connect()
if token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.Publish("test/Publish", 0, false, "Publish qo0")
c.Disconnect(250)
}
func Test_Publish_2(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("Publish_2")
c := NewClient(ops)
token := c.Connect()
if token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.Publish("/test/Publish", 0, false, "Publish1 qos0")
c.Publish("/test/Publish", 0, false, "Publish2 qos0")
c.Disconnect(250)
}
func Test_Publish_3(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("Publish_3")
c := NewClient(ops)
token := c.Connect()
if token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.Publish("/test/Publish", 0, false, "Publish1 qos0")
c.Publish("/test/Publish", 1, false, "Publish2 qos1")
c.Publish("/test/Publish", 2, false, "Publish2 qos2")
c.Disconnect(250)
}
func Test_Subscribe(t *testing.T) {
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("Subscribe_tx")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("Subscribe_rx")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
sToken := s.Connect()
if sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}
s.Subscribe("/test/sub", 0, nil)
pToken := p.Connect()
if pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
}
p.Publish("/test/sub", 0, false, "Publish qos0")
p.Disconnect(250)
s.Disconnect(250)
}
func Test_Will(t *testing.T) {
willmsgc := make(chan string, 1)
sops := NewClientOptions().AddBroker(FVTTCP)
sops.SetClientID("will-giver")
sops.SetWill("/wills", "good-byte!", 0, false)
sops.SetConnectionLostHandler(func(client Client, err error) {
fmt.Println("OnConnectionLost!")
})
sops.SetAutoReconnect(false)
c := NewClient(sops).(*client)
wops := NewClientOptions()
wops.AddBroker(FVTTCP)
wops.SetClientID("will-subscriber")
wops.SetDefaultPublishHandler(func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
willmsgc <- string(msg.Payload())
})
wops.SetAutoReconnect(false)
wsub := NewClient(wops)
if wToken := wsub.Connect(); wToken.Wait() && wToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", wToken.Error())
}
if wsubToken := wsub.Subscribe("/wills", 0, nil); wsubToken.Wait() && wsubToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", wsubToken.Error())
}
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.forceDisconnect()
if <-willmsgc != "good-byte!" {
t.Fatalf("will message did not have correct payload")
}
wsub.Disconnect(250)
}
func Test_CleanSession(t *testing.T) {
clsnc := make(chan string, 1)
sops := NewClientOptions().AddBroker(FVTTCP)
sops.SetClientID("clsn-sender")
sops.SetConnectionLostHandler(func(client Client, err error) {
fmt.Println("OnConnectionLost!")
})
sops.SetAutoReconnect(false)
c := NewClient(sops).(*client)
wops := NewClientOptions()
wops.AddBroker(FVTTCP)
wops.SetClientID("clsn-tester")
wops.SetCleanSession(false)
wops.SetDefaultPublishHandler(func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
clsnc <- string(msg.Payload())
})
wops.SetAutoReconnect(false)
wsub := NewClient(wops)
if wToken := wsub.Connect(); wToken.Wait() && wToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", wToken.Error())
}
if wsubToken := wsub.Subscribe("clean", 1, nil); wsubToken.Wait() && wsubToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", wsubToken.Error())
}
wsub.Disconnect(250)
time.Sleep(2 * time.Second)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if pToken := c.Publish("clean", 1, false, "clean!"); pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Publish(): %v", pToken.Error())
}
c.Disconnect(250)
wsub = NewClient(wops)
if wToken := wsub.Connect(); wToken.Wait() && wToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", wToken.Error())
}
select {
case msg := <-clsnc:
if msg != "clean!" {
t.Fatalf("will message did not have correct payload")
}
case <-time.NewTicker(5 * time.Second).C:
t.Fatalf("failed to receive publish")
}
wsub.Disconnect(250)
wops.SetCleanSession(true)
wsub = NewClient(wops)
if wToken := wsub.Connect(); wToken.Wait() && wToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", wToken.Error())
}
wsub.Disconnect(250)
}
func Test_Binary_Will(t *testing.T) {
willmsgc := make(chan []byte, 1)
will := []byte{
0xDE,
0xAD,
0xBE,
0xEF,
}
sops := NewClientOptions().AddBroker(FVTTCP)
sops.SetClientID("will-giver")
sops.SetBinaryWill("/wills", will, 0, false)
sops.SetConnectionLostHandler(func(client Client, err error) {
})
sops.SetAutoReconnect(false)
c := NewClient(sops).(*client)
wops := NewClientOptions().AddBroker(FVTTCP)
wops.SetClientID("will-subscriber")
wops.SetDefaultPublishHandler(func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %v\n", msg.Payload())
willmsgc <- msg.Payload()
})
wops.SetAutoReconnect(false)
wsub := NewClient(wops)
if wToken := wsub.Connect(); wToken.Wait() && wToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", wToken.Error())
}
if wsubToken := wsub.Subscribe("/wills", 0, nil); wsubToken.Wait() && wsubToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe() %v", wsubToken.Error())
}
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
c.forceDisconnect()
if !bytes.Equal(<-willmsgc, will) {
t.Fatalf("will message did not have correct payload")
}
wsub.Disconnect(250)
}
/**
"[...] a publisher is responsible for determining the maximum QoS a
message can be delivered at, but a subscriber is able to downgrade
the QoS to one more suitable for its usage.
The QoS of a message is never upgraded."
**/
/***********************************
* Tests to cover the 9 QoS combos *
***********************************/
func wait(c chan bool) {
fmt.Println("choke is waiting")
<-c
}
// Pub 0, Sub 0
func Test_p0s0(t *testing.T) {
topic := "/test/p0s0"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p0s0-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p0s0-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 0, false, "p0s0 payload 1")
p.Publish(topic, 0, false, "p0s0 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 0, false, "p0s0 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 0, Sub 1
func Test_p0s1(t *testing.T) {
topic := "/test/p0s1"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p0s1-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p0s1-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 0, false, "p0s1 payload 1")
p.Publish(topic, 0, false, "p0s1 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 0, false, "p0s1 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 0, Sub 2
func Test_p0s2(t *testing.T) {
topic := "/test/p0s2"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p0s2-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p0s2-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 0, false, "p0s2 payload 1")
p.Publish(topic, 0, false, "p0s2 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 0, false, "p0s2 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 1, Sub 0
func Test_p1s0(t *testing.T) {
topic := "/test/p1s0"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p1s0-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p1s0-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 1, false, "p1s0 payload 1")
p.Publish(topic, 1, false, "p1s0 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 1, false, "p1s0 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 1, Sub 1
func Test_p1s1(t *testing.T) {
topic := "/test/p1s1"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p1s1-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p1s1-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 1, false, "p1s1 payload 1")
p.Publish(topic, 1, false, "p1s1 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 1, false, "p1s1 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 1, Sub 2
func Test_p1s2(t *testing.T) {
topic := "/test/p1s2"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p1s2-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p1s2-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 1, false, "p1s2 payload 1")
p.Publish(topic, 1, false, "p1s2 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 1, false, "p1s2 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 2, Sub 0
func Test_p2s0(t *testing.T) {
topic := "/test/p2s0"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p2s0-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p2s0-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 2, false, "p2s0 payload 1")
p.Publish(topic, 2, false, "p2s0 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 2, false, "p2s0 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 2, Sub 1
func Test_p2s1(t *testing.T) {
topic := "/test/p2s1"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p2s1-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p2s1-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 2, false, "p2s1 payload 1")
p.Publish(topic, 2, false, "p2s1 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 2, false, "p2s1 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// Pub 2, Sub 2
func Test_p2s2(t *testing.T) {
topic := "/test/p2s2"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("p2s2-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("p2s2-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
p.Publish(topic, 2, false, "p2s2 payload 1")
p.Publish(topic, 2, false, "p2s2 payload 2")
wait(choke)
wait(choke)
p.Publish(topic, 2, false, "p2s2 payload 3")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
func Test_PublishMessage(t *testing.T) {
topic := "/test/pubmsg"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("pubmsg-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("pubmsg-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
if string(msg.Payload()) != "pubmsg payload" {
fmt.Println("Message payload incorrect", msg.Payload(), len("pubmsg payload"))
t.Fatalf("Message payload incorrect")
}
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if token := s.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
text := "pubmsg payload"
p.Publish(topic, 0, false, text)
p.Publish(topic, 0, false, text)
wait(choke)
wait(choke)
p.Publish(topic, 0, false, text)
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
func Test_PublishEmptyMessage(t *testing.T) {
topic := "/test/pubmsgempty"
choke := make(chan bool)
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("pubmsgempty-pub")
p := NewClient(pops)
sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetClientID("pubmsgempty-sub")
var f MessageHandler = func(client Client, msg Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
if string(msg.Payload()) != "" {
t.Fatalf("Message payload incorrect")
}
choke <- true
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}
if sToken := s.Subscribe(topic, 2, nil); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", sToken.Error())
}
if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
}
p.Publish(topic, 0, false, "")
p.Publish(topic, 0, false, "")
wait(choke)
wait(choke)
p.Publish(topic, 0, false, "")
wait(choke)
p.Disconnect(250)
s.Disconnect(250)
}
// func Test_Cleanstore(t *testing.T) {
// store := "/tmp/fvt/cleanstore"
// topic := "/test/cleanstore"
// pops := NewClientOptions()
// pops.AddBroker(FVTTCP)
// pops.SetClientID("cleanstore-pub")
// pops.SetStore(NewFileStore(store + "/p"))
// p := NewClient(pops)
// var s *Client
// sops := NewClientOptions()
// sops.AddBroker(FVTTCP)
// sops.SetClientID("cleanstore-sub")
// sops.SetCleanSession(false)
// sops.SetStore(NewFileStore(store + "/s"))
// var f MessageHandler = func(client Client, msg Message) {
// fmt.Printf("TOPIC: %s\n", msg.Topic())
// fmt.Printf("MSG: %s\n", msg.Payload())
// // Close the connection after receiving
// // the first message so that hopefully
// // there is something in the store to be
// // cleaned.
// s.ForceDisconnect()
// }
// sops.SetDefaultPublishHandler(f)
// s = NewClient(sops)
// sToken := s.Connect()
// if sToken.Wait() && sToken.Error() != nil {
// t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
// }
// sToken = s.Subscribe(topic, 2, nil)
// if sToken.Wait() && sToken.Error() != nil {
// t.Fatalf("Error on Client.Subscribe(): %v", sToken.Error())
// }
// pToken := p.Connect()
// if pToken.Wait() && pToken.Error() != nil {
// t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
// }
// text := "test message"
// p.Publish(topic, 0, false, text)
// p.Publish(topic, 0, false, text)
// p.Publish(topic, 0, false, text)
// p.Disconnect(250)
// s2ops := NewClientOptions()
// s2ops.AddBroker(FVTTCP)
// s2ops.SetClientID("cleanstore-sub")
// s2ops.SetCleanSession(true)
// s2ops.SetStore(NewFileStore(store + "/s"))
// s2ops.SetDefaultPublishHandler(f)
// s2 := NewClient(s2ops)
// sToken = s2.Connect()
// if sToken.Wait() && sToken.Error() != nil {
// t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
// }
// // at this point existing state should be cleared...
// // how to check?
// }
func Test_MultipleURLs(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker("tcp://127.0.0.1:10000")
ops.AddBroker(FVTTCP)
ops.SetClientID("MultiURL")
c := NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
if pToken := c.Publish("/test/MultiURL", 0, false, "Publish qo0"); pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Publish(): %v", pToken.Error())
}
c.Disconnect(250)
}
// A test to make sure ping mechanism is working
func Test_ping1_idle5(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("p3i10")
ops.SetConnectionLostHandler(func(c Client, err error) {
t.Fatalf("Connection-lost handler was called: %s", err)
})
ops.SetKeepAlive(3 * time.Second)
c := NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
time.Sleep(8 * time.Second)
c.Disconnect(250)
}
func Test_autoreconnect(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("auto_reconnect")
ops.SetAutoReconnect(true)
ops.SetOnConnectHandler(func(c Client) {
t.Log("Connected")
})
ops.SetKeepAlive(2 * time.Second)
c := NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
time.Sleep(5 * time.Second)
fmt.Println("Breaking connection")
c.(*client).internalConnLost(fmt.Errorf("Autoreconnect test"))
time.Sleep(5 * time.Second)
if !c.IsConnected() {
t.Fail()
}
c.Disconnect(250)
}
func Test_cleanUpMids(t *testing.T) {
ops := NewClientOptions()
ops.AddBroker(FVTTCP)
ops.SetClientID("auto_reconnect")
ops.SetCleanSession(true)
ops.SetAutoReconnect(true)
ops.SetKeepAlive(10 * time.Second)
c := NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}
token := c.Publish("/test/cleanUP", 2, false, "cleanup test")
time.Sleep(10 * time.Millisecond)
fmt.Println("Breaking connection", len(c.(*client).messageIds.index))
if len(c.(*client).messageIds.index) == 0 {
t.Fatalf("Should be a token in the messageIDs, none found")
}
c.(*client).internalConnLost(fmt.Errorf("cleanup test"))
time.Sleep(5 * time.Second)
if !c.IsConnected() {
t.Fail()
}
if len(c.(*client).messageIds.index) > 0 {
t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
}
if token.Error() == nil {
t.Fatal("token should have received an error on connection loss")
}
fmt.Println(token.Error())
c.Disconnect(250)
}