@ -14,7 +14,9 @@ but it can be used as a secondary step to compressors (like Snappy) that does no
## News
* Mar 2018: First implementation released. Consider this beta software for now.
This is used as part of the [zstandard](https://github.com/klauspost/compress/tree/master/zstd#zstd) compression and decompression package.
This ensures that most functionality is well tested.
# Usage
@ -24,22 +24,21 @@ Godoc Documentation: https://godoc.org/github.com/klauspost/compress/zstd
### Status:
STABLE - there may always be subtle bugs, a wide variety of content has been tested and the library is actively
used by several projects. This library is being continuously [fuzz-tested](https://github.com/klauspost/compress-fuzz),
kindly supplied by [fuzzit.dev](https://fuzzit.dev/).
used by several projects. This library is being [fuzz-tested](https://github.com/klauspost/compress-fuzz) for all updates.
There may still be specific combinations of data types/size/settings that could lead to edge cases,
so as always, testing is recommended.
For now, a high speed (fastest) and medium-fast (default) compressor has been implemented.
The "Fastest" compression ratio is roughly equivalent to zstd level 1.
The "Default" compression ratio is roughly equivalent to zstd level 3 (default).
* The "Fastest" compression ratio is roughly equivalent to zstd level 1.
* The "Default" compression ratio is roughly equivalent to zstd level 3 (default).
* The "Better" compression ratio is roughly equivalent to zstd level 7.
* The "Best" compression ratio is roughly equivalent to zstd level 11.
In terms of speed, it is typically 2x as fast as the stdlib deflate/gzip in its fastest mode.
The compression ratio compared to stdlib is around level 3, but usually 3x as fast.
Compared to cgo zstd, the speed is around level 3 (default), but compression slightly worse, between level 1&2.
### Usage
@ -54,11 +53,11 @@ To create a writer with default options, do like this:
// Compress input to output.
func Compress(in io.Reader, out io.Writer) error {
w, err := NewWriter(output)
enc, err := zstd.NewWriter(out)
if err != nil {
return err
_, err := io.Copy(w, input)
_, err = io.Copy(enc, in)
if err != nil {
return err
@ -140,7 +139,7 @@ I have collected some speed examples to compare speed and compression against ot
* `file` is the input file.
* `out` is the compressor used. `zskp` is this package. `zstd` is the Datadog cgo library. `gzstd/gzkp` is gzip standard and this library.
* `level` is the compression level used. For `zskp` level 1 is "fastest", level 2 is "default".
* `level` is the compression level used. For `zskp` level 1 is "fastest", level 2 is "default"; 3 is "better", 4 is "best".
* `insize`/`outsize` is the input/output size.
* `millis` is the number of milliseconds used for compression.
* `mb/s` is megabytes (2^20 bytes) per second.
@ -154,11 +153,13 @@ file out level insize outsize millis mb/s
silesia.tar zskp 1 211947520 73101992 643 313.87
silesia.tar zskp 2 211947520 67504318 969 208.38
silesia.tar zskp 3 211947520 65177448 1899 106.44
silesia.tar zskp 4 211947520 61381950 8115 24.91
cgo zstd:
silesia.tar zstd 1 211947520 73605392 543 371.56
silesia.tar zstd 3 211947520 66793289 864 233.68
silesia.tar zstd 6 211947520 62916450 1913 105.66
silesia.tar zstd 9 211947520 60212393 5063 39.92
gzip, stdlib/this package:
silesia.tar gzstd 1 211947520 80007735 1654 122.21
@ -171,9 +172,11 @@ file out level insize outsize millis mb/s
gob-stream zskp 1 1911399616 235022249 3088 590.30
gob-stream zskp 2 1911399616 205669791 3786 481.34
gob-stream zskp 3 1911399616 185792019 9324 195.48
gob-stream zskp 4 1911399616 171537212 32113 56.76
gob-stream zstd 1 1911399616 249810424 2637 691.26
gob-stream zstd 3 1911399616 208192146 3490 522.31
gob-stream zstd 6 1911399616 193632038 6687 272.56
gob-stream zstd 9 1911399616 177620386 16175 112.70
gob-stream gzstd 1 1911399616 357382641 10251 177.82
gob-stream gzkp 1 1911399616 362156523 5695 320.08
@ -185,9 +188,11 @@ file out level insize outsize millis mb/s
enwik9 zskp 1 1000000000 343848582 3609 264.18
enwik9 zskp 2 1000000000 317276632 5746 165.97
enwik9 zskp 3 1000000000 294540704 11725 81.34
enwik9 zskp 4 1000000000 276609671 44029 21.66
enwik9 zstd 1 1000000000 358072021 3110 306.65
enwik9 zstd 3 1000000000 313734672 4784 199.35
enwik9 zstd 6 1000000000 295138875 10290 92.68
enwik9 zstd 9 1000000000 278348700 28549 33.40
enwik9 gzstd 1 1000000000 382578136 9604 99.30
enwik9 gzkp 1 1000000000 383825945 6544 145.73
@ -198,9 +203,11 @@ file out level insize outsize millis mb/s
github-june-2days-2019.json zskp 1 6273951764 699045015 10620 563.40
github-june-2days-2019.json zskp 2 6273951764 617881763 11687 511.96
github-june-2days-2019.json zskp 3 6273951764 537511906 29252 204.54
github-june-2days-2019.json zskp 4 6273951764 512796117 97791 61.18
github-june-2days-2019.json zstd 1 6273951764 766284037 8450 708.00
github-june-2days-2019.json zstd 3 6273951764 661889476 10927 547.57
github-june-2days-2019.json zstd 6 6273951764 642756859 22996 260.18
github-june-2days-2019.json zstd 9 6273951764 601974523 52413 114.16
github-june-2days-2019.json gzstd 1 6273951764 1164400847 29948 199.79
github-june-2days-2019.json gzkp 1 6273951764 1128755542 19236 311.03
@ -211,9 +218,11 @@ file out level insize outsize millis mb/s
rawstudio-mint14.tar zskp 1 8558382592 3667489370 20210 403.84
rawstudio-mint14.tar zskp 2 8558382592 3364592300 31873 256.07
rawstudio-mint14.tar zskp 3 8558382592 3224594213 71751 113.75
rawstudio-mint14.tar zskp 4 8558382592 3027332295 486243 16.79
rawstudio-mint14.tar zstd 1 8558382592 3609250104 17136 476.27
rawstudio-mint14.tar zstd 3 8558382592 3341679997 29262 278.92
rawstudio-mint14.tar zstd 6 8558382592 3235846406 77904 104.77
rawstudio-mint14.tar zstd 9 8558382592 3160778861 140946 57.91
rawstudio-mint14.tar gzstd 1 8558382592 3926257486 57722 141.40
rawstudio-mint14.tar gzkp 1 8558382592 3970463184 41749 195.49
@ -224,9 +233,11 @@ file out level insize outsize millis mb/s
nyc-taxi-data-10M.csv zskp 1 3325605752 641339945 8925 355.35
nyc-taxi-data-10M.csv zskp 2 3325605752 591748091 11268 281.44
nyc-taxi-data-10M.csv zskp 3 3325605752 538490114 19880 159.53
nyc-taxi-data-10M.csv zskp 4 3325605752 495986829 89368 35.49
nyc-taxi-data-10M.csv zstd 1 3325605752 687399637 8233 385.18
nyc-taxi-data-10M.csv zstd 3 3325605752 598514411 10065 315.07
nyc-taxi-data-10M.csv zstd 6 3325605752 570522953 20038 158.27
nyc-taxi-data-10M.csv zstd 9 3325605752 517554797 64565 49.12
nyc-taxi-data-10M.csv gzstd 1 3325605752 928656485 23876 132.83
nyc-taxi-data-10M.csv gzkp 1 3325605752 924718719 16388 193.53
@ -251,14 +262,14 @@ For streaming use a simple setup could look like this:
import "github.com/klauspost/compress/zstd"
func Decompress(in io.Reader, out io.Writer) error {
d, err := zstd.NewReader(input)
d, err := zstd.NewReader(in)
if err != nil {
return err
defer d.Close()
// Copy content...
_, err := io.Copy(out, d)
_, err = io.Copy(out, d)
return err
@ -613,7 +613,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
// Decode treeless literal block.
if litType == literalsBlockTreeless {
// TODO: We could send the history early WITHOUT the stream history.
// This would allow decoding treeless literials before the byte history is available.
// This would allow decoding treeless literals before the byte history is available.
// Silencia stats: Treeless 4393, with: 32775, total: 37168, 11% treeless.
// So not much obvious gain here.
@ -76,6 +76,7 @@ func (b *blockEnc) reset(prev *blockEnc) {
if prev != nil {
b.recentOffsets = prev.prevRecentOffsets
b.dictLitEnc = nil
// reset will reset the block for a new encode, but in the same stream,
@ -0,0 +1,202 @@
// Copyright 2020+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
package zstd
import (
// HeaderMaxSize is the maximum size of a Frame and Block Header.
// If less is sent to Header.Decode it *may* still contain enough information.
const HeaderMaxSize = 14 + 3
// Header contains information about the first frame and block within that.
type Header struct {
// Window Size the window of data to keep while decoding.
// Will only be set if HasFCS is false.
WindowSize uint64
// Frame content size.
// Expected size of the entire frame.
FrameContentSize uint64
// Dictionary ID.
// If 0, no dictionary.
DictionaryID uint32
// First block information.
FirstBlock struct {
// OK will be set if first block could be decoded.
OK bool
// Is this the last block of a frame?
Last bool
// Is the data compressed?
// If true CompressedSize will be populated.
// Unfortunately DecompressedSize cannot be determined
// without decoding the blocks.
Compressed bool
// DecompressedSize is the expected decompressed size of the block.
// Will be 0 if it cannot be determined.
DecompressedSize int
// CompressedSize of the data in the block.
// Does not include the block header.
// Will be equal to DecompressedSize if not Compressed.
CompressedSize int
// Skippable will be true if the frame is meant to be skipped.
// No other information will be populated.
Skippable bool
// If set there is a checksum present for the block content.
HasCheckSum bool
// If this is true FrameContentSize will have a valid value
HasFCS bool
SingleSegment bool
// Decode the header from the beginning of the stream.
// This will decode the frame header and the first block header if enough bytes are provided.
// It is recommended to provide at least HeaderMaxSize bytes.
// If the frame header cannot be read an error will be returned.
// If there isn't enough input, io.ErrUnexpectedEOF is returned.
// The FirstBlock.OK will indicate if enough information was available to decode the first block header.
func (h *Header) Decode(in []byte) error {
if len(in) < 4 {
return io.ErrUnexpectedEOF
b, in := in[:4], in[4:]
if !bytes.Equal(b, frameMagic) {
if !bytes.Equal(b[1:4], skippableFrameMagic) || b[0]&0xf0 != 0x50 {
return ErrMagicMismatch
*h = Header{Skippable: true}
return nil
if len(in) < 1 {
return io.ErrUnexpectedEOF
// Clear output
*h = Header{}
fhd, in := in[0], in[1:]
h.SingleSegment = fhd&(1<<5) != 0
h.HasCheckSum = fhd&(1<<2) != 0
if fhd&(1<<3) != 0 {
return errors.New("Reserved bit set on frame header")
// Read Window_Descriptor
// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#window_descriptor
if !h.SingleSegment {
if len(in) < 1 {
return io.ErrUnexpectedEOF
var wd byte
wd, in = in[0], in[1:]
windowLog := 10 + (wd >> 3)
windowBase := uint64(1) << windowLog
windowAdd := (windowBase / 8) * uint64(wd&0x7)
h.WindowSize = windowBase + windowAdd
// Read Dictionary_ID
// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#dictionary_id
if size := fhd & 3; size != 0 {
if size == 3 {
size = 4
if len(in) < int(size) {
return io.ErrUnexpectedEOF
b, in = in[:size], in[size:]
if b == nil {
return io.ErrUnexpectedEOF
switch size {
case 1:
h.DictionaryID = uint32(b[0])
case 2:
h.DictionaryID = uint32(b[0]) | (uint32(b[1]) << 8)
case 4:
h.DictionaryID = uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
// Read Frame_Content_Size
// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#frame_content_size
var fcsSize int
v := fhd >> 6
switch v {
case 0:
if h.SingleSegment {
fcsSize = 1
fcsSize = 1 << v
if fcsSize > 0 {
h.HasFCS = true
if len(in) < fcsSize {
return io.ErrUnexpectedEOF
b, in = in[:fcsSize], in[fcsSize:]
if b == nil {
return io.ErrUnexpectedEOF
switch fcsSize {
case 1:
h.FrameContentSize = uint64(b[0])
case 2:
// When FCS_Field_Size is 2, the offset of 256 is added.
h.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) + 256
case 4:
h.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) | (uint64(b[2]) << 16) | (uint64(b[3]) << 24)
case 8:
d1 := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
d2 := uint32(b[4]) | (uint32(b[5]) << 8) | (uint32(b[6]) << 16) | (uint32(b[7]) << 24)
h.FrameContentSize = uint64(d1) | (uint64(d2) << 32)
// Frame Header done, we will not fail from now on.
if len(in) < 3 {
return nil
tmp, in := in[:3], in[3:]
bh := uint32(tmp[0]) | (uint32(tmp[1]) << 8) | (uint32(tmp[2]) << 16)
h.FirstBlock.Last = bh&1 != 0
blockType := blockType((bh >> 1) & 3)
// find size.
cSize := int(bh >> 3)
switch blockType {
case blockTypeReserved:
return nil
case blockTypeRLE:
h.FirstBlock.Compressed = true
h.FirstBlock.DecompressedSize = cSize
h.FirstBlock.CompressedSize = 1
case blockTypeCompressed:
h.FirstBlock.Compressed = true
h.FirstBlock.CompressedSize = cSize
case blockTypeRaw:
h.FirstBlock.DecompressedSize = cSize
h.FirstBlock.CompressedSize = cSize
panic("Invalid block type")
h.FirstBlock.OK = true
return nil
@ -5,7 +5,6 @@
package zstd
import (
@ -85,6 +84,10 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
d.current.output = make(chan decodeOutput, d.o.concurrent)
d.current.flushed = true
if r == nil {
d.current.err = ErrDecoderNilInput
// Transfer option dicts.
d.dicts = make(map[uint32]dict, len(d.o.dicts))
for _, dc := range d.o.dicts {
@ -111,7 +114,7 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
// When the stream is done, io.EOF will be returned.
func (d *Decoder) Read(p []byte) (int, error) {
if d.stream == nil {
return 0, errors.New("no input has been initialized")
return 0, ErrDecoderNilInput
var n int
for {
@ -152,12 +155,20 @@ func (d *Decoder) Read(p []byte) (int, error) {
// Reset will reset the decoder the supplied stream after the current has finished processing.
// Note that this functionality cannot be used after Close has been called.
// Reset can be called with a nil reader to release references to the previous reader.
// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
// should be used.
func (d *Decoder) Reset(r io.Reader) error {
if d.current.err == ErrDecoderClosed {
return d.current.err
if r == nil {
return errors.New("nil Reader sent as input")
d.current.err = ErrDecoderNilInput
d.current.flushed = true
return nil
if d.stream == nil {
@ -166,14 +177,14 @@ func (d *Decoder) Reset(r io.Reader) error {
go d.startStreamDecoder(d.stream)
// If bytes buffer and < 1MB, do sync decoding anyway.
if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 {
if bb, ok := r.(byter); ok && bb.Len() < 1<<20 {
var bb2 byter
bb2 = bb
if debug {
println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
b := bb.Bytes()
b := bb2.Bytes()
var dst []byte
if cap(d.current.b) > 0 {
dst = d.current.b
@ -249,7 +260,7 @@ func (d *Decoder) drainOutput() {
// Any error encountered during the write is also returned.
func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
if d.stream == nil {
return 0, errors.New("no input has been initialized")
return 0, ErrDecoderNilInput
var n int64
for {
@ -323,19 +334,23 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
// Never preallocate moe than 1 GB up front.
if uint64(cap(dst)) < frame.FrameContentSize {
if cap(dst)-len(dst) < int(frame.FrameContentSize) {
dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
copy(dst2, dst)
dst = dst2
if cap(dst) == 0 {
// Allocate window size * 2 by default if nothing is provided and we didn't get frame content size.
size := frame.WindowSize * 2
// Allocate len(input) * 2 by default if nothing is provided
// and we didn't get frame content size.
size := len(input) * 2
// Cap to 1 MB.
if size > 1<<20 {
size = 1 << 20
if uint64(size) > d.o.maxDecodedSize {
size = int(d.o.maxDecodedSize)
dst = make([]byte, 0, size)
@ -0,0 +1,486 @@
// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
// Based on work by Yann Collet, released under BSD License.
package zstd
import (
const (
bestLongTableBits = 20 // Bits used in the long match table
bestLongTableSize = 1 << bestLongTableBits // Size of the table
// Note: Increasing the short table bits or making the hash shorter
// can actually lead to compression degradation since it will 'steal' more from the
// long match table and match offsets are quite big.
// This greatly depends on the type of input.
bestShortTableBits = 16 // Bits used in the short match table
bestShortTableSize = 1 << bestShortTableBits // Size of the table
// bestFastEncoder uses 2 tables, one for short matches (5 bytes) and one for long matches.
// The long match table contains the previous entry with the same hash,
// effectively making it a "chain" of length 2.
// When we find a long match we choose between the two values and select the longest.
// When we find a short match, after checking the long, we check if we can find a long at n+1
// and that it is longer (lazy matching).
type bestFastEncoder struct {
table [bestShortTableSize]prevEntry
longTable [bestLongTableSize]prevEntry
dictTable []prevEntry
dictLongTable []prevEntry
// Encode improves compression...
func (e *bestFastEncoder) Encode(blk *blockEnc, src []byte) {
const (
// Input margin is the number of bytes we read (8)
// and the maximum we will read ahead (2)
inputMargin = 8 + 4
minNonLiteralBlockSize = 16
// Protect against e.cur wraparound.
for e.cur >= bufferReset {
if len(e.hist) == 0 {
for i := range e.table[:] {
e.table[i] = prevEntry{}
for i := range e.longTable[:] {
e.longTable[i] = prevEntry{}
e.cur = e.maxMatchOff
// Shift down everything in the table that isn't already too far away.
minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff
for i := range e.table[:] {
v := e.table[i].offset
v2 := e.table[i].prev
if v < minOff {
v = 0
v2 = 0
} else {
v = v - e.cur + e.maxMatchOff
if v2 < minOff {
v2 = 0
} else {
v2 = v2 - e.cur + e.maxMatchOff
e.table[i] = prevEntry{
offset: v,
prev: v2,
for i := range e.longTable[:] {
v := e.longTable[i].offset
v2 := e.longTable[i].prev
if v < minOff {
v = 0
v2 = 0
} else {
v = v - e.cur + e.maxMatchOff
if v2 < minOff {
v2 = 0
} else {
v2 = v2 - e.cur + e.maxMatchOff
e.longTable[i] = prevEntry{
offset: v,
prev: v2,
e.cur = e.maxMatchOff
s := e.addBlock(src)
blk.size = len(src)
if len(src) < minNonLiteralBlockSize {
blk.extraLits = len(src)
blk.literals = blk.literals[:len(src)]
copy(blk.literals, src)
// Override src
src = e.hist
sLimit := int32(len(src)) - inputMargin
const kSearchStrength = 10
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := s
cv := load6432(src, s)
// Relative offsets
offset1 := int32(blk.recentOffsets[0])
offset2 := int32(blk.recentOffsets[1])
offset3 := int32(blk.recentOffsets[2])
addLiterals := func(s *seq, until int32) {
if until == nextEmit {
blk.literals = append(blk.literals, src[nextEmit:until]...)
s.litLen = uint32(until - nextEmit)
_ = addLiterals
if debug {
println("recent offsets:", blk.recentOffsets)
for {
// We allow the encoder to optionally turn off repeat offsets across blocks
canRepeat := len(blk.sequences) > 2
if debugAsserts && canRepeat && offset1 == 0 {
panic("offset0 was 0")
type match struct {
offset int32
s int32
length int32
rep int32
matchAt := func(offset int32, s int32, first uint32, rep int32) match {
if s-offset >= e.maxMatchOff || load3232(src, offset) != first {
return match{offset: offset, s: s}
return match{offset: offset, s: s, length: 4 + e.matchlen(s+4, offset+4, src), rep: rep}
bestOf := func(a, b match) match {
aScore := b.s - a.s + a.length
bScore := a.s - b.s + b.length
if a.rep < 0 {
aScore = aScore - int32(bits.Len32(uint32(a.offset)))/8
if b.rep < 0 {
bScore = bScore - int32(bits.Len32(uint32(b.offset)))/8
if aScore >= bScore {
return a
return b
const goodEnough = 100
nextHashL := hash8(cv, bestLongTableBits)
nextHashS := hash4x64(cv, bestShortTableBits)
candidateL := e.longTable[nextHashL]
candidateS := e.table[nextHashS]
best := bestOf(matchAt(candidateL.offset-e.cur, s, uint32(cv), -1), matchAt(candidateL.prev-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateS.offset-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateS.prev-e.cur, s, uint32(cv), -1))
if canRepeat && best.length < goodEnough {
best = bestOf(best, matchAt(s-offset1+1, s+1, uint32(cv>>8), 1))
best = bestOf(best, matchAt(s-offset2+1, s+1, uint32(cv>>8), 2))
best = bestOf(best, matchAt(s-offset3+1, s+1, uint32(cv>>8), 3))
if best.length > 0 {
best = bestOf(best, matchAt(s-offset1+3, s+3, uint32(cv>>24), 1))
best = bestOf(best, matchAt(s-offset2+3, s+3, uint32(cv>>24), 2))
best = bestOf(best, matchAt(s-offset3+3, s+3, uint32(cv>>24), 3))
// Load next and check...
e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: candidateL.offset}
e.table[nextHashS] = prevEntry{offset: s + e.cur, prev: candidateS.offset}
// Look far ahead, unless we have a really long match already...
if best.length < goodEnough {
// No match found, move forward on input, no need to check forward...
if best.length < 4 {
s += 1 + (s-nextEmit)>>(kSearchStrength-1)
if s >= sLimit {
break encodeLoop
cv = load6432(src, s)
candidateS = e.table[hash4x64(cv>>8, bestShortTableBits)]
cv = load6432(src, s)
cv2 := load6432(src, s+1)
candidateL = e.longTable[hash8(cv, bestLongTableBits)]
candidateL2 := e.longTable[hash8(cv2, bestLongTableBits)]
best = bestOf(best, matchAt(candidateS.offset-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateL.offset-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateL.prev-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateL2.offset-e.cur, s+1, uint32(cv2), -1))
best = bestOf(best, matchAt(candidateL2.prev-e.cur, s+1, uint32(cv2), -1))
// We have a match, we can store the forward value
if best.rep > 0 {
s = best.s
var seq seq
seq.matchLen = uint32(best.length - zstdMinMatch)
// We might be able to match backwards.
// Extend as long as we can.
start := best.s
// We end the search early, so we don't risk 0 literals
// and have to do special offset treatment.
startLimit := nextEmit + 1
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
repIndex := best.offset
for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 {
addLiterals(&seq, start)
// rep 0
seq.offset = uint32(best.rep)
if debugSequences {
println("repeat sequence", seq, "next s:", s)
blk.sequences = append(blk.sequences, seq)
// Index match start+1 (long) -> s - 1
index0 := s
s = best.s + best.length
nextEmit = s
if s >= sLimit {
if debug {
println("repeat ended", s, best.length)
break encodeLoop
// Index skipped...
off := index0 + e.cur
for index0 < s-1 {
cv0 := load6432(src, index0)
h0 := hash8(cv0, bestLongTableBits)
h1 := hash4x64(cv0, bestShortTableBits)
e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset}
e.table[h1] = prevEntry{offset: off, prev: e.table[h1].offset}
switch best.rep {
case 2:
offset1, offset2 = offset2, offset1
case 3:
offset1, offset2, offset3 = offset3, offset1, offset2
cv = load6432(src, s)
// A 4-byte match has been found. Update recent offsets.
// We'll later see if more than 4 bytes.
s = best.s
t := best.offset
offset1, offset2, offset3 = s-t, offset1, offset2
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
if debugAsserts && canRepeat && int(offset1) > len(src) {
panic("invalid offset")
// Extend the n-byte match as long as possible.
l := best.length
// Extend backwards
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength {
// Write our sequence
var seq seq
seq.litLen = uint32(s - nextEmit)
seq.matchLen = uint32(l - zstdMinMatch)
if seq.litLen > 0 {
blk.literals = append(blk.literals, src[nextEmit:s]...)
seq.offset = uint32(s-t) + 3
s += l
if debugSequences {
println("sequence", seq, "next s:", s)
blk.sequences = append(blk.sequences, seq)
nextEmit = s
if s >= sLimit {
break encodeLoop
// Index match start+1 (long) -> s - 1
index0 := s - l + 1
// every entry
for index0 < s-1 {
cv0 := load6432(src, index0)
h0 := hash8(cv0, bestLongTableBits)
h1 := hash4x64(cv0, bestShortTableBits)
off := index0 + e.cur
e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset}
e.table[h1] = prevEntry{offset: off, prev: e.table[h1].offset}
cv = load6432(src, s)
if !canRepeat {
// Check offset 2
for {
o2 := s - offset2
if load3232(src, o2) != uint32(cv) {
// Do regular search
// Store this, since we have it.
nextHashS := hash4x64(cv, bestShortTableBits)
nextHashL := hash8(cv, bestLongTableBits)
// We have at least 4 byte match.
// No need to check backwards. We come straight from a match
l := 4 + e.matchlen(s+4, o2+4, src)
e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: e.longTable[nextHashL].offset}
e.table[nextHashS] = prevEntry{offset: s + e.cur, prev: e.table[nextHashS].offset}
seq.matchLen = uint32(l) - zstdMinMatch
seq.litLen = 0
// Since litlen is always 0, this is offset 1.
seq.offset = 1
s += l
nextEmit = s
if debugSequences {
println("sequence", seq, "next s:", s)
blk.sequences = append(blk.sequences, seq)
// Swap offset 1 and 2.
offset1, offset2 = offset2, offset1
if s >= sLimit {
// Finished
break encodeLoop
cv = load6432(src, s)
if int(nextEmit) < len(src) {
blk.literals = append(blk.literals, src[nextEmit:]...)
blk.extraLits = len(src) - int(nextEmit)
blk.recentOffsets[0] = uint32(offset1)
blk.recentOffsets[1] = uint32(offset2)
blk.recentOffsets[2] = uint32(offset3)
if debug {
println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits)
// EncodeNoHist will encode a block with no history and no following blocks.
// Most notable difference is that src will not be copied for history and
// we do not need to check for max match length.
func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) {
e.Encode(blk, src)
// ResetDict will reset and set a dictionary if not nil
func (e *bestFastEncoder) Reset(d *dict, singleBlock bool) {
e.resetBase(d, singleBlock)
if d == nil {
// Init or copy dict table
if len(e.dictTable) != len(e.table) || d.id != e.lastDictID {
if len(e.dictTable) != len(e.table) {
e.dictTable = make([]prevEntry, len(e.table))
end := int32(len(d.content)) - 8 + e.maxMatchOff
for i := e.maxMatchOff; i < end; i += 4 {
const hashLog = bestShortTableBits
cv := load6432(d.content, i-e.maxMatchOff)
nextHash := hash4x64(cv, hashLog) // 0 -> 4
nextHash1 := hash4x64(cv>>8, hashLog) // 1 -> 5
nextHash2 := hash4x64(cv>>16, hashLog) // 2 -> 6
nextHash3 := hash4x64(cv>>24, hashLog) // 3 -> 7
e.dictTable[nextHash] = prevEntry{
prev: e.dictTable[nextHash].offset,
offset: i,
e.dictTable[nextHash1] = prevEntry{
prev: e.dictTable[nextHash1].offset,
offset: i + 1,
e.dictTable[nextHash2] = prevEntry{
prev: e.dictTable[nextHash2].offset,
offset: i + 2,
e.dictTable[nextHash3] = prevEntry{
prev: e.dictTable[nextHash3].offset,
offset: i + 3,
e.lastDictID = d.id
// Init or copy dict table
if len(e.dictLongTable) != len(e.longTable) || d.id != e.lastDictID {
if len(e.dictLongTable) != len(e.longTable) {
e.dictLongTable = make([]prevEntry, len(e.longTable))
if len(d.content) >= 8 {
cv := load6432(d.content, 0)
h := hash8(cv, bestLongTableBits)
e.dictLongTable[h] = prevEntry{
offset: e.maxMatchOff,
prev: e.dictLongTable[h].offset,
end := int32(len(d.content)) - 8 + e.maxMatchOff
off := 8 // First to read
for i := e.maxMatchOff + 1; i < end; i++ {
cv = cv>>8 | (uint64(d.content[off]) << 56)
h := hash8(cv, bestLongTableBits)
e.dictLongTable[h] = prevEntry{
offset: i,
prev: e.dictLongTable[h].offset,
e.lastDictID = d.id
// Reset table to initial state
copy(e.longTable[:], e.dictLongTable)
e.cur = e.maxMatchOff
// Reset table to initial state
copy(e.table[:], e.dictTable)
@ -78,7 +78,7 @@ func (e *fastEncoder) Encode(blk *blockEnc, src []byte) {
const hashLog = tableBits
// seems global, but would be nice to tweak.
const kSearchStrength = 8
const kSearchStrength = 7
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := s
@ -30,12 +30,13 @@ type encoderOptions struct {
func (o *encoderOptions) setDefault() {
*o = encoderOptions{
// use less ram: true for now, but may change.
concurrent: runtime.GOMAXPROCS(0),
crc: true,
single: nil,
blockSize: 1 << 16,
windowSize: 8 << 20,
level: SpeedDefault,
concurrent: runtime.GOMAXPROCS(0),
crc: true,
single: nil,
blockSize: 1 << 16,
windowSize: 8 << 20,
level: SpeedDefault,
allLitEntropy: true,
@ -46,6 +47,8 @@ func (o encoderOptions) encoder() encoder {
return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}}
case SpeedBetterCompression:
return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}
case SpeedBestCompression:
return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}
case SpeedFastest:
return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}
@ -142,20 +145,20 @@ const (
// By using this, notice that CPU usage may go up in the future.
// SpeedBestCompression will choose the best available compression option.
// This will offer the best compression no matter the CPU cost.
// speedLast should be kept as the last actual compression option.
// The is not for external usage, but is used to keep track of the valid options.
// SpeedBestCompression will choose the best available compression option.
// For now this is not implemented.
SpeedBestCompression = SpeedBetterCompression
// EncoderLevelFromString will convert a string representation of an encoding level back
// to a compression level. The compare is not case sensitive.
// If the string wasn't recognized, (false, SpeedDefault) will be returned.
func EncoderLevelFromString(s string) (bool, EncoderLevel) {
for l := EncoderLevel(speedNotSet + 1); l < speedLast; l++ {
for l := speedNotSet + 1; l < speedLast; l++ {
if strings.EqualFold(s, l.String()) {
return true, l
@ -172,7 +175,9 @@ func EncoderLevelFromZstd(level int) EncoderLevel {
return SpeedFastest
case level >= 3 && level < 6:
return SpeedDefault
case level > 5:
case level >= 6 && level < 10:
return SpeedBetterCompression
case level >= 10:
return SpeedBetterCompression
return SpeedDefault
@ -187,6 +192,8 @@ func (e EncoderLevel) String() string {
return "default"
case SpeedBetterCompression:
return "better"
case SpeedBestCompression:
return "best"
return "invalid"
@ -208,6 +215,8 @@ func WithEncoderLevel(l EncoderLevel) EOption {
o.windowSize = 8 << 20
case SpeedBetterCompression:
o.windowSize = 16 << 20
case SpeedBestCompression:
o.windowSize = 32 << 20
if !o.customALEntropy {
@ -181,11 +181,18 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
return fmt.Errorf("output (%d) bigger than max block size", size)
if size > cap(s.out) {
// Not enough size, will be extremely rarely triggered,
// Not enough size, which can happen under high volume block streaming conditions
// but could be if destination slice is too small for sync operations.
// We add maxBlockSize to the capacity.
s.out = append(s.out, make([]byte, maxBlockSize)...)
s.out = s.out[:len(s.out)-maxBlockSize]
// over-allocating here can create a large amount of GC pressure so we try to keep
// it as contained as possible
used := len(s.out) - startSize
addBytes := 256 + ll + ml + used>>2
// Clamp to max block size.
if used+addBytes > maxBlockSize {
addBytes = maxBlockSize - used
s.out = append(s.out, make([]byte, addBytes)...)
s.out = s.out[:len(s.out)-addBytes]
if ml > maxMatchLen {
return fmt.Errorf("match len (%d) bigger than max allowed length", ml)
@ -4,6 +4,7 @@
package zstd
import (
@ -73,6 +74,10 @@ var (
// ErrDecoderClosed will be returned if the Decoder was used after
// Close has been called.
ErrDecoderClosed = errors.New("decoder used after Close")
// ErrDecoderNilInput is returned when a nil Reader was provided
// and an operation other than Reset/DecodeAll/Close was attempted.
ErrDecoderNilInput = errors.New("nil input provided as reader")
func println(a ...interface{}) {
@ -142,3 +147,10 @@ func load64(b []byte, i int) uint64 {
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
type byter interface {
Bytes() []byte
Len() int
var _ byter = &bytes.Buffer{}
# Release Notes
## 0.3.0
* Removed revocation claims in favor of timestamp-based revocation maps in account and export claims.
@ -1,232 +0,0 @@
* Copyright 2018-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// NoLimit is used to indicate a limit field is unlimited in value.
const NoLimit = -1
// OperatorLimits are used to limit access by an account
type OperatorLimits struct {
Subs int64 `json:"subs,omitempty"` // Max number of subscriptions
Conn int64 `json:"conn,omitempty"` // Max number of active connections
LeafNodeConn int64 `json:"leaf,omitempty"` // Max number of active leaf node connections
Imports int64 `json:"imports,omitempty"` // Max number of imports
Exports int64 `json:"exports,omitempty"` // Max number of exports
Data int64 `json:"data,omitempty"` // Max number of bytes
Payload int64 `json:"payload,omitempty"` // Max message payload
WildcardExports bool `json:"wildcards,omitempty"` // Are wildcards allowed in exports
// IsEmpty returns true if all of the limits are 0/false.
func (o *OperatorLimits) IsEmpty() bool {
return *o == OperatorLimits{}
// IsUnlimited returns true if all limits are
func (o *OperatorLimits) IsUnlimited() bool {
return *o == OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true}
// Validate checks that the operator limits contain valid values
func (o *OperatorLimits) Validate(vr *ValidationResults) {
// negative values mean unlimited, so all numbers are valid
// Account holds account specific claims data
type Account struct {
Imports Imports `json:"imports,omitempty"`
Exports Exports `json:"exports,omitempty"`
Identities []Identity `json:"identity,omitempty"`
Limits OperatorLimits `json:"limits,omitempty"`
SigningKeys StringList `json:"signing_keys,omitempty"`
Revocations RevocationList `json:"revocations,omitempty"`
// Validate checks if the account is valid, based on the wrapper
func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) {
a.Imports.Validate(acct.Subject, vr)
for _, i := range a.Identities {
if !a.Limits.IsEmpty() && a.Limits.Imports >= 0 && int64(len(a.Imports)) > a.Limits.Imports {
vr.AddError("the account contains more imports than allowed by the operator")
// Check Imports and Exports for limit violations.
if a.Limits.Imports != NoLimit {
if int64(len(a.Imports)) > a.Limits.Imports {
vr.AddError("the account contains more imports than allowed by the operator")
if a.Limits.Exports != NoLimit {
if int64(len(a.Exports)) > a.Limits.Exports {
vr.AddError("the account contains more exports than allowed by the operator")
// Check for wildcard restrictions
if !a.Limits.WildcardExports {
for _, ex := range a.Exports {
if ex.Subject.HasWildCards() {
vr.AddError("the account contains wildcard exports that are not allowed by the operator")
for _, k := range a.SigningKeys {
if !nkeys.IsValidPublicAccountKey(k) {
vr.AddError("%s is not an account public key", k)
// AccountClaims defines the body of an account JWT
type AccountClaims struct {
Account `json:"nats,omitempty"`
// NewAccountClaims creates a new account JWT
func NewAccountClaims(subject string) *AccountClaims {
if subject == "" {
return nil
c := &AccountClaims{}
// Set to unlimited to start. We do it this way so we get compiler
// errors if we add to the OperatorLimits.
c.Limits = OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true}
c.Subject = subject
return c
// Encode converts account claims into a JWT string
func (a *AccountClaims) Encode(pair nkeys.KeyPair) (string, error) {
if !nkeys.IsValidPublicAccountKey(a.Subject) {
return "", errors.New("expected subject to be account public key")
a.ClaimsData.Type = AccountClaim
return a.ClaimsData.Encode(pair, a)
// DecodeAccountClaims decodes account claims from a JWT string
func DecodeAccountClaims(token string) (*AccountClaims, error) {
v := AccountClaims{}
if err := Decode(token, &v); err != nil {
return nil, err
return &v, nil
func (a *AccountClaims) String() string {
return a.ClaimsData.String(a)
// Payload pulls the accounts specific payload out of the claims
func (a *AccountClaims) Payload() interface{} {
return &a.Account
// Validate checks the accounts contents
func (a *AccountClaims) Validate(vr *ValidationResults) {
a.Account.Validate(a, vr)
if nkeys.IsValidPublicAccountKey(a.ClaimsData.Issuer) {
if len(a.Identities) > 0 {
vr.AddWarning("self-signed account JWTs shouldn't contain identity proofs")
if !a.Limits.IsEmpty() {
vr.AddWarning("self-signed account JWTs shouldn't contain operator limits")
// ExpectedPrefixes defines the types that can encode an account jwt, account and operator
func (a *AccountClaims) ExpectedPrefixes() []nkeys.PrefixByte {
return []nkeys.PrefixByte{nkeys.PrefixByteAccount, nkeys.PrefixByteOperator}
// Claims returns the accounts claims data
func (a *AccountClaims) Claims() *ClaimsData {
return &a.ClaimsData
// DidSign checks the claims against the account's public key and its signing keys
func (a *AccountClaims) DidSign(op Claims) bool {
if op != nil {
issuer := op.Claims().Issuer
if issuer == a.Subject {
return true
return a.SigningKeys.Contains(issuer)
return false
// Revoke enters a revocation by publickey using time.Now().
func (a *AccountClaims) Revoke(pubKey string) {
a.RevokeAt(pubKey, time.Now())
// RevokeAt enters a revocation by public key and timestamp into this account
// This will revoke all jwt issued for pubKey, prior to timestamp
// If there is already a revocation for this public key that is newer, it is kept.
func (a *AccountClaims) RevokeAt(pubKey string, timestamp time.Time) {
if a.Revocations == nil {
a.Revocations = RevocationList{}
a.Revocations.Revoke(pubKey, timestamp)
// ClearRevocation removes any revocation for the public key
func (a *AccountClaims) ClearRevocation(pubKey string) {
// IsRevokedAt checks if the public key is in the revoked list with a timestamp later than the one passed in.
// Generally this method is called with the subject and issue time of the jwt to be tested.
// DO NOT pass time.Now(), it will not produce a stable/expected response.
func (a *AccountClaims) IsRevokedAt(pubKey string, timestamp time.Time) bool {
return a.Revocations.IsRevoked(pubKey, timestamp)
// IsRevoked does not perform a valid check. Use IsRevokedAt instead.
func (a *AccountClaims) IsRevoked(_ string) bool {
return true
// IsClaimRevoked checks if the account revoked the claim passed in.
// Invalid claims (nil, no Subject or IssuedAt) will return true.
func (a *AccountClaims) IsClaimRevoked(claim *UserClaims) bool {
if claim == nil || claim.IssuedAt == 0 || claim.Subject == "" {
return true
return a.Revocations.IsRevoked(claim.Subject, time.Unix(claim.IssuedAt, 0))
@ -1,166 +0,0 @@
* Copyright 2018 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// Activation defines the custom parts of an activation claim
type Activation struct {
ImportSubject Subject `json:"subject,omitempty"`
ImportType ExportType `json:"type,omitempty"`
// IsService returns true if an Activation is for a service
func (a *Activation) IsService() bool {
return a.ImportType == Service
// IsStream returns true if an Activation is for a stream
func (a *Activation) IsStream() bool {
return a.ImportType == Stream
// Validate checks the exports and limits in an activation JWT
func (a *Activation) Validate(vr *ValidationResults) {
if !a.IsService() && !a.IsStream() {
vr.AddError("invalid export type: %q", a.ImportType)
if a.IsService() {
if a.ImportSubject.HasWildCards() {
vr.AddError("services cannot have wildcard subject: %q", a.ImportSubject)
// ActivationClaims holds the data specific to an activation JWT
type ActivationClaims struct {
Activation `json:"nats,omitempty"`
// IssuerAccount stores the public key for the account the issuer represents.
// When set, the claim was issued by a signing key.
IssuerAccount string `json:"issuer_account,omitempty"`
// NewActivationClaims creates a new activation claim with the provided sub
func NewActivationClaims(subject string) *ActivationClaims {
if subject == "" {
return nil
ac := &ActivationClaims{}
ac.Subject = subject
return ac
// Encode turns an activation claim into a JWT strimg
func (a *ActivationClaims) Encode(pair nkeys.KeyPair) (string, error) {
if !nkeys.IsValidPublicAccountKey(a.ClaimsData.Subject) {
return "", errors.New("expected subject to be an account")
a.ClaimsData.Type = ActivationClaim
return a.ClaimsData.Encode(pair, a)
// DecodeActivationClaims tries to create an activation claim from a JWT string
func DecodeActivationClaims(token string) (*ActivationClaims, error) {
v := ActivationClaims{}
if err := Decode(token, &v); err != nil {
return nil, err
return &v, nil
// Payload returns the activation specific part of the JWT
func (a *ActivationClaims) Payload() interface{} {
return a.Activation
// Validate checks the claims
func (a *ActivationClaims) Validate(vr *ValidationResults) {
if a.IssuerAccount != "" && !nkeys.IsValidPublicAccountKey(a.IssuerAccount) {
vr.AddError("account_id is not an account public key")
// ExpectedPrefixes defines the types that can sign an activation jwt, account and oeprator
func (a *ActivationClaims) ExpectedPrefixes() []nkeys.PrefixByte {
return []nkeys.PrefixByte{nkeys.PrefixByteAccount, nkeys.PrefixByteOperator}
// Claims returns the generic part of the JWT
func (a *ActivationClaims) Claims() *ClaimsData {
return &a.ClaimsData
func (a *ActivationClaims) String() string {
return a.ClaimsData.String(a)
// HashID returns a hash of the claims that can be used to identify it.
// The hash is calculated by creating a string with
// issuerPubKey.subjectPubKey.<subject> and constructing the sha-256 hash and base32 encoding that.
// <subject> is the exported subject, minus any wildcards, so foo.* becomes foo.
// the one special case is that if the export start with "*" or is ">" the <subject> "_"
func (a *ActivationClaims) HashID() (string, error) {
if a.Issuer == "" || a.Subject == "" || a.ImportSubject == "" {
return "", fmt.Errorf("not enough data in the activaion claims to create a hash")
subject := cleanSubject(string(a.ImportSubject))
base := fmt.Sprintf("%s.%s.%s", a.Issuer, a.Subject, subject)
h := sha256.New()
sha := h.Sum(nil)
hash := base32.StdEncoding.EncodeToString(sha)
return hash, nil
func cleanSubject(subject string) string {
split := strings.Split(subject, ".")
cleaned := ""
for i, tok := range split {
if tok == "*" || tok == ">" {
if i == 0 {
cleaned = "_"
cleaned = strings.Join(split[:i], ".")
if cleaned == "" {
cleaned = subject
return cleaned
@ -1,302 +0,0 @@
* Copyright 2018-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// ClaimType is used to indicate the type of JWT being stored in a Claim
type ClaimType string
const (
// AccountClaim is the type of an Account JWT
AccountClaim = "account"
//ActivationClaim is the type of an activation JWT
ActivationClaim = "activation"
//UserClaim is the type of an user JWT
UserClaim = "user"
//ServerClaim is the type of an server JWT
ServerClaim = "server"
//ClusterClaim is the type of an cluster JWT
ClusterClaim = "cluster"
//OperatorClaim is the type of an operator JWT
OperatorClaim = "operator"
// Claims is a JWT claims
type Claims interface {
Claims() *ClaimsData
Encode(kp nkeys.KeyPair) (string, error)
ExpectedPrefixes() []nkeys.PrefixByte
Payload() interface{}
String() string
Validate(vr *ValidationResults)
Verify(payload string, sig []byte) bool
// ClaimsData is the base struct for all claims
type ClaimsData struct {
Audience string `json:"aud,omitempty"`
Expires int64 `json:"exp,omitempty"`
ID string `json:"jti,omitempty"`
IssuedAt int64 `json:"iat,omitempty"`
Issuer string `json:"iss,omitempty"`
Name string `json:"name,omitempty"`
NotBefore int64 `json:"nbf,omitempty"`
Subject string `json:"sub,omitempty"`
Tags TagList `json:"tags,omitempty"`
Type ClaimType `json:"type,omitempty"`
// Prefix holds the prefix byte for an NKey
type Prefix struct {
func encodeToString(d []byte) string {
return base64.RawURLEncoding.EncodeToString(d)
func decodeString(s string) ([]byte, error) {
return base64.RawURLEncoding.DecodeString(s)
func serialize(v interface{}) (string, error) {
j, err := json.Marshal(v)
if err != nil {
return "", err
return encodeToString(j), nil
func (c *ClaimsData) doEncode(header *Header, kp nkeys.KeyPair, claim Claims) (string, error) {
if header == nil {
return "", errors.New("header is required")
if kp == nil {
return "", errors.New("keypair is required")
if c.Subject == "" {
return "", errors.New("subject is not set")
h, err := serialize(header)
if err != nil {
return "", err
issuerBytes, err := kp.PublicKey()
if err != nil {
return "", err
prefixes := claim.ExpectedPrefixes()
if prefixes != nil {
ok := false
for _, p := range prefixes {
switch p {
case nkeys.PrefixByteAccount:
if nkeys.IsValidPublicAccountKey(issuerBytes) {
ok = true
case nkeys.PrefixByteOperator:
if nkeys.IsValidPublicOperatorKey(issuerBytes) {
ok = true
case nkeys.PrefixByteServer:
if nkeys.IsValidPublicServerKey(issuerBytes) {
ok = true
case nkeys.PrefixByteCluster:
if nkeys.IsValidPublicClusterKey(issuerBytes) {
ok = true
case nkeys.PrefixByteUser:
if nkeys.IsValidPublicUserKey(issuerBytes) {
ok = true
if !ok {
return "", fmt.Errorf("unable to validate expected prefixes - %v", prefixes)
c.Issuer = string(issuerBytes)
c.IssuedAt = time.Now().UTC().Unix()
c.ID, err = c.hash()
if err != nil {
return "", err
payload, err := serialize(claim)
if err != nil {
return "", err
sig, err := kp.Sign([]byte(payload))
if err != nil {
return "", err
eSig := encodeToString(sig)
return fmt.Sprintf("%s.%s.%s", h, payload, eSig), nil
func (c *ClaimsData) hash() (string, error) {
j, err := json.Marshal(c)
if err != nil {
return "", err
h := sha512.New512_256()
return base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(h.Sum(nil)), nil
// Encode encodes a claim into a JWT token. The claim is signed with the
// provided nkey's private key
func (c *ClaimsData) Encode(kp nkeys.KeyPair, payload Claims) (string, error) {
return c.doEncode(&Header{TokenTypeJwt, AlgorithmNkey}, kp, payload)
// Returns a JSON representation of the claim
func (c *ClaimsData) String(claim interface{}) string {
j, err := json.MarshalIndent(claim, "", " ")
if err != nil {
return ""
return string(j)
func parseClaims(s string, target Claims) error {
h, err := decodeString(s)
if err != nil {
return err
return json.Unmarshal(h, &target)
// Verify verifies that the encoded payload was signed by the
// provided public key. Verify is called automatically with
// the claims portion of the token and the public key in the claim.
// Client code need to insure that the public key in the
// claim is trusted.
func (c *ClaimsData) Verify(payload string, sig []byte) bool {
// decode the public key
kp, err := nkeys.FromPublicKey(c.Issuer)
if err != nil {
return false
if err := kp.Verify([]byte(payload), sig); err != nil {
return false
return true
// Validate checks a claim to make sure it is valid. Validity checks
// include expiration and not before constraints.
func (c *ClaimsData) Validate(vr *ValidationResults) {
now := time.Now().UTC().Unix()
if c.Expires > 0 && now > c.Expires {
vr.AddTimeCheck("claim is expired")
if c.NotBefore > 0 && c.NotBefore > now {
vr.AddTimeCheck("claim is not yet valid")
// IsSelfSigned returns true if the claims issuer is the subject
func (c *ClaimsData) IsSelfSigned() bool {
return c.Issuer == c.Subject
// Decode takes a JWT string decodes it and validates it
// and return the embedded Claims. If the token header
// doesn't match the expected algorithm, or the claim is
// not valid or verification fails an error is returned.
func Decode(token string, target Claims) error {
// must have 3 chunks
chunks := strings.Split(token, ".")
if len(chunks) != 3 {
return errors.New("expected 3 chunks")
_, err := parseHeaders(chunks[0])
if err != nil {
return err
if err := parseClaims(chunks[1], target); err != nil {
return err
sig, err := decodeString(chunks[2])
if err != nil {
return err
if !target.Verify(chunks[1], sig) {
return errors.New("claim failed signature verification")
prefixes := target.ExpectedPrefixes()
if prefixes != nil {
ok := false
issuer := target.Claims().Issuer
for _, p := range prefixes {
switch p {
case nkeys.PrefixByteAccount:
if nkeys.IsValidPublicAccountKey(issuer) {
ok = true
case nkeys.PrefixByteOperator:
if nkeys.IsValidPublicOperatorKey(issuer) {
ok = true
case nkeys.PrefixByteServer:
if nkeys.IsValidPublicServerKey(issuer) {
ok = true
case nkeys.PrefixByteCluster:
if nkeys.IsValidPublicClusterKey(issuer) {
ok = true
case nkeys.PrefixByteUser:
if nkeys.IsValidPublicUserKey(issuer) {
ok = true
if !ok {
return fmt.Errorf("unable to validate expected prefixes - %v", prefixes)
return nil
@ -1,94 +0,0 @@
* Copyright 2018 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// Cluster stores the cluster specific elements of a cluster JWT
type Cluster struct {
Trust []string `json:"identity,omitempty"`
Accounts []string `json:"accts,omitempty"`
AccountURL string `json:"accturl,omitempty"`
OperatorURL string `json:"opurl,omitempty"`
// Validate checks the cluster and permissions for a cluster JWT
func (c *Cluster) Validate(vr *ValidationResults) {
// fixme validate cluster data
// ClusterClaims defines the data in a cluster JWT
type ClusterClaims struct {
Cluster `json:"nats,omitempty"`
// NewClusterClaims creates a new cluster JWT with the specified subject/public key
func NewClusterClaims(subject string) *ClusterClaims {
if subject == "" {
return nil
c := &ClusterClaims{}
c.Subject = subject
return c
// Encode tries to turn the cluster claims into a JWT string
func (c *ClusterClaims) Encode(pair nkeys.KeyPair) (string, error) {
if !nkeys.IsValidPublicClusterKey(c.Subject) {
return "", errors.New("expected subject to be a cluster public key")
c.ClaimsData.Type = ClusterClaim
return c.ClaimsData.Encode(pair, c)
// DecodeClusterClaims tries to parse cluster claims from a JWT string
func DecodeClusterClaims(token string) (*ClusterClaims, error) {
v := ClusterClaims{}
if err := Decode(token, &v); err != nil {
return nil, err
return &v, nil
func (c *ClusterClaims) String() string {
return c.ClaimsData.String(c)
// Payload returns the cluster specific data
func (c *ClusterClaims) Payload() interface{} {
return &c.Cluster
// Validate checks the generic and cluster data in the cluster claims
func (c *ClusterClaims) Validate(vr *ValidationResults) {
// ExpectedPrefixes defines the types that can encode a cluster JWT, operator or cluster
func (c *ClusterClaims) ExpectedPrefixes() []nkeys.PrefixByte {
return []nkeys.PrefixByte{nkeys.PrefixByteOperator, nkeys.PrefixByteCluster}
// Claims returns the generic data
func (c *ClusterClaims) Claims() *ClaimsData {
return &c.ClaimsData
@ -1,203 +0,0 @@
package jwt
import (
// DecorateJWT returns a decorated JWT that describes the kind of JWT
func DecorateJWT(jwtString string) ([]byte, error) {
gc, err := DecodeGeneric(jwtString)
if err != nil {
return nil, err
return formatJwt(string(gc.Type), jwtString)
func formatJwt(kind string, jwtString string) ([]byte, error) {
templ := `-----BEGIN NATS %s JWT-----
------END NATS %s JWT------
w := bytes.NewBuffer(nil)
kind = strings.ToUpper(kind)
_, err := fmt.Fprintf(w, templ, kind, jwtString, kind)
if err != nil {
return nil, err
return w.Bytes(), nil
// DecorateSeed takes a seed and returns a string that wraps
// the seed in the form:
// ************************* IMPORTANT *************************
// NKEY Seed printed below can be used sign and prove identity.
// NKEYs are sensitive and should be treated as secrets.
// ------END USER NKEY SEED------
func DecorateSeed(seed []byte) ([]byte, error) {
w := bytes.NewBuffer(nil)
ts := bytes.TrimSpace(seed)
pre := string(ts[0:2])
kind := ""
switch pre {
case "SU":
kind = "USER"
case "SA":
kind = "ACCOUNT"
case "SO":
kind = "OPERATOR"
return nil, errors.New("seed is not an operator, account or user seed")
header := `************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN %s NKEY SEED-----
_, err := fmt.Fprintf(w, header, kind)
if err != nil {
return nil, err
footer := `
------END %s NKEY SEED------
_, err = fmt.Fprintf(w, footer, kind)
if err != nil {
return nil, err
return w.Bytes(), nil
var userConfigRE = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}\r?\n))`)
// An user config file looks like this:
// -----BEGIN NATS USER JWT-----
// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
// ------END NATS USER JWT------
// ************************* IMPORTANT *************************
// NKEY Seed printed below can be used sign and prove identity.
// NKEYs are sensitive and should be treated as secrets.
// ------END USER NKEY SEED------
// FormatUserConfig returns a decorated file with a decorated JWT and decorated seed
func FormatUserConfig(jwtString string, seed []byte) ([]byte, error) {
gc, err := DecodeGeneric(jwtString)
if err != nil {
return nil, err
if gc.Type != UserClaim {
return nil, fmt.Errorf("%q cannot be serialized as a user config", string(gc.Type))
w := bytes.NewBuffer(nil)
jd, err := formatJwt(string(gc.Type), jwtString)
if err != nil {
return nil, err
_, err = w.Write(jd)
if err != nil {
return nil, err
if !bytes.HasPrefix(bytes.TrimSpace(seed), []byte("SU")) {
return nil, fmt.Errorf("nkey seed is not an user seed")
d, err := DecorateSeed(seed)
if err != nil {
return nil, err
_, err = w.Write(d)
if err != nil {
return nil, err
return w.Bytes(), nil
// ParseDecoratedJWT takes a creds file and returns the JWT portion.
func ParseDecoratedJWT(contents []byte) (string, error) {
items := userConfigRE.FindAllSubmatch(contents, -1)
if len(items) == 0 {
return string(contents), nil
// First result should be the user JWT.
// We copy here so that if the file contained a seed file too we wipe appropriately.
raw := items[0][1]
tmp := make([]byte, len(raw))
copy(tmp, raw)
return string(tmp), nil
// ParseDecoratedNKey takes a creds file, finds the NKey portion and creates a
// key pair from it.
func ParseDecoratedNKey(contents []byte) (nkeys.KeyPair, error) {
var seed []byte
items := userConfigRE.FindAllSubmatch(contents, -1)
if len(items) > 1 {
seed = items[1][1]
} else {
lines := bytes.Split(contents, []byte("\n"))
for _, line := range lines {
if bytes.HasPrefix(bytes.TrimSpace(line), []byte("SO")) ||
bytes.HasPrefix(bytes.TrimSpace(line), []byte("SA")) ||
bytes.HasPrefix(bytes.TrimSpace(line), []byte("SU")) {
seed = line
if seed == nil {
return nil, errors.New("no nkey seed found")
if !bytes.HasPrefix(seed, []byte("SO")) &&
!bytes.HasPrefix(seed, []byte("SA")) &&
!bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain a seed nkey")
kp, err := nkeys.FromSeed(seed)
if err != nil {
return nil, err
return kp, nil
// ParseDecoratedUserNKey takes a creds file, finds the NKey portion and creates a
// key pair from it. Similar to ParseDecoratedNKey but fails for non-user keys.
func ParseDecoratedUserNKey(contents []byte) (nkeys.KeyPair, error) {
nk, err := ParseDecoratedNKey(contents)
if err != nil {
return nil, err
seed, err := nk.Seed()
if err != nil {
return nil, err
if !bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain an user seed nkey")
kp, err := nkeys.FromSeed(seed)
if err != nil {
return nil, err
return kp, nil
@ -1,244 +0,0 @@
* Copyright 2018-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// ResponseType is used to store an export response type
type ResponseType string
const (
// ResponseTypeSingleton is used for a service that sends a single response only
ResponseTypeSingleton = "Singleton"
// ResponseTypeStream is used for a service that will send multiple responses
ResponseTypeStream = "Stream"
// ResponseTypeChunked is used for a service that sends a single response in chunks (so not quite a stream)
ResponseTypeChunked = "Chunked"
// ServiceLatency is used when observing and exported service for
// latency measurements.
// Sampling 1-100, represents sampling rate, defaults to 100.
// Results is the subject where the latency metrics are published.
// A metric will be defined by the nats-server's ServiceLatency. Time durations
// are in nanoseconds.
// see https://github.com/nats-io/nats-server/blob/master/server/accounts.go#L524
// e.g.
// {
// "app": "dlc22",
// "start": "2019-09-16T21:46:23.636869585-07:00",
// "svc": 219732,
// "nats": {
// "req": 320415,
// "resp": 228268,
// "sys": 0
// },
// "total": 768415
// }
type ServiceLatency struct {
Sampling int `json:"sampling,omitempty"`
Results Subject `json:"results"`
func (sl *ServiceLatency) Validate(vr *ValidationResults) {
if sl.Sampling < 1 || sl.Sampling > 100 {
vr.AddError("sampling percentage needs to be between 1-100")
if sl.Results.HasWildCards() {
vr.AddError("results subject can not contain wildcards")
// Export represents a single export
type Export struct {
Name string `json:"name,omitempty"`
Subject Subject `json:"subject,omitempty"`
Type ExportType `json:"type,omitempty"`
TokenReq bool `json:"token_req,omitempty"`
Revocations RevocationList `json:"revocations,omitempty"`
ResponseType ResponseType `json:"response_type,omitempty"`
Latency *ServiceLatency `json:"service_latency,omitempty"`
// IsService returns true if an export is for a service
func (e *Export) IsService() bool {
return e.Type == Service
// IsStream returns true if an export is for a stream
func (e *Export) IsStream() bool {
return e.Type == Stream
// IsSingleResponse returns true if an export has a single response
// or no resopnse type is set, also checks that the type is service
func (e *Export) IsSingleResponse() bool {
return e.Type == Service && (e.ResponseType == ResponseTypeSingleton || e.ResponseType == "")
// IsChunkedResponse returns true if an export has a chunked response
func (e *Export) IsChunkedResponse() bool {
return e.Type == Service && e.ResponseType == ResponseTypeChunked
// IsStreamResponse returns true if an export has a chunked response
func (e *Export) IsStreamResponse() bool {
return e.Type == Service && e.ResponseType == ResponseTypeStream
// Validate appends validation issues to the passed in results list
func (e *Export) Validate(vr *ValidationResults) {
if e == nil {
vr.AddError("null export is not allowed")
if !e.IsService() && !e.IsStream() {
vr.AddError("invalid export type: %q", e.Type)
if e.IsService() && !e.IsSingleResponse() && !e.IsChunkedResponse() && !e.IsStreamResponse() {
vr.AddError("invalid response type for service: %q", e.ResponseType)
if e.IsStream() && e.ResponseType != "" {
vr.AddError("invalid response type for stream: %q", e.ResponseType)
if e.Latency != nil {
if !e.IsService() {
vr.AddError("latency tracking only permitted for services")
// Revoke enters a revocation by publickey using time.Now().
func (e *Export) Revoke(pubKey string) {
e.RevokeAt(pubKey, time.Now())
// RevokeAt enters a revocation by publickey and timestamp into this export
// If there is already a revocation for this public key that is newer, it is kept.
func (e *Export) RevokeAt(pubKey string, timestamp time.Time) {
if e.Revocations == nil {
e.Revocations = RevocationList{}
e.Revocations.Revoke(pubKey, timestamp)
// ClearRevocation removes any revocation for the public key
func (e *Export) ClearRevocation(pubKey string) {
// IsRevokedAt checks if the public key is in the revoked list with a timestamp later than the one passed in.
// Generally this method is called with the subject and issue time of the jwt to be tested.
// DO NOT pass time.Now(), it will not produce a stable/expected response.
func (e *Export) IsRevokedAt(pubKey string, timestamp time.Time) bool {
return e.Revocations.IsRevoked(pubKey, timestamp)
// IsRevoked does not perform a valid check. Use IsRevokedAt instead.
func (e *Export) IsRevoked(_ string) bool {
return true
// Exports is a slice of exports
type Exports []*Export
// Add appends exports to the list
func (e *Exports) Add(i ...*Export) {
*e = append(*e, i...)
func isContainedIn(kind ExportType, subjects []Subject, vr *ValidationResults) {
m := make(map[string]string)
for i, ns := range subjects {
for j, s := range subjects {
if i == j {
if ns.IsContainedIn(s) {
str := string(s)
_, ok := m[str]
if !ok {
m[str] = string(ns)
if len(m) != 0 {
for k, v := range m {
var vi ValidationIssue
vi.Blocking = true
vi.Description = fmt.Sprintf("%s export subject %q already exports %q", kind, k, v)
// Validate calls validate on all of the exports
func (e *Exports) Validate(vr *ValidationResults) error {
var serviceSubjects []Subject
var streamSubjects []Subject
for _, v := range *e {
if v == nil {
vr.AddError("null export is not allowed")
if v.IsService() {
serviceSubjects = append(serviceSubjects, v.Subject)
} else {
streamSubjects = append(streamSubjects, v.Subject)
isContainedIn(Service, serviceSubjects, vr)
isContainedIn(Stream, streamSubjects, vr)
return nil
// HasExportContainingSubject checks if the export list has an export with the provided subject
func (e *Exports) HasExportContainingSubject(subject Subject) bool {
for _, s := range *e {
if subject.IsContainedIn(s.Subject) {
return true
return false
func (e Exports) Len() int {
return len(e)
func (e Exports) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
func (e Exports) Less(i, j int) bool {
return e[i].Subject < e[j].Subject
* Copyright 2018-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
const (
// Version is semantic version.
Version = "1.1.0"
// TokenTypeJwt is the JWT token type supported JWT tokens
// encoded and decoded by this library
TokenTypeJwt = "jwt"
// AlgorithmNkey is the algorithm supported by JWT tokens
// encoded and decoded by this library
AlgorithmNkey = "ed25519"
// Header is a JWT Jose Header
type Header struct {
Type string `json:"typ"`
Algorithm string `json:"alg"`
// Parses a header JWT token
func parseHeaders(s string) (*Header, error) {
h, err := decodeString(s)
if err != nil {
return nil, err
header := Header{}
if err := json.Unmarshal(h, &header); err != nil {
return nil, err
if err := header.Valid(); err != nil {
return nil, err
return &header, nil
// Valid validates the Header. It returns nil if the Header is
// a JWT header, and the algorithm used is the NKEY algorithm.
func (h *Header) Valid() error {
if TokenTypeJwt != strings.ToLower(h.Type) {
return fmt.Errorf("not supported type %q", h.Type)
if AlgorithmNkey != strings.ToLower(h.Algorithm) {
return fmt.Errorf("unexpected %q algorithm", h.Algorithm)
return nil
* Copyright 2018 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// Operator specific claims
type Operator struct {
// Slice of real identies (like websites) that can be used to identify the operator.
Identities []Identity `json:"identity,omitempty"`
// Slice of other operator NKeys that can be used to sign on behalf of the main
// operator identity.
SigningKeys StringList `json:"signing_keys,omitempty"`
// AccountServerURL is a partial URL like "https://host.domain.org:<port>/jwt/v1"
// tools will use the prefix and build queries by appending /accounts/<account_id>
// or /operator to the path provided. Note this assumes that the account server
// can handle requests in a nats-account-server compatible way. See
// https://github.com/nats-io/nats-account-server.
AccountServerURL string `json:"account_server_url,omitempty"`
// A list of NATS urls (tls://host:port) where tools can connect to the server
// using proper credentials.
OperatorServiceURLs StringList `json:"operator_service_urls,omitempty"`
// Identity of the system account
SystemAccount string `json:"system_account,omitempty"`
// Validate checks the validity of the operators contents
func (o *Operator) Validate(vr *ValidationResults) {
if err := o.validateAccountServerURL(); err != nil {
for _, v := range o.validateOperatorServiceURLs() {
if v != nil {
for _, i := range o.Identities {
for _, k := range o.SigningKeys {
if !nkeys.IsValidPublicOperatorKey(k) {
vr.AddError("%s is not an operator public key", k)
if o.SystemAccount != "" {
if !nkeys.IsValidPublicAccountKey(o.SystemAccount) {
vr.AddError("%s is not an account public key", o.SystemAccount)
func (o *Operator) validateAccountServerURL() error {
if o.AccountServerURL != "" {
// We don't care what kind of URL it is so long as it parses
// and has a protocol. The account server may impose additional
// constraints on the type of URLs that it is able to notify to
u, err := url.Parse(o.AccountServerURL)
if err != nil {
return fmt.Errorf("error parsing account server url: %v", err)
if u.Scheme == "" {
return fmt.Errorf("account server url %q requires a protocol", o.AccountServerURL)
return nil
// ValidateOperatorServiceURL returns an error if the URL is not a valid NATS or TLS url.
func ValidateOperatorServiceURL(v string) error {
// should be possible for the service url to not be expressed
if v == "" {
return nil
u, err := url.Parse(v)
if err != nil {
return fmt.Errorf("error parsing operator service url %q: %v", v, err)
if u.User != nil {
return fmt.Errorf("operator service url %q - credentials are not supported", v)
if u.Path != "" {
return fmt.Errorf("operator service url %q - paths are not supported", v)
lcs := strings.ToLower(u.Scheme)
switch lcs {
case "nats":
return nil
case "tls":
return nil
return fmt.Errorf("operator service url %q - protocol not supported (only 'nats' or 'tls' only)", v)
func (o *Operator) validateOperatorServiceURLs() []error {
var errors []error
for _, v := range o.OperatorServiceURLs {
if v != "" {
if err := ValidateOperatorServiceURL(v); err != nil {
errors = append(errors, err)
return errors
// OperatorClaims define the data for an operator JWT
type OperatorClaims struct {
Operator `json:"nats,omitempty"`
// NewOperatorClaims creates a new operator claim with the specified subject, which should be an operator public key
func NewOperatorClaims(subject string) *OperatorClaims {
if subject == "" {
return nil
c := &OperatorClaims{}
c.Subject = subject
return c
// DidSign checks the claims against the operator's public key and its signing keys
func (oc *OperatorClaims) DidSign(op Claims) bool {
if op == nil {
return false
issuer := op.Claims().Issuer
if issuer == oc.Subject {
return true
return oc.SigningKeys.Contains(issuer)
// Deprecated: AddSigningKey, use claim.SigningKeys.Add()
func (oc *OperatorClaims) AddSigningKey(pk string) {
// Encode the claims into a JWT string
func (oc *OperatorClaims) Encode(pair nkeys.KeyPair) (string, error) {
if !nkeys.IsValidPublicOperatorKey(oc.Subject) {
return "", errors.New("expected subject to be an operator public key")
err := oc.validateAccountServerURL()
if err != nil {
return "", err
oc.ClaimsData.Type = OperatorClaim
return oc.ClaimsData.Encode(pair, oc)
// DecodeOperatorClaims tries to create an operator claims from a JWt string
func DecodeOperatorClaims(token string) (*OperatorClaims, error) {
v := OperatorClaims{}
if err := Decode(token, &v); err != nil {
return nil, err
return &v, nil
func (oc *OperatorClaims) String() string {
return oc.ClaimsData.String(oc)
// Payload returns the operator specific data for an operator JWT
func (oc *OperatorClaims) Payload() interface{} {
return &oc.Operator
// Validate the contents of the claims
func (oc *OperatorClaims) Validate(vr *ValidationResults) {
// ExpectedPrefixes defines the nkey types that can sign operator claims, operator
func (oc *OperatorClaims) ExpectedPrefixes() []nkeys.PrefixByte {
return []nkeys.PrefixByte{nkeys.PrefixByteOperator}
// Claims returns the generic claims data
func (oc *OperatorClaims) Claims() *ClaimsData {
return &oc.ClaimsData
* Copyright 2018-2019 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package jwt
import (
// User defines the user specific data in a user JWT
type User struct {
BearerToken bool `json:"bearer_token,omitempty"`
// Validate checks the permissions and limits in a User jwt
func (u *User) Validate(vr *ValidationResults) {
// When BearerToken is true server will ignore any nonce-signing verification
// UserClaims defines a user JWT
type UserClaims struct {
User `json:"nats,omitempty"`
// IssuerAccount stores the public key for the account the issuer represents.
// When set, the claim was issued by a signing key.
IssuerAccount string `json:"issuer_account,omitempty"`
// NewUserClaims creates a user JWT with the specific subject/public key
func NewUserClaims(subject string) *UserClaims {
if subject == "" {
return nil
c := &UserClaims{}
c.Subject = subject
return c
// Encode tries to turn the user claims into a JWT string
func (u *UserClaims) Encode(pair nkeys.KeyPair) (string, error) {
if !nkeys.IsValidPublicUserKey(u.Subject) {
return "", errors.New("expected subject to be user public key")
u.ClaimsData.Type = UserClaim
return u.ClaimsData.Encode(pair, u)
// DecodeUserClaims tries to parse a user claims from a JWT string
func DecodeUserClaims(token string) (*UserClaims, error) {
v := UserClaims{}
if err := Decode(token, &v); err != nil {
return nil, err
return &v, nil
// Validate checks the generic and specific parts of the user jwt
func (u *UserClaims) Validate(vr *ValidationResults) {
if u.IssuerAccount != "" && !nkeys.IsValidPublicAccountKey(u.IssuerAccount) {
vr.AddError("account_id is not an account public key")
// ExpectedPrefixes defines the types that can encode a user JWT, account
func (u *UserClaims) ExpectedPrefixes() []nkeys.PrefixByte {
return []nkeys.PrefixByte{nkeys.PrefixByteAccount}
// Claims returns the generic data from a user jwt
func (u *UserClaims) Claims() *ClaimsData {
return &u.ClaimsData
// Payload returns the user specific data from a user JWT
func (u *UserClaims) Payload() interface{} {
return &u.User
func (u *UserClaims) String() string {
return u.ClaimsData.String(u)
// IsBearerToken returns true if nonce-signing requirements should be skipped
func (u *UserClaims) IsBearerToken() bool {
return u.BearerToken
@ -3,7 +3,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats?ref=badge_shield)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://godoc.org/github.com/nats-io/nats.go?status.svg)](http://godoc.org/github.com/nats-io/nats.go) [![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.com/nats-io/nats.go.svg?branch=master)](http://travis-ci.com/nats-io/nats.go) [![GoDoc](https://img.shields.io/badge/GoDoc-reference-007d9c)](https://pkg.go.dev/github.com/nats-io/nats.go)
[![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master)
## Installation
@ -284,6 +285,21 @@ nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
nats.ReconnectHandler(func(_ *nats.Conn) {
// Note that this will be invoked for the first asynchronous connect.
if err != nil {
// Should not return an error even if it can't connect, but you still
// need to check in case there are some configuration errors.
// Flush connection to server, returns when all messages have been processed.
fmt.Println("All clear!")
@ -11,9 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.7
// A Go client for the NATS messaging system (https://nats.io).
package nats
import (
@ -21,9 +18,33 @@ import (
// RequestMsgWithContext takes a context, a subject and payload
// in bytes and request expecting a single response.
func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) {
var hdr []byte
var err error
if len(msg.Header) > 0 {
if !nc.info.Headers {
return nil, ErrHeadersNotSupported
hdr, err = msg.headerBytes()
if err != nil {
return nil, err
return nc.requestWithContext(ctx, msg.Subject, hdr, msg.Data)
// RequestWithContext takes a context, a subject and payload
// in bytes and request expecting a single response.
func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
return nc.requestWithContext(ctx, subj, nil, data)
func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
if ctx == nil {
return nil, ErrInvalidContext
@ -36,49 +57,52 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte
return nil, ctx.Err()
var m *Msg
var err error
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
return nc.oldRequestWithContext(ctx, subj, data)
mch, token, err := nc.createNewRequestAndSend(subj, data)
if err != nil {
return nil, err
var ok bool
var msg *Msg
select {
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
if nc.useOldRequestStyle() {
m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
} else {
mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
if err != nil {
return nil, err
case <-ctx.Done():
delete(nc.respMap, token)
return nil, ctx.Err()
return msg, nil
var ok bool
select {
case m, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
case <-ctx.Done():
delete(nc.respMap, token)
return nil, ctx.Err()
// Check for no responder status.
if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
m, err = nil, ErrNoResponders
return m, err
// oldRequestWithContext utilizes inbox and subscription per request.
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
if err != nil {
return nil, err
defer s.Unsubscribe()
err = nc.PublishRequest(subj, inbox, data)
err = nc.publish(subj, inbox, hdr, data)
if err != nil {
return nil, err
@ -0,0 +1,13 @@
return c.Conn.publish(subject, _EMPTY_, b)
return c.Conn.publish(subject, _EMPTY_, nil, b)
// PublishRequest will perform a Publish() expecting a response on the
@ -104,7 +104,7 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
if err != nil {
return err
return c.Conn.publish(subject, reply, b)
return c.Conn.publish(subject, reply, nil, b)
// Request will create an Inbox and perform a Request() call
@ -130,7 +130,7 @@ func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, t
// Handler is a specific callback used for Subscribe. It is generalized to
// an interface{}, but we will discover its format and arguments at runtime
// and perform the correct callback, including de-marshaling JSON strings
// and perform the correct callback, including de-marshaling encoded data
// back into the appropriate struct based on the signature of the Handler.
// Handlers are expected to have one of four signatures.
@ -234,7 +234,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
return c.Conn.subscribe(subject, queue, natsCB, nil, false)
return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil)
// FlushTimeout allows a Flush operation to have an associated timeout.
@ -1,7 +1,11 @@
module github.com/nats-io/nats.go
go 1.15
require (
github.com/nats-io/jwt v0.3.2
github.com/nats-io/nkeys v0.1.4
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
@ -1,15 +1,69 @@
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM=
github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M=
github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc=
github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY=
github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1:rQnBf2Rv4P9adtAs/Ti6LfFmVtFG6HLhl/H7cVshcJU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4=
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4 h1:GStuc0W1rK45FSlpt3+7UTLzmRys2/6WSDuJFyzZ6Xg=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8 h1:jPZZofsCevE2oJl3YexVw3drWOFdo8H4AWMb/1WcVoc=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnCKg9luW1g7hgzPxUjHFRI40EuTSX7RCzgnc74Jk=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,700 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package nats
import (
// JetStreamManager is the public interface for managing JetStream streams & consumers.
type JetStreamManager interface {
// AddStream creates a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// UpdateStream updates a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
// DeleteStream deletes a stream.
DeleteStream(name string) error
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)
// Purge stream messages.
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*RawStreamMsg, error)
// DeleteMsg erases a message from a stream.
DeleteMsg(name string, seq uint64) error
// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error
// ConsumerInfo retrieves consumer information.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
MaxAge time.Duration `json:"max_age"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
// Placement is used to guide placement of streams in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster"`
Tags []string `json:"tags,omitempty"`
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
// apiError is included in all API responses if there was an error.
type apiError struct {
Code int `json:"code"`
Description string `json:"description,omitempty"`
// apiResponse is a standard response from the JetStream JSON API
type apiResponse struct {
Type string `json:"type"`
Error *apiError `json:"error,omitempty"`
// apiPaged includes variables used to create paged responses from the JSON API
type apiPaged struct {
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
// apiPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with apiPaged.
type apiPagedRequest struct {
Offset int `json:"offset"`
// AccountInfo contains info about the JetStream usage from the current account.
type AccountInfo struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
API APIStats `json:"api"`
Limits AccountLimits `json:"limits"`
// APIStats reports on API calls to JetStream for this account.
type APIStats struct {
Total uint64 `json:"total"`
Errors uint64 `json:"errors"`
// AccountLimits includes the JetStream limits of the current account.
type AccountLimits struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
MaxStreams int `json:"max_streams"`
MaxConsumers int `json:"max_consumers"`
type accountInfoResponse struct {
// AccountInfo retrieves info about the JetStream usage from the current account.
func (js *js) AccountInfo() (*AccountInfo, error) {
resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait)
if err != nil {
return nil, err
var info accountInfoResponse
if err := json.Unmarshal(resp.Data, &info); err != nil {
return nil, err
if info.Error != nil {
var err error
if strings.Contains(info.Error.Description, "not enabled for") {
err = ErrJetStreamNotEnabled
} else {
err = errors.New(info.Error.Description)
return nil, err
return &info.AccountInfo, nil
type createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
type consumerResponse struct {
// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) {
if stream == _EMPTY_ {
return nil, ErrStreamNameRequired
req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
if err != nil {
return nil, err
var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if strings.Contains(cfg.Durable, ".") {
return nil, ErrInvalidDurableName
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
return nil, err
var info consumerResponse
err = json.Unmarshal(resp.Data, &info)
if err != nil {
return nil, err
if info.Error != nil {
return nil, errors.New(info.Error.Description)
return info.ConsumerInfo, nil
// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
Success bool `json:"success,omitempty"`
// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, consumer string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
var resp consumerDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
if resp.Error != nil {
return errors.New(resp.Error.Description)
return nil
// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
stream string
js *js
err error
offset int
page []*ConsumerInfo
pageInfo *apiPaged
// consumersRequest is the type used for Consumers requests.
type consumersRequest struct {
// consumerListResponse is the response for a Consumers List request.
type consumerListResponse struct {
Consumers []*ConsumerInfo `json:"consumers"`
// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
if c.err != nil {
return false
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
return false
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
return false
req, err := json.Marshal(consumersRequest{
apiPagedRequest: apiPagedRequest{Offset: c.offset},
if err != nil {
c.err = err
return false
clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
r, err := c.js.nc.Request(clSubj, req, c.js.wait)
if err != nil {
c.err = err
return false
var resp consumerListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
c.err = err
return false
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
return false
c.pageInfo = &resp.apiPaged
c.page = resp.Consumers
c.offset += len(c.page)
return true
// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
return c.page
// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
return c.err
// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
// streamCreateResponse stream creation.
type streamCreateResponse struct {
func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
req, err := json.Marshal(cfg)
if err != nil {
return nil, err
csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
r, err := js.nc.Request(csSubj, req, js.wait)
if err != nil {
return nil, err
var resp streamCreateResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
return resp.StreamInfo, nil
type streamInfoResponse = streamCreateResponse
func (js *js) StreamInfo(stream string) (*StreamInfo, error) {
csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.nc.Request(csSubj, nil, js.wait)
if err != nil {
return nil, err
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
return resp.StreamInfo, nil
// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time.Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
req, err := json.Marshal(cfg)
if err != nil {
return nil, err
usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
if err != nil {
return nil, err
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
return resp.StreamInfo, nil
// streamDeleteResponse is the response for a Stream delete request.
type streamDeleteResponse struct {
Success bool `json:"success,omitempty"`
// DeleteStream deletes a Stream.
func (js *js) DeleteStream(name string) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
if err != nil {
return err
var resp streamDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
if resp.Error != nil {
return errors.New(resp.Error.Description)
return nil
type apiMsgGetRequest struct {
Seq uint64 `json:"seq"`
// RawStreamMsg is a raw message stored in JetStream.
type RawStreamMsg struct {
Subject string
Sequence uint64
Header http.Header
Data []byte
Time time.Time
// storedMsg is a raw message stored in JetStream.
type storedMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time.Time `json:"time"`
// apiMsgGetResponse is the response for a Stream get request.
type apiMsgGetResponse struct {
Message *storedMsg `json:"message,omitempty"`
Success bool `json:"success,omitempty"`
// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) {
if name == _EMPTY_ {
return nil, ErrStreamNameRequired
req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
if err != nil {
return nil, err
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
if err != nil {
return nil, err
var resp apiMsgGetResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
msg := resp.Message
var hdr http.Header
if msg.Header != nil {
hdr, err = decodeHeadersMsg(msg.Header)
if err != nil {
return nil, err
return &RawStreamMsg{
Subject: msg.Subject,
Sequence: msg.Sequence,
Header: hdr,
Data: msg.Data,
Time: msg.Time,
}, nil
type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
// msgDeleteResponse is the response for a Stream delete request.
type msgDeleteResponse struct {
Success bool `json:"success,omitempty"`
// DeleteMsg deletes a message from a stream.
func (js *js) DeleteMsg(name string, seq uint64) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
if err != nil {
return err
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
if err != nil {
return err
var resp msgDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
if resp.Error != nil {
return errors.New(resp.Error.Description)
return nil
type streamPurgeResponse struct {
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(name string) error {
psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, js.wait)
if err != nil {
return err
var resp streamPurgeResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
if resp.Error != nil {
return errors.New(resp.Error.Description)
return nil
// StreamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
js *js
page []*StreamInfo
err error
offset int
pageInfo *apiPaged
// streamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type streamListResponse struct {
Streams []*StreamInfo `json:"streams"`
// streamNamesRequest is used for Stream Name requests.
type streamNamesRequest struct {
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
if s.err != nil {
return false
if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
return false
req, err := json.Marshal(streamNamesRequest{
apiPagedRequest: apiPagedRequest{Offset: s.offset},
if err != nil {
s.err = err
return false
slSubj := s.js.apiSubj(apiStreamList)
r, err := s.js.nc.Request(slSubj, req, s.js.wait)
if err != nil {
s.err = err
return false
var resp streamListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
s.err = err
return false
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
return false
s.pageInfo = &resp.apiPaged
s.page = resp.Streams
s.offset += len(s.page)
return true
// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
return s.page
// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
return s.err
// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
File diff suppressed because it is too large
Load Diff
@ -107,5 +107,5 @@ func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (
return c.Conn.subscribe(subject, queue, cb, nil, false)
return c.Conn.subscribe(subject, queue, cb, nil, false, nil)
@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -21,6 +21,7 @@ type msgArg struct {
subject []byte
reply []byte
sid int64
hdr int
size int
@ -30,6 +31,7 @@ type parseState struct {
state int
as int
drop int
hdr int
ma msgArg
argBuf []byte
msgBuf []byte
@ -54,6 +56,7 @@ const (
@ -83,6 +86,12 @@ func (nc *Conn) parse(buf []byte) error {
switch b {
case 'M', 'm':
nc.ps.state = OP_M
nc.ps.hdr = -1
nc.ps.ma.hdr = -1
case 'H', 'h':
nc.ps.state = OP_H
nc.ps.hdr = 0
nc.ps.ma.hdr = 0
case 'P', 'p':
nc.ps.state = OP_P
case '+':
@ -94,6 +103,13 @@ func (nc *Conn) parse(buf []byte) error {
goto parseErr
case OP_H:
switch b {
case 'M', 'm':
nc.ps.state = OP_M
goto parseErr
case OP_M:
switch b {
case 'S', 's':
@ -140,8 +156,7 @@ func (nc *Conn) parse(buf []byte) error {
nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD
// jump ahead with the index. If this overruns
// what is left we fall out and process split
// buffer.
// what is left we fall out and process a split buffer.
i = nc.ps.as + nc.ps.ma.size - 1
if nc.ps.argBuf != nil {
@ -415,6 +430,11 @@ func (nc *Conn) cloneMsgArg() {
const argsLenMax = 4
func (nc *Conn) processMsgArgs(arg []byte) error {
// Use separate function for header based messages.
if nc.ps.hdr >= 0 {
return nc.processHeaderMsgArgs(arg)
// Unroll splitArgs to avoid runtime/heap issues
a := [argsLenMax][]byte{}
args := a[:0]
@ -459,6 +479,57 @@ func (nc *Conn) processMsgArgs(arg []byte) error {
return nil
// processHeaderMsgArgs is for a header based message.
func (nc *Conn) processHeaderMsgArgs(arg []byte) error {
// Unroll splitArgs to avoid runtime/heap issues
a := [argsLenMax][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
switch b {
case ' ', '\t', '\r', '\n':
if start >= 0 {
args = append(args, arg[start:i])
start = -1
if start < 0 {
start = i
if start >= 0 {
args = append(args, arg[start:])
switch len(args) {
case 4:
nc.ps.ma.subject = args[0]
nc.ps.ma.sid = parseInt64(args[1])
nc.ps.ma.reply = nil
nc.ps.ma.hdr = int(parseInt64(args[2]))
nc.ps.ma.size = int(parseInt64(args[3]))
case 5:
nc.ps.ma.subject = args[0]
nc.ps.ma.sid = parseInt64(args[1])
nc.ps.ma.reply = args[2]
nc.ps.ma.hdr = int(parseInt64(args[3]))
nc.ps.ma.size = int(parseInt64(args[4]))
return fmt.Errorf("nats: processHeaderMsgArgs Parse Error: '%s'", arg)
if nc.ps.ma.sid < 0 {
return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Sid: '%s'", arg)
if nc.ps.ma.hdr < 0 || nc.ps.ma.hdr > nc.ps.ma.size {
return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
if nc.ps.ma.size < 0 {
return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Size: '%s'", arg)
return nil
// Ascii numbers 0-9
const (
ascii_0 = 48
@ -1,8 +1,12 @@
language: go
sudo: false
- amd64
- ppc64le
- 1.12.x
- 1.11.x
- 1.16.x
- 1.15.x
- go get -t ./...
@ -28,4 +32,4 @@ script:
# script: curl -sL http://git.io/goreleaser | bash
# on:
# tags: true
# condition: $TRAVIS_OS_NAME = linux
# condition: $TRAVIS_OS_NAME = linux
@ -2,5 +2,7 @@
Maintainership is on a per project basis.
### Core-maintainers
- Derek Collison <derek@nats.io> [@derekcollison](https://github.com/derekcollison)
### Maintainers
- Derek Collison <derek@nats.io> [@derekcollison](https://github.com/derekcollison)
- Ivan Kozlovic <ivan@nats.io> [@kozlovic](https://github.com/kozlovic)
- Waldemar Quevedo <wally@nats.io> [@wallyqs](https://github.com/wallyqs)
@ -2,10 +2,9 @@
[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![Build Status](https://travis-ci.org/nats-io/nkeys.svg?branch=master)](http://travis-ci.org/nats-io/nkeys)
[![Build Status](https://travis-ci.com/nats-io/nkeys.svg?branch=master)](http://travis-ci.com/nats-io/nkeys)
[![Coverage Status](https://coveralls.io/repos/github/nats-io/nkeys/badge.svg?branch=master&service=github)](https://coveralls.io/github/nats-io/nkeys?branch=master)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fnkeys.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fnkeys?ref=badge_shield)
A public-key signature system based on [Ed25519](https://ed25519.cr.yp.to/) for the NATS ecosystem.
@ -68,5 +67,3 @@ user2, _ := nkeys.FromRawSeed(PrefixByteUser, rawSeed)
Unless otherwise noted, the NATS source files are distributed
under the Apache Version 2.0 license found in the LICENSE file.
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fnkeys.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fnkeys?ref=badge_large)
@ -0,0 +1,78 @@
package nkeys
import (
var userConfigRE = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}\r?\n))`)
// ParseDecoratedJWT takes a creds file and returns the JWT portion.
func ParseDecoratedJWT(contents []byte) (string, error) {
items := userConfigRE.FindAllSubmatch(contents, -1)
if len(items) == 0 {
return string(contents), nil
// First result should be the user JWT.
// We copy here so that if the file contained a seed file too we wipe appropriately.
raw := items[0][1]
tmp := make([]byte, len(raw))
copy(tmp, raw)
return string(tmp), nil
// ParseDecoratedNKey takes a creds file, finds the NKey portion and creates a
// key pair from it.
func ParseDecoratedNKey(contents []byte) (KeyPair, error) {
var seed []byte
items := userConfigRE.FindAllSubmatch(contents, -1)
if len(items) > 1 {
seed = items[1][1]
} else {
lines := bytes.Split(contents, []byte("\n"))
for _, line := range lines {
if bytes.HasPrefix(bytes.TrimSpace(line), []byte("SO")) ||
bytes.HasPrefix(bytes.TrimSpace(line), []byte("SA")) ||
bytes.HasPrefix(bytes.TrimSpace(line), []byte("SU")) {
seed = line
if seed == nil {
return nil, errors.New("no nkey seed found")
if !bytes.HasPrefix(seed, []byte("SO")) &&
!bytes.HasPrefix(seed, []byte("SA")) &&
!bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain a seed nkey")
kp, err := FromSeed(seed)
if err != nil {
return nil, err
return kp, nil
// ParseDecoratedUserNKey takes a creds file, finds the NKey portion and creates a
// key pair from it. Similar to ParseDecoratedNKey but fails for non-user keys.
func ParseDecoratedUserNKey(contents []byte) (KeyPair, error) {
nk, err := ParseDecoratedNKey(contents)
if err != nil {
return nil, err
seed, err := nk.Seed()
if err != nil {
return nil, err
if !bytes.HasPrefix(seed, []byte("SU")) {
return nil, errors.New("doesn't contain an user seed nkey")
kp, err := FromSeed(seed)
if err != nil {
return nil, err
return kp, nil
@ -1,3 +1,5 @@
module github.com/nats-io/nkeys
require golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
go 1.16
require golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
@ -1,7 +1,7 @@
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@ -20,7 +20,7 @@ import (
// Version is our current version
const Version = "0.1.4"
const Version = "0.3.0"
// Errors
var (
@ -5,6 +5,7 @@
// In Go 1.13, the ed25519 package was promoted to the standard library as
// crypto/ed25519, and this package became a wrapper for the standard library one.
//go:build !go1.13
// +build !go1.13
// Package ed25519 implements the Ed25519 signature algorithm. See
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.13
// +build go1.13
// Package ed25519 implements the Ed25519 signature algorithm. See
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.7
// +build go1.7
package context
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.9
// +build go1.9
package context
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.7
// +build !go1.7
package context
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.9
// +build !go1.9
package context
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.11
// +build go1.11
package http2
@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.11
// +build !go1.11
package http2
@ -1293,7 +1293,9 @@ func (sc *serverConn) startGracefulShutdown() {
sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
// After sending GOAWAY, the connection will close after goAwayTimeout.
// After sending GOAWAY with an error code (non-graceful shutdown), the
// connection will close after goAwayTimeout.
// If we close the connection immediately after sending GOAWAY, there may
// be unsent data in our kernel receive buffer, which will cause the kernel
// to send a TCP RST on close() instead of a FIN. This RST will abort the
@ -1629,23 +1631,37 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *serverConn) processData(f *DataFrame) error {
if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
id := f.Header().StreamID
if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || id > sc.maxClientStreamID) {
// Discard all DATA frames if the GOAWAY is due to an
// error, or:
// Section 6.8: After sending a GOAWAY frame, the sender
// can discard frames for streams initiated by the
// receiver with identifiers higher than the identified
// last stream.
return nil
data := f.Data()
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID
data := f.Data()
state, st := sc.state(id)
if id == 0 || state == stateIdle {
// Section 6.1: "DATA frames MUST be associated with a
// stream. If a DATA frame is received whose stream
// identifier field is 0x0, the recipient MUST respond
// with a connection error (Section 5.4.1) of type
// Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
return ConnectionError(ErrCodeProtocol)
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
@ -4,6 +4,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.10
// +build go1.10
// Package idna implements IDNA2008 using the compatibility processing
@ -4,6 +4,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.10
// +build !go1.10
// Package idna implements IDNA2008 using the compatibility processing
@ -1,5 +1,6 @@
// Code generated by running "go generate" in golang.org/x/text. DO NOT EDIT.
//go:build go1.10 && !go1.13
// +build go1.10,!go1.13
package idna
@ -1,5 +1,6 @@
// Code generated by running "go generate" in golang.org/x/text. DO NOT EDIT.
//go:build go1.13 && !go1.14
// +build go1.13,!go1.14
package idna
@ -1,5 +1,6 @@
// Code generated by running "go generate" in golang.org/x/text. DO NOT EDIT.
//go:build go1.14 && !go1.16
// +build go1.14,!go1.16
package idna
@ -1,5 +1,6 @@
// Code generated by running "go generate" in golang.org/x/text. DO NOT EDIT.
//go:build go1.16
// +build go1.16
package idna
@ -1,5 +1,6 @@
// Code generated by running "go generate" in golang.org/x/text. DO NOT EDIT.
//go:build !go1.10
// +build !go1.10
package idna
@ -112,7 +112,7 @@ github.com/jmespath/go-jmespath
# github.com/klauspost/compress v1.11.0
# github.com/klauspost/compress v1.11.12
@ -125,16 +125,14 @@ github.com/matttproud/golang_protobuf_extensions/pbutil
# github.com/mmcloughlin/geohash v0.10.0
## explicit
# github.com/nats-io/jwt v1.1.0
# github.com/nats-io/nats-server/v2 v2.1.9
# github.com/nats-io/nats-server/v2 v2.2.0
## explicit
# github.com/nats-io/nats.go v1.10.0
# github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac
## explicit
# github.com/nats-io/nkeys v0.1.4
# github.com/nats-io/nkeys v0.3.0
# github.com/nats-io/nuid v1.0.1
@ -242,7 +240,7 @@ go.opencensus.io/trace
# golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
# golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
@ -253,7 +251,7 @@ golang.org/x/lint/golint
# golang.org/x/mod v0.2.0
# golang.org/x/net v0.0.0-20210119194325-5f4716e94777
# golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
## explicit
Reference in New Issue