mirror of https://github.com/tidwall/tile38.git
Added go-nats to vendor
This commit is contained in:
parent
53271ebad6
commit
4cae040470
|
@ -0,0 +1,20 @@
|
||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
Copyright (c) 2012-2017 Apcera Inc.
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
this software and associated documentation files (the "Software"), to deal in
|
||||||
|
the Software without restriction, including without limitation the rights to
|
||||||
|
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
@ -0,0 +1,350 @@
|
||||||
|
# NATS - Go Client
|
||||||
|
A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io).
|
||||||
|
|
||||||
|
[![License MIT](https://img.shields.io/badge/License-MIT-blue.svg)](http://opensource.org/licenses/MIT)
|
||||||
|
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/go-nats)](https://goreportcard.com/report/github.com/nats-io/go-nats) [![Build Status](https://travis-ci.org/nats-io/go-nats.svg?branch=master)](http://travis-ci.org/nats-io/go-nats) [![GoDoc](https://godoc.org/github.com/nats-io/go-nats?status.svg)](http://godoc.org/github.com/nats-io/go-nats) [![Coverage Status](https://coveralls.io/repos/nats-io/go-nats/badge.svg?branch=master)](https://coveralls.io/r/nats-io/go-nats?branch=master)
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Go client
|
||||||
|
go get github.com/nats-io/go-nats
|
||||||
|
|
||||||
|
# Server
|
||||||
|
go get github.com/nats-io/gnatsd
|
||||||
|
```
|
||||||
|
|
||||||
|
## Basic Usage
|
||||||
|
|
||||||
|
```go
|
||||||
|
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
|
||||||
|
// Simple Publisher
|
||||||
|
nc.Publish("foo", []byte("Hello World"))
|
||||||
|
|
||||||
|
// Simple Async Subscriber
|
||||||
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
||||||
|
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||||
|
})
|
||||||
|
|
||||||
|
// Simple Sync Subscriber
|
||||||
|
sub, err := nc.SubscribeSync("foo")
|
||||||
|
m, err := sub.NextMsg(timeout)
|
||||||
|
|
||||||
|
// Channel Subscriber
|
||||||
|
ch := make(chan *nats.Msg, 64)
|
||||||
|
sub, err := nc.ChanSubscribe("foo", ch)
|
||||||
|
msg := <- ch
|
||||||
|
|
||||||
|
// Unsubscribe
|
||||||
|
sub.Unsubscribe()
|
||||||
|
|
||||||
|
// Requests
|
||||||
|
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)
|
||||||
|
|
||||||
|
// Replies
|
||||||
|
nc.Subscribe("help", func(m *Msg) {
|
||||||
|
nc.Publish(m.Reply, []byte("I can help!"))
|
||||||
|
})
|
||||||
|
|
||||||
|
// Close connection
|
||||||
|
nc, _ := nats.Connect("nats://localhost:4222")
|
||||||
|
nc.Close();
|
||||||
|
```
|
||||||
|
|
||||||
|
## Encoded Connections
|
||||||
|
|
||||||
|
```go
|
||||||
|
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// Simple Publisher
|
||||||
|
c.Publish("foo", "Hello World")
|
||||||
|
|
||||||
|
// Simple Async Subscriber
|
||||||
|
c.Subscribe("foo", func(s string) {
|
||||||
|
fmt.Printf("Received a message: %s\n", s)
|
||||||
|
})
|
||||||
|
|
||||||
|
// EncodedConn can Publish any raw Go type using the registered Encoder
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go type Subscriber
|
||||||
|
c.Subscribe("hello", func(p *person) {
|
||||||
|
fmt.Printf("Received a person: %+v\n", p)
|
||||||
|
})
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}
|
||||||
|
|
||||||
|
// Go type Publisher
|
||||||
|
c.Publish("hello", me)
|
||||||
|
|
||||||
|
// Unsubscribe
|
||||||
|
sub, err := c.Subscribe("foo", nil)
|
||||||
|
...
|
||||||
|
sub.Unsubscribe()
|
||||||
|
|
||||||
|
// Requests
|
||||||
|
var response string
|
||||||
|
err := c.Request("help", "help me", &response, 10*time.Millisecond)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Request failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replying
|
||||||
|
c.Subscribe("help", func(subj, reply string, msg string) {
|
||||||
|
c.Publish(reply, "I can help!")
|
||||||
|
})
|
||||||
|
|
||||||
|
// Close connection
|
||||||
|
c.Close();
|
||||||
|
```
|
||||||
|
|
||||||
|
## TLS
|
||||||
|
|
||||||
|
```go
|
||||||
|
// tls as a scheme will enable secure connections by default. This will also verify the server name.
|
||||||
|
nc, err := nats.Connect("tls://nats.demo.io:4443")
|
||||||
|
|
||||||
|
// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
|
||||||
|
// We provide a helper method to make this case easier.
|
||||||
|
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
|
||||||
|
|
||||||
|
// If the server requires client certificate, there is an helper function for that too:
|
||||||
|
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
|
||||||
|
nc, err = nats.Connect("tls://localhost:4443", cert)
|
||||||
|
|
||||||
|
// You can also supply a complete tls.Config
|
||||||
|
|
||||||
|
certFile := "./configs/certs/client-cert.pem"
|
||||||
|
keyFile := "./configs/certs/client-key.pem"
|
||||||
|
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error parsing X509 certificate/key pair: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
config := &tls.Config{
|
||||||
|
ServerName: opts.Host,
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
RootCAs: pool,
|
||||||
|
MinVersion: tls.VersionTLS12,
|
||||||
|
}
|
||||||
|
|
||||||
|
nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Using Go Channels (netchan)
|
||||||
|
|
||||||
|
```go
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||||
|
defer ec.Close()
|
||||||
|
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
recvCh := make(chan *person)
|
||||||
|
ec.BindRecvChan("hello", recvCh)
|
||||||
|
|
||||||
|
sendCh := make(chan *person)
|
||||||
|
ec.BindSendChan("hello", sendCh)
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}
|
||||||
|
|
||||||
|
// Send via Go channels
|
||||||
|
sendCh <- me
|
||||||
|
|
||||||
|
// Receive via Go channels
|
||||||
|
who := <- recvCh
|
||||||
|
```
|
||||||
|
|
||||||
|
## Wildcard Subscriptions
|
||||||
|
|
||||||
|
```go
|
||||||
|
|
||||||
|
// "*" matches any token, at any level of the subject.
|
||||||
|
nc.Subscribe("foo.*.baz", func(m *Msg) {
|
||||||
|
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||||
|
})
|
||||||
|
|
||||||
|
nc.Subscribe("foo.bar.*", func(m *Msg) {
|
||||||
|
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||||
|
})
|
||||||
|
|
||||||
|
// ">" matches any length of the tail of a subject, and can only be the last token
|
||||||
|
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
|
||||||
|
nc.Subscribe("foo.>", func(m *Msg) {
|
||||||
|
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
|
||||||
|
})
|
||||||
|
|
||||||
|
// Matches all of the above
|
||||||
|
nc.Publish("foo.bar.baz", []byte("Hello World"))
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Queue Groups
|
||||||
|
|
||||||
|
```go
|
||||||
|
// All subscriptions with the same queue name will form a queue group.
|
||||||
|
// Each message will be delivered to only one subscriber per queue group,
|
||||||
|
// using queuing semantics. You can have as many queue groups as you wish.
|
||||||
|
// Normal subscribers will continue to work as expected.
|
||||||
|
|
||||||
|
nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
|
||||||
|
received += 1;
|
||||||
|
})
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Advanced Usage
|
||||||
|
|
||||||
|
```go
|
||||||
|
|
||||||
|
// Flush connection to server, returns when all messages have been processed.
|
||||||
|
nc.Flush()
|
||||||
|
fmt.Println("All clear!")
|
||||||
|
|
||||||
|
// FlushTimeout specifies a timeout value as well.
|
||||||
|
err := nc.FlushTimeout(1*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("All clear!")
|
||||||
|
} else {
|
||||||
|
fmt.Println("Flushed timed out!")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Auto-unsubscribe after MAX_WANTED messages received
|
||||||
|
const MAX_WANTED = 10
|
||||||
|
sub, err := nc.Subscribe("foo")
|
||||||
|
sub.AutoUnsubscribe(MAX_WANTED)
|
||||||
|
|
||||||
|
// Multiple connections
|
||||||
|
nc1 := nats.Connect("nats://host1:4222")
|
||||||
|
nc2 := nats.Connect("nats://host2:4222")
|
||||||
|
|
||||||
|
nc1.Subscribe("foo", func(m *Msg) {
|
||||||
|
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||||
|
})
|
||||||
|
|
||||||
|
nc2.Publish("foo", []byte("Hello World!"));
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Clustered Usage
|
||||||
|
|
||||||
|
```go
|
||||||
|
|
||||||
|
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"
|
||||||
|
|
||||||
|
nc, err := nats.Connect(servers)
|
||||||
|
|
||||||
|
// Optionally set ReconnectWait and MaxReconnect attempts.
|
||||||
|
// This example means 10 seconds total per backend.
|
||||||
|
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
|
||||||
|
|
||||||
|
// Optionally disable randomization of the server pool
|
||||||
|
nc, err = nats.Connect(servers, nats.DontRandomize())
|
||||||
|
|
||||||
|
// Setup callbacks to be notified on disconnects, reconnects and connection closed.
|
||||||
|
nc, err = nats.Connect(servers,
|
||||||
|
nats.DisconnectHandler(func(nc *nats.Conn) {
|
||||||
|
fmt.Printf("Got disconnected!\n")
|
||||||
|
}),
|
||||||
|
nats.ReconnectHandler(func(_ *nats.Conn) {
|
||||||
|
fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
|
||||||
|
}),
|
||||||
|
nats.ClosedHandler(func(nc *nats.Conn) {
|
||||||
|
fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
// When connecting to a mesh of servers with auto-discovery capabilities,
|
||||||
|
// you may need to provide a username/password or token in order to connect
|
||||||
|
// to any server in that mesh when authentication is required.
|
||||||
|
// Instead of providing the credentials in the initial URL, you will use
|
||||||
|
// new option setters:
|
||||||
|
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
|
||||||
|
|
||||||
|
// For token based authentication:
|
||||||
|
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
|
||||||
|
|
||||||
|
// You can even pass the two at the same time in case one of the server
|
||||||
|
// in the mesh requires token instead of user name and password.
|
||||||
|
nc, err = nats.Connect("nats://localhost:4222",
|
||||||
|
nats.UserInfo("foo", "bar"),
|
||||||
|
nats.Token("S3cretT0ken"))
|
||||||
|
|
||||||
|
// Note that if credentials are specified in the initial URLs, they take
|
||||||
|
// precedence on the credentials specfied through the options.
|
||||||
|
// For instance, in the connect call below, the client library will use
|
||||||
|
// the user "my" and password "pwd" to connect to locahost:4222, however,
|
||||||
|
// it will use username "foo" and password "bar" when (re)connecting to
|
||||||
|
// a different server URL that it got as part of the auto-discovery.
|
||||||
|
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Context support (+Go 1.7)
|
||||||
|
|
||||||
|
```go
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
|
||||||
|
// Request with context
|
||||||
|
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
|
||||||
|
|
||||||
|
// Synchronous subscriber with context
|
||||||
|
sub, err := nc.SubscribeSync("foo")
|
||||||
|
msg, err := sub.NextMsgWithContext(ctx)
|
||||||
|
|
||||||
|
// Encoded Request with context
|
||||||
|
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||||
|
type request struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
type response struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
}
|
||||||
|
req := &request{Message: "Hello"}
|
||||||
|
resp := &response{}
|
||||||
|
err := c.RequestWithContext(ctx, "foo", req, resp)
|
||||||
|
```
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
(The MIT License)
|
||||||
|
|
||||||
|
Copyright (c) 2012-2017 Apcera Inc.
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to
|
||||||
|
deal in the Software without restriction, including without limitation the
|
||||||
|
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||||
|
sell copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||||
|
IN THE SOFTWARE.
|
|
@ -0,0 +1,26 @@
|
||||||
|
|
||||||
|
- [ ] Better constructors, options handling
|
||||||
|
- [ ] Functions for callback settings after connection created.
|
||||||
|
- [ ] Better options for subscriptions. Slow Consumer state settable, Go routines vs Inline.
|
||||||
|
- [ ] Move off of channels for subscribers, use syncPool linkedLists, etc with highwater.
|
||||||
|
- [ ] Test for valid subjects on publish and subscribe?
|
||||||
|
- [ ] SyncSubscriber and Next for EncodedConn
|
||||||
|
- [ ] Fast Publisher?
|
||||||
|
- [ ] pooling for structs used? leaky bucket?
|
||||||
|
- [ ] Timeout 0 should work as no timeout
|
||||||
|
- [x] Ping timer
|
||||||
|
- [x] Name in Connect for gnatsd
|
||||||
|
- [x] Asynchronous error handling
|
||||||
|
- [x] Parser rewrite
|
||||||
|
- [x] Reconnect
|
||||||
|
- [x] Hide Lock
|
||||||
|
- [x] Easier encoder interface
|
||||||
|
- [x] QueueSubscribeSync
|
||||||
|
- [x] Make nats specific errors prefixed with 'nats:'
|
||||||
|
- [x] API test for closed connection
|
||||||
|
- [x] TLS/SSL
|
||||||
|
- [x] Stats collection
|
||||||
|
- [x] Disconnect detection
|
||||||
|
- [x] Optimized Publish (coalescing)
|
||||||
|
- [x] Do Examples via Go style
|
||||||
|
- [x] Standardized Errors
|
|
@ -0,0 +1,166 @@
|
||||||
|
// Copyright 2012-2017 Apcera Inc. All rights reserved.
|
||||||
|
|
||||||
|
// +build go1.7
|
||||||
|
|
||||||
|
// A Go client for the NATS messaging system (https://nats.io).
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequestWithContext takes a context, a subject and payload
|
||||||
|
// in bytes and request expecting a single response.
|
||||||
|
func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||||
|
if ctx == nil {
|
||||||
|
return nil, ErrInvalidContext
|
||||||
|
}
|
||||||
|
if nc == nil {
|
||||||
|
return nil, ErrInvalidConnection
|
||||||
|
}
|
||||||
|
|
||||||
|
nc.mu.Lock()
|
||||||
|
// If user wants the old style.
|
||||||
|
if nc.Opts.UseOldRequestStyle {
|
||||||
|
nc.mu.Unlock()
|
||||||
|
return nc.oldRequestWithContext(ctx, subj, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do setup for the new style.
|
||||||
|
if nc.respMap == nil {
|
||||||
|
// _INBOX wildcard
|
||||||
|
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
|
||||||
|
nc.respMap = make(map[string]chan *Msg)
|
||||||
|
}
|
||||||
|
// Create literal Inbox and map to a chan msg.
|
||||||
|
mch := make(chan *Msg, RequestChanLen)
|
||||||
|
respInbox := nc.newRespInbox()
|
||||||
|
token := respToken(respInbox)
|
||||||
|
nc.respMap[token] = mch
|
||||||
|
createSub := nc.respMux == nil
|
||||||
|
ginbox := nc.respSub
|
||||||
|
nc.mu.Unlock()
|
||||||
|
|
||||||
|
if createSub {
|
||||||
|
// Make sure scoped subscription is setup only once.
|
||||||
|
var err error
|
||||||
|
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := nc.PublishRequest(subj, respInbox, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
var msg *Msg
|
||||||
|
|
||||||
|
select {
|
||||||
|
case msg, ok = <-mch:
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrConnectionClosed
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
nc.mu.Lock()
|
||||||
|
delete(nc.respMap, token)
|
||||||
|
nc.mu.Unlock()
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// oldRequestWithContext utilizes inbox and subscription per request.
|
||||||
|
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
|
||||||
|
inbox := NewInbox()
|
||||||
|
ch := make(chan *Msg, RequestChanLen)
|
||||||
|
|
||||||
|
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.AutoUnsubscribe(1)
|
||||||
|
defer s.Unsubscribe()
|
||||||
|
|
||||||
|
err = nc.PublishRequest(subj, inbox, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.NextMsgWithContext(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextMsgWithContext takes a context and returns the next message
|
||||||
|
// available to a synchronous subscriber, blocking until it is delivered
|
||||||
|
// or context gets canceled.
|
||||||
|
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
|
||||||
|
if ctx == nil {
|
||||||
|
return nil, ErrInvalidContext
|
||||||
|
}
|
||||||
|
if s == nil {
|
||||||
|
return nil, ErrBadSubscription
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
err := s.validateNextMsgState()
|
||||||
|
if err != nil {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshot
|
||||||
|
mch := s.mch
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
var msg *Msg
|
||||||
|
|
||||||
|
select {
|
||||||
|
case msg, ok = <-mch:
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrConnectionClosed
|
||||||
|
}
|
||||||
|
err := s.processNextMsgDelivered(msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestWithContext will create an Inbox and perform a Request
|
||||||
|
// using the provided cancellation context with the Inbox reply
|
||||||
|
// for the data v. A response will be decoded into the vPtrResponse.
|
||||||
|
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error {
|
||||||
|
if ctx == nil {
|
||||||
|
return ErrInvalidContext
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := c.Enc.Encode(subject, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m, err := c.Conn.RequestWithContext(ctx, subject, b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if reflect.TypeOf(vPtr) == emptyMsgType {
|
||||||
|
mPtr := vPtr.(*Msg)
|
||||||
|
*mPtr = *m
|
||||||
|
} else {
|
||||||
|
err := c.Enc.Decode(m.Subject, m.Data, vPtr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,249 @@
|
||||||
|
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||||
|
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
// Default Encoders
|
||||||
|
. "github.com/nats-io/go-nats/encoders/builtin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Encoder interface is for all register encoders
|
||||||
|
type Encoder interface {
|
||||||
|
Encode(subject string, v interface{}) ([]byte, error)
|
||||||
|
Decode(subject string, data []byte, vPtr interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var encMap map[string]Encoder
|
||||||
|
var encLock sync.Mutex
|
||||||
|
|
||||||
|
// Indexe names into the Registered Encoders.
|
||||||
|
const (
|
||||||
|
JSON_ENCODER = "json"
|
||||||
|
GOB_ENCODER = "gob"
|
||||||
|
DEFAULT_ENCODER = "default"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
encMap = make(map[string]Encoder)
|
||||||
|
// Register json, gob and default encoder
|
||||||
|
RegisterEncoder(JSON_ENCODER, &JsonEncoder{})
|
||||||
|
RegisterEncoder(GOB_ENCODER, &GobEncoder{})
|
||||||
|
RegisterEncoder(DEFAULT_ENCODER, &DefaultEncoder{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
|
||||||
|
// a nats server and have an extendable encoder system that will encode and decode messages
|
||||||
|
// from raw Go types.
|
||||||
|
type EncodedConn struct {
|
||||||
|
Conn *Conn
|
||||||
|
Enc Encoder
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
|
||||||
|
// encoder.
|
||||||
|
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
|
||||||
|
if c == nil {
|
||||||
|
return nil, errors.New("nats: Nil Connection")
|
||||||
|
}
|
||||||
|
if c.IsClosed() {
|
||||||
|
return nil, ErrConnectionClosed
|
||||||
|
}
|
||||||
|
ec := &EncodedConn{Conn: c, Enc: EncoderForType(encType)}
|
||||||
|
if ec.Enc == nil {
|
||||||
|
return nil, fmt.Errorf("No encoder registered for '%s'", encType)
|
||||||
|
}
|
||||||
|
return ec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterEncoder will register the encType with the given Encoder. Useful for customization.
|
||||||
|
func RegisterEncoder(encType string, enc Encoder) {
|
||||||
|
encLock.Lock()
|
||||||
|
defer encLock.Unlock()
|
||||||
|
encMap[encType] = enc
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncoderForType will return the registered Encoder for the encType.
|
||||||
|
func EncoderForType(encType string) Encoder {
|
||||||
|
encLock.Lock()
|
||||||
|
defer encLock.Unlock()
|
||||||
|
return encMap[encType]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish publishes the data argument to the given subject. The data argument
|
||||||
|
// will be encoded using the associated encoder.
|
||||||
|
func (c *EncodedConn) Publish(subject string, v interface{}) error {
|
||||||
|
b, err := c.Enc.Encode(subject, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.Conn.publish(subject, _EMPTY_, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishRequest will perform a Publish() expecting a response on the
|
||||||
|
// reply subject. Use Request() for automatically waiting for a response
|
||||||
|
// inline.
|
||||||
|
func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
|
||||||
|
b, err := c.Enc.Encode(subject, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.Conn.publish(subject, reply, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request will create an Inbox and perform a Request() call
|
||||||
|
// with the Inbox reply for the data v. A response will be
|
||||||
|
// decoded into the vPtrResponse.
|
||||||
|
func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
|
||||||
|
b, err := c.Enc.Encode(subject, v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m, err := c.Conn.Request(subject, b, timeout)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if reflect.TypeOf(vPtr) == emptyMsgType {
|
||||||
|
mPtr := vPtr.(*Msg)
|
||||||
|
*mPtr = *m
|
||||||
|
} else {
|
||||||
|
err = c.Enc.Decode(m.Subject, m.Data, vPtr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler is a specific callback used for Subscribe. It is generalized to
|
||||||
|
// an interface{}, but we will discover its format and arguments at runtime
|
||||||
|
// and perform the correct callback, including de-marshaling JSON strings
|
||||||
|
// back into the appropriate struct based on the signature of the Handler.
|
||||||
|
//
|
||||||
|
// Handlers are expected to have one of four signatures.
|
||||||
|
//
|
||||||
|
// type person struct {
|
||||||
|
// Name string `json:"name,omitempty"`
|
||||||
|
// Age uint `json:"age,omitempty"`
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// handler := func(m *Msg)
|
||||||
|
// handler := func(p *person)
|
||||||
|
// handler := func(subject string, o *obj)
|
||||||
|
// handler := func(subject, reply string, o *obj)
|
||||||
|
//
|
||||||
|
// These forms allow a callback to request a raw Msg ptr, where the processing
|
||||||
|
// of the message from the wire is untouched. Process a JSON representation
|
||||||
|
// and demarshal it into the given struct, e.g. person.
|
||||||
|
// There are also variants where the callback wants either the subject, or the
|
||||||
|
// subject and the reply subject.
|
||||||
|
type Handler interface{}
|
||||||
|
|
||||||
|
// Dissect the cb Handler's signature
|
||||||
|
func argInfo(cb Handler) (reflect.Type, int) {
|
||||||
|
cbType := reflect.TypeOf(cb)
|
||||||
|
if cbType.Kind() != reflect.Func {
|
||||||
|
panic("nats: Handler needs to be a func")
|
||||||
|
}
|
||||||
|
numArgs := cbType.NumIn()
|
||||||
|
if numArgs == 0 {
|
||||||
|
return nil, numArgs
|
||||||
|
}
|
||||||
|
return cbType.In(numArgs - 1), numArgs
|
||||||
|
}
|
||||||
|
|
||||||
|
var emptyMsgType = reflect.TypeOf(&Msg{})
|
||||||
|
|
||||||
|
// Subscribe will create a subscription on the given subject and process incoming
|
||||||
|
// messages using the specified Handler. The Handler should be a func that matches
|
||||||
|
// a signature from the description of Handler from above.
|
||||||
|
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
|
||||||
|
return c.subscribe(subject, _EMPTY_, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueSubscribe will create a queue subscription on the given subject and process
|
||||||
|
// incoming messages using the specified Handler. The Handler should be a func that
|
||||||
|
// matches a signature from the description of Handler from above.
|
||||||
|
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
|
||||||
|
return c.subscribe(subject, queue, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal implementation that all public functions will use.
|
||||||
|
func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
|
||||||
|
if cb == nil {
|
||||||
|
return nil, errors.New("nats: Handler required for EncodedConn Subscription")
|
||||||
|
}
|
||||||
|
argType, numArgs := argInfo(cb)
|
||||||
|
if argType == nil {
|
||||||
|
return nil, errors.New("nats: Handler requires at least one argument")
|
||||||
|
}
|
||||||
|
|
||||||
|
cbValue := reflect.ValueOf(cb)
|
||||||
|
wantsRaw := (argType == emptyMsgType)
|
||||||
|
|
||||||
|
natsCB := func(m *Msg) {
|
||||||
|
var oV []reflect.Value
|
||||||
|
if wantsRaw {
|
||||||
|
oV = []reflect.Value{reflect.ValueOf(m)}
|
||||||
|
} else {
|
||||||
|
var oPtr reflect.Value
|
||||||
|
if argType.Kind() != reflect.Ptr {
|
||||||
|
oPtr = reflect.New(argType)
|
||||||
|
} else {
|
||||||
|
oPtr = reflect.New(argType.Elem())
|
||||||
|
}
|
||||||
|
if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
|
||||||
|
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||||
|
c.Conn.ach <- func() {
|
||||||
|
c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if argType.Kind() != reflect.Ptr {
|
||||||
|
oPtr = reflect.Indirect(oPtr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Callback Arity
|
||||||
|
switch numArgs {
|
||||||
|
case 1:
|
||||||
|
oV = []reflect.Value{oPtr}
|
||||||
|
case 2:
|
||||||
|
subV := reflect.ValueOf(m.Subject)
|
||||||
|
oV = []reflect.Value{subV, oPtr}
|
||||||
|
case 3:
|
||||||
|
subV := reflect.ValueOf(m.Subject)
|
||||||
|
replyV := reflect.ValueOf(m.Reply)
|
||||||
|
oV = []reflect.Value{subV, replyV, oPtr}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
cbValue.Call(oV)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Conn.subscribe(subject, queue, natsCB, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlushTimeout allows a Flush operation to have an associated timeout.
|
||||||
|
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
|
||||||
|
return c.Conn.FlushTimeout(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush will perform a round trip to the server and return when it
|
||||||
|
// receives the internal reply.
|
||||||
|
func (c *EncodedConn) Flush() error {
|
||||||
|
return c.Conn.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close will close the connection to the server. This call will release
|
||||||
|
// all blocking calls, such as Flush(), etc.
|
||||||
|
func (c *EncodedConn) Close() {
|
||||||
|
c.Conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastError reports the last error encountered via the Connection.
|
||||||
|
func (c *EncodedConn) LastError() error {
|
||||||
|
return c.Conn.err
|
||||||
|
}
|
|
@ -0,0 +1,257 @@
|
||||||
|
package nats_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/nats-io/go-nats"
|
||||||
|
"github.com/nats-io/go-nats/encoders/protobuf"
|
||||||
|
"github.com/nats-io/go-nats/encoders/protobuf/testdata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Since we import above nats packages, we need to have a different
|
||||||
|
// const name than TEST_PORT that we used on the other packages.
|
||||||
|
const ENC_TEST_PORT = 8268
|
||||||
|
|
||||||
|
var options = Options{
|
||||||
|
Url: fmt.Sprintf("nats://localhost:%d", ENC_TEST_PORT),
|
||||||
|
AllowReconnect: true,
|
||||||
|
MaxReconnect: 10,
|
||||||
|
ReconnectWait: 100 * time.Millisecond,
|
||||||
|
Timeout: DefaultTimeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Encoded connection tests
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
func TestPublishErrorAfterSubscribeDecodeError(t *testing.T) {
|
||||||
|
ts := RunServerOnPort(ENC_TEST_PORT)
|
||||||
|
defer ts.Shutdown()
|
||||||
|
opts := options
|
||||||
|
nc, _ := opts.Connect()
|
||||||
|
defer nc.Close()
|
||||||
|
c, _ := NewEncodedConn(nc, JSON_ENCODER)
|
||||||
|
|
||||||
|
//Test message type
|
||||||
|
type Message struct {
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
const testSubj = "test"
|
||||||
|
|
||||||
|
c.Subscribe(testSubj, func(msg *Message) {})
|
||||||
|
|
||||||
|
//Publish invalid json to catch decode error in subscription callback
|
||||||
|
c.Publish(testSubj, `foo`)
|
||||||
|
c.Flush()
|
||||||
|
|
||||||
|
//Next publish should be successful
|
||||||
|
if err := c.Publish(testSubj, Message{"2"}); err != nil {
|
||||||
|
t.Error("Fail to send correct json message after decode error in subscription")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPublishErrorAfterInvalidPublishMessage(t *testing.T) {
|
||||||
|
ts := RunServerOnPort(ENC_TEST_PORT)
|
||||||
|
defer ts.Shutdown()
|
||||||
|
opts := options
|
||||||
|
nc, _ := opts.Connect()
|
||||||
|
defer nc.Close()
|
||||||
|
c, _ := NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER)
|
||||||
|
const testSubj = "test"
|
||||||
|
|
||||||
|
c.Publish(testSubj, &testdata.Person{Name: "Anatolii"})
|
||||||
|
|
||||||
|
//Publish invalid protobuff message to catch decode error
|
||||||
|
c.Publish(testSubj, "foo")
|
||||||
|
|
||||||
|
//Next publish with valid protobuf message should be successful
|
||||||
|
if err := c.Publish(testSubj, &testdata.Person{Name: "Anatolii"}); err != nil {
|
||||||
|
t.Error("Fail to send correct protobuf message after invalid message publishing", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVariousFailureConditions(t *testing.T) {
|
||||||
|
ts := RunServerOnPort(ENC_TEST_PORT)
|
||||||
|
defer ts.Shutdown()
|
||||||
|
|
||||||
|
dch := make(chan bool)
|
||||||
|
|
||||||
|
opts := options
|
||||||
|
opts.AsyncErrorCB = func(_ *Conn, _ *Subscription, e error) {
|
||||||
|
dch <- true
|
||||||
|
}
|
||||||
|
nc, _ := opts.Connect()
|
||||||
|
nc.Close()
|
||||||
|
|
||||||
|
if _, err := NewEncodedConn(nil, protobuf.PROTOBUF_ENCODER); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER); err == nil || err != ErrConnectionClosed {
|
||||||
|
t.Fatalf("Wrong error: %v instead of %v", err, ErrConnectionClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
nc, _ = opts.Connect()
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
if _, err := NewEncodedConn(nc, "foo"); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to create encoded connection: %v", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
if _, err := c.Subscribe("bar", func(subj, obj string) {}); err != nil {
|
||||||
|
t.Fatalf("Unable to create subscription: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Publish("bar", &testdata.Person{Name: "Ivan"}); err != nil {
|
||||||
|
t.Fatalf("Unable to publish: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get the async error callback")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.PublishRequest("foo", "bar", "foo"); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Request("foo", "foo", nil, 2*time.Second); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
nc.Close()
|
||||||
|
|
||||||
|
if err := c.PublishRequest("foo", "bar", &testdata.Person{Name: "Ivan"}); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &testdata.Person{}
|
||||||
|
if err := c.Request("foo", &testdata.Person{Name: "Ivan"}, resp, 2*time.Second); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := c.Subscribe("foo", nil); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := c.Subscribe("foo", func() {}); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if _, err := c.Subscribe("foo", "bar"); err == nil {
|
||||||
|
t.Fatal("Expected an error")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRequest(t *testing.T) {
|
||||||
|
ts := RunServerOnPort(ENC_TEST_PORT)
|
||||||
|
defer ts.Shutdown()
|
||||||
|
|
||||||
|
dch := make(chan bool)
|
||||||
|
|
||||||
|
opts := options
|
||||||
|
nc, _ := opts.Connect()
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
c, err := NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to create encoded connection: %v", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
sentName := "Ivan"
|
||||||
|
recvName := "Kozlovic"
|
||||||
|
|
||||||
|
if _, err := c.Subscribe("foo", func(_, reply string, p *testdata.Person) {
|
||||||
|
if p.Name != sentName {
|
||||||
|
t.Fatalf("Got wrong name: %v instead of %v", p.Name, sentName)
|
||||||
|
}
|
||||||
|
c.Publish(reply, &testdata.Person{Name: recvName})
|
||||||
|
dch <- true
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("Unable to create subscription: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := c.Subscribe("foo", func(_ string, p *testdata.Person) {
|
||||||
|
if p.Name != sentName {
|
||||||
|
t.Fatalf("Got wrong name: %v instead of %v", p.Name, sentName)
|
||||||
|
}
|
||||||
|
dch <- true
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("Unable to create subscription: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Publish("foo", &testdata.Person{Name: sentName}); err != nil {
|
||||||
|
t.Fatalf("Unable to publish: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get message")
|
||||||
|
}
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get message")
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &testdata.Person{}
|
||||||
|
if err := c.Request("foo", &testdata.Person{Name: sentName}, response, 2*time.Second); err != nil {
|
||||||
|
t.Fatalf("Unable to publish: %v", err)
|
||||||
|
}
|
||||||
|
if response == nil {
|
||||||
|
t.Fatal("No response received")
|
||||||
|
} else if response.Name != recvName {
|
||||||
|
t.Fatalf("Wrong response: %v instead of %v", response.Name, recvName)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get message")
|
||||||
|
}
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get message")
|
||||||
|
}
|
||||||
|
|
||||||
|
c2, err := NewEncodedConn(nc, GOB_ENCODER)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to create encoded connection: %v", err)
|
||||||
|
}
|
||||||
|
defer c2.Close()
|
||||||
|
|
||||||
|
if _, err := c2.QueueSubscribe("bar", "baz", func(m *Msg) {
|
||||||
|
response := &Msg{Subject: m.Reply, Data: []byte(recvName)}
|
||||||
|
c2.Conn.PublishMsg(response)
|
||||||
|
dch <- true
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatalf("Unable to create subscription: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mReply := Msg{}
|
||||||
|
if err := c2.Request("bar", &Msg{Data: []byte(sentName)}, &mReply, 2*time.Second); err != nil {
|
||||||
|
t.Fatalf("Unable to send request: %v", err)
|
||||||
|
}
|
||||||
|
if string(mReply.Data) != recvName {
|
||||||
|
t.Fatalf("Wrong reply: %v instead of %v", string(mReply.Data), recvName)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Wait(dch); err != nil {
|
||||||
|
t.Fatal("Did not get message")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.LastError() != nil {
|
||||||
|
t.Fatalf("Unexpected connection error: %v", c.LastError())
|
||||||
|
}
|
||||||
|
if c2.LastError() != nil {
|
||||||
|
t.Fatalf("Unexpected connection error: %v", c2.LastError())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,266 @@
|
||||||
|
package nats_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nats-io/go-nats"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Shows different ways to create a Conn
|
||||||
|
func ExampleConnect() {
|
||||||
|
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
nc.Close()
|
||||||
|
|
||||||
|
nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222")
|
||||||
|
nc.Close()
|
||||||
|
|
||||||
|
nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443")
|
||||||
|
nc.Close()
|
||||||
|
|
||||||
|
opts := nats.Options{
|
||||||
|
AllowReconnect: true,
|
||||||
|
MaxReconnect: 10,
|
||||||
|
ReconnectWait: 5 * time.Second,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
nc, _ = opts.Connect()
|
||||||
|
nc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// This Example shows an asynchronous subscriber.
|
||||||
|
func ExampleConn_Subscribe() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
||||||
|
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// This Example shows a synchronous subscriber.
|
||||||
|
func ExampleConn_SubscribeSync() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
sub, _ := nc.SubscribeSync("foo")
|
||||||
|
m, err := sub.NextMsg(1 * time.Second)
|
||||||
|
if err == nil {
|
||||||
|
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||||
|
} else {
|
||||||
|
fmt.Println("NextMsg timed out.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleSubscription_NextMsg() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
sub, _ := nc.SubscribeSync("foo")
|
||||||
|
m, err := sub.NextMsg(1 * time.Second)
|
||||||
|
if err == nil {
|
||||||
|
fmt.Printf("Received a message: %s\n", string(m.Data))
|
||||||
|
} else {
|
||||||
|
fmt.Println("NextMsg timed out.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleSubscription_Unsubscribe() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
sub, _ := nc.SubscribeSync("foo")
|
||||||
|
// ...
|
||||||
|
sub.Unsubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_Publish() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
nc.Publish("foo", []byte("Hello World!"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_PublishMsg() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
|
||||||
|
nc.PublishMsg(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_Flush() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
nc.PublishMsg(msg)
|
||||||
|
}
|
||||||
|
err := nc.Flush()
|
||||||
|
if err == nil {
|
||||||
|
// Everything has been processed by the server for nc *Conn.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_FlushTimeout() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
nc.PublishMsg(msg)
|
||||||
|
}
|
||||||
|
// Only wait for up to 1 second for Flush
|
||||||
|
err := nc.FlushTimeout(1 * time.Second)
|
||||||
|
if err == nil {
|
||||||
|
// Everything has been processed by the server for nc *Conn.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_Request() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
||||||
|
nc.Publish(m.Reply, []byte("I will help you"))
|
||||||
|
})
|
||||||
|
nc.Request("foo", []byte("help"), 50*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_QueueSubscribe() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
received := 0
|
||||||
|
|
||||||
|
nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {
|
||||||
|
received++
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleSubscription_AutoUnsubscribe() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
received, wanted, total := 0, 10, 100
|
||||||
|
|
||||||
|
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
|
||||||
|
received++
|
||||||
|
})
|
||||||
|
sub.AutoUnsubscribe(wanted)
|
||||||
|
|
||||||
|
for i := 0; i < total; i++ {
|
||||||
|
nc.Publish("foo", []byte("Hello"))
|
||||||
|
}
|
||||||
|
nc.Flush()
|
||||||
|
|
||||||
|
fmt.Printf("Received = %d", received)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleConn_Close() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
nc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shows how to wrap a Conn into an EncodedConn
|
||||||
|
func ExampleNewEncodedConn() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, "json")
|
||||||
|
c.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodedConn can publish virtually anything just
|
||||||
|
// by passing it in. The encoder will be used to properly
|
||||||
|
// encode the raw Go type
|
||||||
|
func ExampleEncodedConn_Publish() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, "json")
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
|
||||||
|
c.Publish("hello", me)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodedConn's subscribers will automatically decode the
|
||||||
|
// wire data into the requested Go type using the Decode()
|
||||||
|
// method of the registered Encoder. The callback signature
|
||||||
|
// can also vary to include additional data, such as subject
|
||||||
|
// and reply subjects.
|
||||||
|
func ExampleEncodedConn_Subscribe() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, "json")
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Subscribe("hello", func(p *person) {
|
||||||
|
fmt.Printf("Received a person! %+v\n", p)
|
||||||
|
})
|
||||||
|
|
||||||
|
c.Subscribe("hello", func(subj, reply string, p *person) {
|
||||||
|
fmt.Printf("Received a person on subject %s! %+v\n", subj, p)
|
||||||
|
})
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
|
||||||
|
c.Publish("hello", me)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindSendChan() allows binding of a Go channel to a nats
|
||||||
|
// subject for publish operations. The Encoder attached to the
|
||||||
|
// EncodedConn will be used for marshaling.
|
||||||
|
func ExampleEncodedConn_BindSendChan() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, "json")
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *person)
|
||||||
|
c.BindSendChan("hello", ch)
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
|
||||||
|
ch <- me
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindRecvChan() allows binding of a Go channel to a nats
|
||||||
|
// subject for subscribe operations. The Encoder attached to the
|
||||||
|
// EncodedConn will be used for un-marshaling.
|
||||||
|
func ExampleEncodedConn_BindRecvChan() {
|
||||||
|
nc, _ := nats.Connect(nats.DefaultURL)
|
||||||
|
c, _ := nats.NewEncodedConn(nc, "json")
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
type person struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Age int
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *person)
|
||||||
|
c.BindRecvChan("hello", ch)
|
||||||
|
|
||||||
|
me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
|
||||||
|
c.Publish("hello", me)
|
||||||
|
|
||||||
|
// Receive the publish directly on a channel
|
||||||
|
who := <-ch
|
||||||
|
|
||||||
|
fmt.Printf("%v says hello!\n", who)
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,100 @@
|
||||||
|
// Copyright 2013-2017 Apcera Inc. All rights reserved.
|
||||||
|
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"reflect"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This allows the functionality for network channels by binding send and receive Go chans
|
||||||
|
// to subjects and optionally queue groups.
|
||||||
|
// Data will be encoded and decoded via the EncodedConn and its associated encoders.
|
||||||
|
|
||||||
|
// BindSendChan binds a channel for send operations to NATS.
|
||||||
|
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error {
|
||||||
|
chVal := reflect.ValueOf(channel)
|
||||||
|
if chVal.Kind() != reflect.Chan {
|
||||||
|
return ErrChanArg
|
||||||
|
}
|
||||||
|
go chPublish(c, chVal, subject)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish all values that arrive on the channel until it is closed or we
|
||||||
|
// encounter an error.
|
||||||
|
func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
|
||||||
|
for {
|
||||||
|
val, ok := chVal.Recv()
|
||||||
|
if !ok {
|
||||||
|
// Channel has most likely been closed.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if e := c.Publish(subject, val.Interface()); e != nil {
|
||||||
|
// Do this under lock.
|
||||||
|
c.Conn.mu.Lock()
|
||||||
|
defer c.Conn.mu.Unlock()
|
||||||
|
|
||||||
|
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||||
|
// FIXME(dlc) - Not sure this is the right thing to do.
|
||||||
|
// FIXME(ivan) - If the connection is not yet closed, try to schedule the callback
|
||||||
|
if c.Conn.isClosed() {
|
||||||
|
go c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e)
|
||||||
|
} else {
|
||||||
|
c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindRecvChan binds a channel for receive operations from NATS.
|
||||||
|
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error) {
|
||||||
|
return c.bindRecvChan(subject, _EMPTY_, channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
|
||||||
|
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error) {
|
||||||
|
return c.bindRecvChan(subject, queue, channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal function to bind receive operations for a channel.
|
||||||
|
func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (*Subscription, error) {
|
||||||
|
chVal := reflect.ValueOf(channel)
|
||||||
|
if chVal.Kind() != reflect.Chan {
|
||||||
|
return nil, ErrChanArg
|
||||||
|
}
|
||||||
|
argType := chVal.Type().Elem()
|
||||||
|
|
||||||
|
cb := func(m *Msg) {
|
||||||
|
var oPtr reflect.Value
|
||||||
|
if argType.Kind() != reflect.Ptr {
|
||||||
|
oPtr = reflect.New(argType)
|
||||||
|
} else {
|
||||||
|
oPtr = reflect.New(argType.Elem())
|
||||||
|
}
|
||||||
|
if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
|
||||||
|
c.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + err.Error())
|
||||||
|
if c.Conn.Opts.AsyncErrorCB != nil {
|
||||||
|
c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, c.Conn.err) }
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if argType.Kind() != reflect.Ptr {
|
||||||
|
oPtr = reflect.Indirect(oPtr)
|
||||||
|
}
|
||||||
|
// This is a bit hacky, but in this instance we may be trying to send to a closed channel.
|
||||||
|
// and the user does not know when it is safe to close the channel.
|
||||||
|
defer func() {
|
||||||
|
// If we have panicked, recover and close the subscription.
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
m.Sub.Unsubscribe()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// Actually do the send to the channel.
|
||||||
|
chVal.Send(oPtr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Conn.subscribe(subject, queue, cb, nil)
|
||||||
|
}
|
|
@ -0,0 +1,470 @@
|
||||||
|
// Copyright 2012-2017 Apcera Inc. All rights reserved.
|
||||||
|
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type msgArg struct {
|
||||||
|
subject []byte
|
||||||
|
reply []byte
|
||||||
|
sid int64
|
||||||
|
size int
|
||||||
|
}
|
||||||
|
|
||||||
|
const MAX_CONTROL_LINE_SIZE = 1024
|
||||||
|
|
||||||
|
type parseState struct {
|
||||||
|
state int
|
||||||
|
as int
|
||||||
|
drop int
|
||||||
|
ma msgArg
|
||||||
|
argBuf []byte
|
||||||
|
msgBuf []byte
|
||||||
|
scratch [MAX_CONTROL_LINE_SIZE]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
OP_START = iota
|
||||||
|
OP_PLUS
|
||||||
|
OP_PLUS_O
|
||||||
|
OP_PLUS_OK
|
||||||
|
OP_MINUS
|
||||||
|
OP_MINUS_E
|
||||||
|
OP_MINUS_ER
|
||||||
|
OP_MINUS_ERR
|
||||||
|
OP_MINUS_ERR_SPC
|
||||||
|
MINUS_ERR_ARG
|
||||||
|
OP_M
|
||||||
|
OP_MS
|
||||||
|
OP_MSG
|
||||||
|
OP_MSG_SPC
|
||||||
|
MSG_ARG
|
||||||
|
MSG_PAYLOAD
|
||||||
|
MSG_END
|
||||||
|
OP_P
|
||||||
|
OP_PI
|
||||||
|
OP_PIN
|
||||||
|
OP_PING
|
||||||
|
OP_PO
|
||||||
|
OP_PON
|
||||||
|
OP_PONG
|
||||||
|
OP_I
|
||||||
|
OP_IN
|
||||||
|
OP_INF
|
||||||
|
OP_INFO
|
||||||
|
OP_INFO_SPC
|
||||||
|
INFO_ARG
|
||||||
|
)
|
||||||
|
|
||||||
|
// parse is the fast protocol parser engine.
|
||||||
|
func (nc *Conn) parse(buf []byte) error {
|
||||||
|
var i int
|
||||||
|
var b byte
|
||||||
|
|
||||||
|
// Move to loop instead of range syntax to allow jumping of i
|
||||||
|
for i = 0; i < len(buf); i++ {
|
||||||
|
b = buf[i]
|
||||||
|
|
||||||
|
switch nc.ps.state {
|
||||||
|
case OP_START:
|
||||||
|
switch b {
|
||||||
|
case 'M', 'm':
|
||||||
|
nc.ps.state = OP_M
|
||||||
|
case 'P', 'p':
|
||||||
|
nc.ps.state = OP_P
|
||||||
|
case '+':
|
||||||
|
nc.ps.state = OP_PLUS
|
||||||
|
case '-':
|
||||||
|
nc.ps.state = OP_MINUS
|
||||||
|
case 'I', 'i':
|
||||||
|
nc.ps.state = OP_I
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_M:
|
||||||
|
switch b {
|
||||||
|
case 'S', 's':
|
||||||
|
nc.ps.state = OP_MS
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MS:
|
||||||
|
switch b {
|
||||||
|
case 'G', 'g':
|
||||||
|
nc.ps.state = OP_MSG
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MSG:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
nc.ps.state = OP_MSG_SPC
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MSG_SPC:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
nc.ps.state = MSG_ARG
|
||||||
|
nc.ps.as = i
|
||||||
|
}
|
||||||
|
case MSG_ARG:
|
||||||
|
switch b {
|
||||||
|
case '\r':
|
||||||
|
nc.ps.drop = 1
|
||||||
|
case '\n':
|
||||||
|
var arg []byte
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
arg = nc.ps.argBuf
|
||||||
|
} else {
|
||||||
|
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||||
|
}
|
||||||
|
if err := nc.processMsgArgs(arg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD
|
||||||
|
|
||||||
|
// jump ahead with the index. If this overruns
|
||||||
|
// what is left we fall out and process split
|
||||||
|
// buffer.
|
||||||
|
i = nc.ps.as + nc.ps.ma.size - 1
|
||||||
|
default:
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case MSG_PAYLOAD:
|
||||||
|
if nc.ps.msgBuf != nil {
|
||||||
|
if len(nc.ps.msgBuf) >= nc.ps.ma.size {
|
||||||
|
nc.processMsg(nc.ps.msgBuf)
|
||||||
|
nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
|
||||||
|
} else {
|
||||||
|
// copy as much as we can to the buffer and skip ahead.
|
||||||
|
toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
|
||||||
|
avail := len(buf) - i
|
||||||
|
|
||||||
|
if avail < toCopy {
|
||||||
|
toCopy = avail
|
||||||
|
}
|
||||||
|
|
||||||
|
if toCopy > 0 {
|
||||||
|
start := len(nc.ps.msgBuf)
|
||||||
|
// This is needed for copy to work.
|
||||||
|
nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
|
||||||
|
copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
|
||||||
|
// Update our index
|
||||||
|
i = (i + toCopy) - 1
|
||||||
|
} else {
|
||||||
|
nc.ps.msgBuf = append(nc.ps.msgBuf, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if i-nc.ps.as >= nc.ps.ma.size {
|
||||||
|
nc.processMsg(buf[nc.ps.as:i])
|
||||||
|
nc.ps.argBuf, nc.ps.msgBuf, nc.ps.state = nil, nil, MSG_END
|
||||||
|
}
|
||||||
|
case MSG_END:
|
||||||
|
switch b {
|
||||||
|
case '\n':
|
||||||
|
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
case OP_PLUS:
|
||||||
|
switch b {
|
||||||
|
case 'O', 'o':
|
||||||
|
nc.ps.state = OP_PLUS_O
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PLUS_O:
|
||||||
|
switch b {
|
||||||
|
case 'K', 'k':
|
||||||
|
nc.ps.state = OP_PLUS_OK
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PLUS_OK:
|
||||||
|
switch b {
|
||||||
|
case '\n':
|
||||||
|
nc.processOK()
|
||||||
|
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||||
|
}
|
||||||
|
case OP_MINUS:
|
||||||
|
switch b {
|
||||||
|
case 'E', 'e':
|
||||||
|
nc.ps.state = OP_MINUS_E
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MINUS_E:
|
||||||
|
switch b {
|
||||||
|
case 'R', 'r':
|
||||||
|
nc.ps.state = OP_MINUS_ER
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MINUS_ER:
|
||||||
|
switch b {
|
||||||
|
case 'R', 'r':
|
||||||
|
nc.ps.state = OP_MINUS_ERR
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MINUS_ERR:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
nc.ps.state = OP_MINUS_ERR_SPC
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_MINUS_ERR_SPC:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
nc.ps.state = MINUS_ERR_ARG
|
||||||
|
nc.ps.as = i
|
||||||
|
}
|
||||||
|
case MINUS_ERR_ARG:
|
||||||
|
switch b {
|
||||||
|
case '\r':
|
||||||
|
nc.ps.drop = 1
|
||||||
|
case '\n':
|
||||||
|
var arg []byte
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
arg = nc.ps.argBuf
|
||||||
|
nc.ps.argBuf = nil
|
||||||
|
} else {
|
||||||
|
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||||
|
}
|
||||||
|
nc.processErr(string(arg))
|
||||||
|
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||||
|
default:
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case OP_P:
|
||||||
|
switch b {
|
||||||
|
case 'I', 'i':
|
||||||
|
nc.ps.state = OP_PI
|
||||||
|
case 'O', 'o':
|
||||||
|
nc.ps.state = OP_PO
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PO:
|
||||||
|
switch b {
|
||||||
|
case 'N', 'n':
|
||||||
|
nc.ps.state = OP_PON
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PON:
|
||||||
|
switch b {
|
||||||
|
case 'G', 'g':
|
||||||
|
nc.ps.state = OP_PONG
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PONG:
|
||||||
|
switch b {
|
||||||
|
case '\n':
|
||||||
|
nc.processPong()
|
||||||
|
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||||
|
}
|
||||||
|
case OP_PI:
|
||||||
|
switch b {
|
||||||
|
case 'N', 'n':
|
||||||
|
nc.ps.state = OP_PIN
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PIN:
|
||||||
|
switch b {
|
||||||
|
case 'G', 'g':
|
||||||
|
nc.ps.state = OP_PING
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_PING:
|
||||||
|
switch b {
|
||||||
|
case '\n':
|
||||||
|
nc.processPing()
|
||||||
|
nc.ps.drop, nc.ps.state = 0, OP_START
|
||||||
|
}
|
||||||
|
case OP_I:
|
||||||
|
switch b {
|
||||||
|
case 'N', 'n':
|
||||||
|
nc.ps.state = OP_IN
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_IN:
|
||||||
|
switch b {
|
||||||
|
case 'F', 'f':
|
||||||
|
nc.ps.state = OP_INF
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_INF:
|
||||||
|
switch b {
|
||||||
|
case 'O', 'o':
|
||||||
|
nc.ps.state = OP_INFO
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_INFO:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
nc.ps.state = OP_INFO_SPC
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
case OP_INFO_SPC:
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t':
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
nc.ps.state = INFO_ARG
|
||||||
|
nc.ps.as = i
|
||||||
|
}
|
||||||
|
case INFO_ARG:
|
||||||
|
switch b {
|
||||||
|
case '\r':
|
||||||
|
nc.ps.drop = 1
|
||||||
|
case '\n':
|
||||||
|
var arg []byte
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
arg = nc.ps.argBuf
|
||||||
|
nc.ps.argBuf = nil
|
||||||
|
} else {
|
||||||
|
arg = buf[nc.ps.as : i-nc.ps.drop]
|
||||||
|
}
|
||||||
|
nc.processAsyncInfo(arg)
|
||||||
|
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
|
||||||
|
default:
|
||||||
|
if nc.ps.argBuf != nil {
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
goto parseErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check for split buffer scenarios
|
||||||
|
if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
|
||||||
|
nc.ps.argBuf = nc.ps.scratch[:0]
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
|
||||||
|
// FIXME, check max len
|
||||||
|
}
|
||||||
|
// Check for split msg
|
||||||
|
if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
|
||||||
|
// We need to clone the msgArg if it is still referencing the
|
||||||
|
// read buffer and we are not able to process the msg.
|
||||||
|
if nc.ps.argBuf == nil {
|
||||||
|
nc.cloneMsgArg()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we will overflow the scratch buffer, just create a
|
||||||
|
// new buffer to hold the split message.
|
||||||
|
if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
|
||||||
|
lrem := len(buf[nc.ps.as:])
|
||||||
|
|
||||||
|
nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
|
||||||
|
copy(nc.ps.msgBuf, buf[nc.ps.as:])
|
||||||
|
} else {
|
||||||
|
nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
|
||||||
|
nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
parseErr:
|
||||||
|
return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
|
||||||
|
// we need to hold onto it into the next read.
|
||||||
|
func (nc *Conn) cloneMsgArg() {
|
||||||
|
nc.ps.argBuf = nc.ps.scratch[:0]
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
|
||||||
|
nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
|
||||||
|
nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
|
||||||
|
if nc.ps.ma.reply != nil {
|
||||||
|
nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const argsLenMax = 4
|
||||||
|
|
||||||
|
func (nc *Conn) processMsgArgs(arg []byte) error {
|
||||||
|
// Unroll splitArgs to avoid runtime/heap issues
|
||||||
|
a := [argsLenMax][]byte{}
|
||||||
|
args := a[:0]
|
||||||
|
start := -1
|
||||||
|
for i, b := range arg {
|
||||||
|
switch b {
|
||||||
|
case ' ', '\t', '\r', '\n':
|
||||||
|
if start >= 0 {
|
||||||
|
args = append(args, arg[start:i])
|
||||||
|
start = -1
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if start < 0 {
|
||||||
|
start = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if start >= 0 {
|
||||||
|
args = append(args, arg[start:])
|
||||||
|
}
|
||||||
|
|
||||||
|
switch len(args) {
|
||||||
|
case 3:
|
||||||
|
nc.ps.ma.subject = args[0]
|
||||||
|
nc.ps.ma.sid = parseInt64(args[1])
|
||||||
|
nc.ps.ma.reply = nil
|
||||||
|
nc.ps.ma.size = int(parseInt64(args[2]))
|
||||||
|
case 4:
|
||||||
|
nc.ps.ma.subject = args[0]
|
||||||
|
nc.ps.ma.sid = parseInt64(args[1])
|
||||||
|
nc.ps.ma.reply = args[2]
|
||||||
|
nc.ps.ma.size = int(parseInt64(args[3]))
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
|
||||||
|
}
|
||||||
|
if nc.ps.ma.sid < 0 {
|
||||||
|
return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
|
||||||
|
}
|
||||||
|
if nc.ps.ma.size < 0 {
|
||||||
|
return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ascii numbers 0-9
|
||||||
|
const (
|
||||||
|
ascii_0 = 48
|
||||||
|
ascii_9 = 57
|
||||||
|
)
|
||||||
|
|
||||||
|
// parseInt64 expects decimal positive numbers. We
|
||||||
|
// return -1 to signal error
|
||||||
|
func parseInt64(d []byte) (n int64) {
|
||||||
|
if len(d) == 0 {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
for _, dec := range d {
|
||||||
|
if dec < ascii_0 || dec > ascii_9 {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
n = n*10 + (int64(dec) - ascii_0)
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
github.com/nats-io/go-nats/*_test.go:SA2002
|
||||||
|
github.com/nats-io/go-nats/*/*_test.go:SA2002
|
||||||
|
github.com/nats-io/go-nats/test/context_test.go:SA1012
|
||||||
|
github.com/nats-io/go-nats/nats.go:SA6000
|
|
@ -0,0 +1,43 @@
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// global pool of *time.Timer's. can be used by multiple goroutines concurrently.
|
||||||
|
var globalTimerPool timerPool
|
||||||
|
|
||||||
|
// timerPool provides GC-able pooling of *time.Timer's.
|
||||||
|
// can be used by multiple goroutines concurrently.
|
||||||
|
type timerPool struct {
|
||||||
|
p sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns a timer that completes after the given duration.
|
||||||
|
func (tp *timerPool) Get(d time.Duration) *time.Timer {
|
||||||
|
if t, _ := tp.p.Get().(*time.Timer); t != nil {
|
||||||
|
t.Reset(d)
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.NewTimer(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put pools the given timer.
|
||||||
|
//
|
||||||
|
// There is no need to call t.Stop() before calling Put.
|
||||||
|
//
|
||||||
|
// Put will try to stop the timer before pooling. If the
|
||||||
|
// given timer already expired, Put will read the unreceived
|
||||||
|
// value if there is one.
|
||||||
|
func (tp *timerPool) Put(t *time.Timer) {
|
||||||
|
if !t.Stop() {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tp.p.Put(t)
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTimerPool(t *testing.T) {
|
||||||
|
var tp timerPool
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
tm := tp.Get(time.Millisecond * 20)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-tm.C:
|
||||||
|
t.Errorf("Timer already expired")
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-tm.C:
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Errorf("Timer didn't expire in time")
|
||||||
|
}
|
||||||
|
|
||||||
|
tp.Put(tm)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
{
|
||||||
|
"comment": "",
|
||||||
|
"ignore": "",
|
||||||
|
"package": [
|
||||||
|
{
|
||||||
|
"checksumSHA1": "nWIa0L7ux21Cb8kzB4rJHXMblpI=",
|
||||||
|
"path": "github.com/nats-io/go-nats",
|
||||||
|
"revision": "f0d9c5988d4c2a17ad466fcdffe010165c46434e",
|
||||||
|
"revisionTime": "2017-11-14T23:23:38Z"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"rootPath": "github.com/tidwall/tile38"
|
||||||
|
}
|
Loading…
Reference in New Issue