mirror of https://github.com/tidwall/redcon.git
Compare commits
5 Commits
6952cec09f
...
2f2825976d
Author | SHA1 | Date |
---|---|---|
Fusl | 2f2825976d | |
Josh Baker | 9f71787fcd | |
Josh Baker | d05da895e5 | |
Tom Arrell | c746dbc05b | |
tidwall | 8b671291b8 |
|
@ -18,6 +18,8 @@ Features
|
||||||
- Compatible pub/sub support
|
- Compatible pub/sub support
|
||||||
- Multithreaded
|
- Multithreaded
|
||||||
|
|
||||||
|
*This library is also avaliable for [Rust](https://github.com/tidwall/redcon.rs) and [C](https://github.com/tidwall/redcon.c).*
|
||||||
|
|
||||||
Installing
|
Installing
|
||||||
----------
|
----------
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -1,6 +1,6 @@
|
||||||
module github.com/tidwall/redcon
|
module github.com/tidwall/redcon
|
||||||
|
|
||||||
go 1.15
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/tidwall/btree v1.1.0
|
github.com/tidwall/btree v1.1.0
|
||||||
|
|
109
redcon.go
109
redcon.go
|
@ -3,6 +3,7 @@ package redcon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -235,12 +236,14 @@ func Serve(ln net.Listener,
|
||||||
closed func(conn Conn, err error),
|
closed func(conn Conn, err error),
|
||||||
) error {
|
) error {
|
||||||
s := newServer()
|
s := newServer()
|
||||||
|
s.mu.Lock()
|
||||||
s.net = ln.Addr().Network()
|
s.net = ln.Addr().Network()
|
||||||
s.laddr = ln.Addr().String()
|
s.laddr = ln.Addr().String()
|
||||||
s.ln = ln
|
s.ln = ln
|
||||||
s.handler = handler
|
s.handler = handler
|
||||||
s.accept = accept
|
s.accept = accept
|
||||||
s.closed = closed
|
s.closed = closed
|
||||||
|
s.mu.Unlock()
|
||||||
return serve(s)
|
return serve(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,7 +299,9 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.mu.Lock()
|
||||||
s.ln = ln
|
s.ln = ln
|
||||||
|
s.mu.Unlock()
|
||||||
if signal != nil {
|
if signal != nil {
|
||||||
signal <- nil
|
signal <- nil
|
||||||
}
|
}
|
||||||
|
@ -305,9 +310,11 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
|
||||||
|
|
||||||
// Serve serves incoming connections with the given net.Listener.
|
// Serve serves incoming connections with the given net.Listener.
|
||||||
func (s *Server) Serve(ln net.Listener) error {
|
func (s *Server) Serve(ln net.Listener) error {
|
||||||
|
s.mu.Lock()
|
||||||
s.ln = ln
|
s.ln = ln
|
||||||
s.net = ln.Addr().Network()
|
s.net = ln.Addr().Network()
|
||||||
s.laddr = ln.Addr().String()
|
s.laddr = ln.Addr().String()
|
||||||
|
s.mu.Unlock()
|
||||||
return serve(s)
|
return serve(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,7 +328,9 @@ func (s *TLSServer) ListenServeAndSignal(signal chan error) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.mu.Lock()
|
||||||
s.ln = ln
|
s.ln = ln
|
||||||
|
s.mu.Unlock()
|
||||||
if signal != nil {
|
if signal != nil {
|
||||||
signal <- nil
|
signal <- nil
|
||||||
}
|
}
|
||||||
|
@ -335,7 +344,7 @@ func serve(s *Server) error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
for c := range s.conns {
|
for c := range s.conns {
|
||||||
c.Close()
|
c.conn.Close()
|
||||||
}
|
}
|
||||||
s.conns = nil
|
s.conns = nil
|
||||||
}()
|
}()
|
||||||
|
@ -709,6 +718,7 @@ func (w *Writer) WriteRaw(data []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteAny writes any type to client.
|
// WriteAny writes any type to client.
|
||||||
|
//
|
||||||
// nil -> null
|
// nil -> null
|
||||||
// error -> error (adds "ERR " when first word is not uppercase)
|
// error -> error (adds "ERR " when first word is not uppercase)
|
||||||
// string -> bulk-string
|
// string -> bulk-string
|
||||||
|
@ -767,8 +777,91 @@ func parseInt(b []byte) (int, bool) {
|
||||||
return n, true
|
return n, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var cmdPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return &ReusableSlice[Command]{}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReusableSlice[T any] struct {
|
||||||
|
buf []T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ReusableSlice[T]) Append(cmd T) {
|
||||||
|
m, ok := c.tryGrowByReslice(1)
|
||||||
|
if !ok {
|
||||||
|
m = c.grow(1)
|
||||||
|
}
|
||||||
|
c.buf[m] = cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
const smallBufferSize = 64
|
||||||
|
const maxInt = int(^uint(0) >> 1)
|
||||||
|
|
||||||
|
func (c *ReusableSlice[T]) tryGrowByReslice(n int) (int, bool) {
|
||||||
|
if l := len(c.buf); n <= cap(c.buf)-l {
|
||||||
|
c.buf = c.buf[:l+n]
|
||||||
|
return l, true
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ReusableSlice[T]) grow(n int) int {
|
||||||
|
m := c.Len()
|
||||||
|
// If buffer is empty, reset to recover space.
|
||||||
|
if m == 0 {
|
||||||
|
c.Reset()
|
||||||
|
}
|
||||||
|
// Try to grow by means of a reslice.
|
||||||
|
if i, ok := c.tryGrowByReslice(n); ok {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
if c.buf == nil && n <= smallBufferSize {
|
||||||
|
c.buf = make([]T, n, smallBufferSize)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
cs := cap(c.buf)
|
||||||
|
if cs > maxInt-cs-n {
|
||||||
|
panic(bytes.ErrTooLarge)
|
||||||
|
} else {
|
||||||
|
// Add c.off to account for c.buf[:c.off] being sliced off the front.
|
||||||
|
c.buf = growSlice(c.buf, n)
|
||||||
|
}
|
||||||
|
// Restore len(c.buf).
|
||||||
|
c.buf = c.buf[:m+n]
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func growSlice[T any](b []T, n int) []T {
|
||||||
|
defer func() {
|
||||||
|
if recover() != nil {
|
||||||
|
panic(bytes.ErrTooLarge)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
c := len(b) + n // ensure enough space for n elements
|
||||||
|
if c < 2*cap(b) {
|
||||||
|
// The growth rate has historically always been 2x. In the future,
|
||||||
|
// we could rely purely on append to determine the growth rate.
|
||||||
|
c = 2 * cap(b)
|
||||||
|
}
|
||||||
|
b2 := append(b, make([]T, c)...)
|
||||||
|
return b2[:len(b)]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ReusableSlice[T]) Len() int {
|
||||||
|
return len(c.buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ReusableSlice[T]) Reset() {
|
||||||
|
c.buf = c.buf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
||||||
var cmds []Command
|
cmds := cmdPool.Get().(*ReusableSlice[Command])
|
||||||
|
cmds.Reset()
|
||||||
|
defer cmdPool.Put(cmds)
|
||||||
|
|
||||||
b := rd.buf[rd.start:rd.end]
|
b := rd.buf[rd.start:rd.end]
|
||||||
if rd.end-rd.start == 0 && len(rd.buf) > 4096 {
|
if rd.end-rd.start == 0 && len(rd.buf) > 4096 {
|
||||||
rd.buf = rd.buf[:4096]
|
rd.buf = rd.buf[:4096]
|
||||||
|
@ -860,7 +953,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
||||||
cmd.Args[i] = append([]byte(nil), cmd.Args[i]...)
|
cmd.Args[i] = append([]byte(nil), cmd.Args[i]...)
|
||||||
}
|
}
|
||||||
cmd.Raw = wr.b
|
cmd.Raw = wr.b
|
||||||
cmds = append(cmds, cmd)
|
cmds.Append(cmd)
|
||||||
}
|
}
|
||||||
b = b[i+1:]
|
b = b[i+1:]
|
||||||
if len(b) > 0 {
|
if len(b) > 0 {
|
||||||
|
@ -934,7 +1027,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
||||||
for h := 0; h < len(marks); h += 2 {
|
for h := 0; h < len(marks); h += 2 {
|
||||||
cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]]
|
cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]]
|
||||||
}
|
}
|
||||||
cmds = append(cmds, cmd)
|
cmds.Append(cmd)
|
||||||
b = b[i+1:]
|
b = b[i+1:]
|
||||||
if len(b) > 0 {
|
if len(b) > 0 {
|
||||||
goto next
|
goto next
|
||||||
|
@ -951,8 +1044,10 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
||||||
if leftover != nil {
|
if leftover != nil {
|
||||||
*leftover = rd.end - rd.start
|
*leftover = rd.end - rd.start
|
||||||
}
|
}
|
||||||
if len(cmds) > 0 {
|
if cmds.Len() > 0 {
|
||||||
return cmds, nil
|
rcmds := make([]Command, cmds.Len())
|
||||||
|
copy(rcmds, cmds.buf)
|
||||||
|
return rcmds, nil
|
||||||
}
|
}
|
||||||
if rd.rd == nil {
|
if rd.rd == nil {
|
||||||
return nil, errIncompleteCommand
|
return nil, errIncompleteCommand
|
||||||
|
@ -987,7 +1082,7 @@ func (rd *Reader) ReadCommands() ([]Command, error) {
|
||||||
}
|
}
|
||||||
cmds, err := rd.readCommands(nil)
|
cmds, err := rd.readCommands(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []Command{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rd.cmds = cmds
|
rd.cmds = cmds
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue