mirror of https://github.com/tidwall/redcon.git
Compare commits
5 Commits
6952cec09f
...
2f2825976d
Author | SHA1 | Date |
---|---|---|
Fusl | 2f2825976d | |
Josh Baker | 9f71787fcd | |
Josh Baker | d05da895e5 | |
Tom Arrell | c746dbc05b | |
tidwall | 8b671291b8 |
|
@ -18,6 +18,8 @@ Features
|
|||
- Compatible pub/sub support
|
||||
- Multithreaded
|
||||
|
||||
*This library is also avaliable for [Rust](https://github.com/tidwall/redcon.rs) and [C](https://github.com/tidwall/redcon.c).*
|
||||
|
||||
Installing
|
||||
----------
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -1,6 +1,6 @@
|
|||
module github.com/tidwall/redcon
|
||||
|
||||
go 1.15
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/tidwall/btree v1.1.0
|
||||
|
|
137
redcon.go
137
redcon.go
|
@ -3,6 +3,7 @@ package redcon
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -235,12 +236,14 @@ func Serve(ln net.Listener,
|
|||
closed func(conn Conn, err error),
|
||||
) error {
|
||||
s := newServer()
|
||||
s.mu.Lock()
|
||||
s.net = ln.Addr().Network()
|
||||
s.laddr = ln.Addr().String()
|
||||
s.ln = ln
|
||||
s.handler = handler
|
||||
s.accept = accept
|
||||
s.closed = closed
|
||||
s.mu.Unlock()
|
||||
return serve(s)
|
||||
}
|
||||
|
||||
|
@ -296,7 +299,9 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.ln = ln
|
||||
s.mu.Unlock()
|
||||
if signal != nil {
|
||||
signal <- nil
|
||||
}
|
||||
|
@ -305,9 +310,11 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
|
|||
|
||||
// Serve serves incoming connections with the given net.Listener.
|
||||
func (s *Server) Serve(ln net.Listener) error {
|
||||
s.mu.Lock()
|
||||
s.ln = ln
|
||||
s.net = ln.Addr().Network()
|
||||
s.laddr = ln.Addr().String()
|
||||
s.mu.Unlock()
|
||||
return serve(s)
|
||||
}
|
||||
|
||||
|
@ -321,7 +328,9 @@ func (s *TLSServer) ListenServeAndSignal(signal chan error) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.ln = ln
|
||||
s.mu.Unlock()
|
||||
if signal != nil {
|
||||
signal <- nil
|
||||
}
|
||||
|
@ -335,7 +344,7 @@ func serve(s *Server) error {
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for c := range s.conns {
|
||||
c.Close()
|
||||
c.conn.Close()
|
||||
}
|
||||
s.conns = nil
|
||||
}()
|
||||
|
@ -602,9 +611,9 @@ func (w *Writer) WriteNull() {
|
|||
// sub-responses to the client to complete the response.
|
||||
// For example to write two strings:
|
||||
//
|
||||
// c.WriteArray(2)
|
||||
// c.WriteBulkString("item 1")
|
||||
// c.WriteBulkString("item 2")
|
||||
// c.WriteArray(2)
|
||||
// c.WriteBulkString("item 1")
|
||||
// c.WriteBulkString("item 2")
|
||||
func (w *Writer) WriteArray(count int) {
|
||||
if w.err != nil {
|
||||
return
|
||||
|
@ -709,17 +718,18 @@ func (w *Writer) WriteRaw(data []byte) {
|
|||
}
|
||||
|
||||
// WriteAny writes any type to client.
|
||||
// nil -> null
|
||||
// error -> error (adds "ERR " when first word is not uppercase)
|
||||
// string -> bulk-string
|
||||
// numbers -> bulk-string
|
||||
// []byte -> bulk-string
|
||||
// bool -> bulk-string ("0" or "1")
|
||||
// slice -> array
|
||||
// map -> array with key/value pairs
|
||||
// SimpleString -> string
|
||||
// SimpleInt -> integer
|
||||
// everything-else -> bulk-string representation using fmt.Sprint()
|
||||
//
|
||||
// nil -> null
|
||||
// error -> error (adds "ERR " when first word is not uppercase)
|
||||
// string -> bulk-string
|
||||
// numbers -> bulk-string
|
||||
// []byte -> bulk-string
|
||||
// bool -> bulk-string ("0" or "1")
|
||||
// slice -> array
|
||||
// map -> array with key/value pairs
|
||||
// SimpleString -> string
|
||||
// SimpleInt -> integer
|
||||
// everything-else -> bulk-string representation using fmt.Sprint()
|
||||
func (w *Writer) WriteAny(v interface{}) {
|
||||
if w.err != nil {
|
||||
return
|
||||
|
@ -767,8 +777,91 @@ func parseInt(b []byte) (int, bool) {
|
|||
return n, true
|
||||
}
|
||||
|
||||
var cmdPool = sync.Pool{
|
||||
New: func() any {
|
||||
return &ReusableSlice[Command]{}
|
||||
},
|
||||
}
|
||||
|
||||
type ReusableSlice[T any] struct {
|
||||
buf []T
|
||||
}
|
||||
|
||||
func (c *ReusableSlice[T]) Append(cmd T) {
|
||||
m, ok := c.tryGrowByReslice(1)
|
||||
if !ok {
|
||||
m = c.grow(1)
|
||||
}
|
||||
c.buf[m] = cmd
|
||||
}
|
||||
|
||||
const smallBufferSize = 64
|
||||
const maxInt = int(^uint(0) >> 1)
|
||||
|
||||
func (c *ReusableSlice[T]) tryGrowByReslice(n int) (int, bool) {
|
||||
if l := len(c.buf); n <= cap(c.buf)-l {
|
||||
c.buf = c.buf[:l+n]
|
||||
return l, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (c *ReusableSlice[T]) grow(n int) int {
|
||||
m := c.Len()
|
||||
// If buffer is empty, reset to recover space.
|
||||
if m == 0 {
|
||||
c.Reset()
|
||||
}
|
||||
// Try to grow by means of a reslice.
|
||||
if i, ok := c.tryGrowByReslice(n); ok {
|
||||
return i
|
||||
}
|
||||
if c.buf == nil && n <= smallBufferSize {
|
||||
c.buf = make([]T, n, smallBufferSize)
|
||||
return 0
|
||||
}
|
||||
cs := cap(c.buf)
|
||||
if cs > maxInt-cs-n {
|
||||
panic(bytes.ErrTooLarge)
|
||||
} else {
|
||||
// Add c.off to account for c.buf[:c.off] being sliced off the front.
|
||||
c.buf = growSlice(c.buf, n)
|
||||
}
|
||||
// Restore len(c.buf).
|
||||
c.buf = c.buf[:m+n]
|
||||
return m
|
||||
}
|
||||
|
||||
func growSlice[T any](b []T, n int) []T {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
panic(bytes.ErrTooLarge)
|
||||
}
|
||||
}()
|
||||
|
||||
c := len(b) + n // ensure enough space for n elements
|
||||
if c < 2*cap(b) {
|
||||
// The growth rate has historically always been 2x. In the future,
|
||||
// we could rely purely on append to determine the growth rate.
|
||||
c = 2 * cap(b)
|
||||
}
|
||||
b2 := append(b, make([]T, c)...)
|
||||
return b2[:len(b)]
|
||||
}
|
||||
|
||||
func (c *ReusableSlice[T]) Len() int {
|
||||
return len(c.buf)
|
||||
}
|
||||
|
||||
func (c *ReusableSlice[T]) Reset() {
|
||||
c.buf = c.buf[:0]
|
||||
}
|
||||
|
||||
func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
||||
var cmds []Command
|
||||
cmds := cmdPool.Get().(*ReusableSlice[Command])
|
||||
cmds.Reset()
|
||||
defer cmdPool.Put(cmds)
|
||||
|
||||
b := rd.buf[rd.start:rd.end]
|
||||
if rd.end-rd.start == 0 && len(rd.buf) > 4096 {
|
||||
rd.buf = rd.buf[:4096]
|
||||
|
@ -860,7 +953,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
|||
cmd.Args[i] = append([]byte(nil), cmd.Args[i]...)
|
||||
}
|
||||
cmd.Raw = wr.b
|
||||
cmds = append(cmds, cmd)
|
||||
cmds.Append(cmd)
|
||||
}
|
||||
b = b[i+1:]
|
||||
if len(b) > 0 {
|
||||
|
@ -934,7 +1027,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
|||
for h := 0; h < len(marks); h += 2 {
|
||||
cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]]
|
||||
}
|
||||
cmds = append(cmds, cmd)
|
||||
cmds.Append(cmd)
|
||||
b = b[i+1:]
|
||||
if len(b) > 0 {
|
||||
goto next
|
||||
|
@ -951,8 +1044,10 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
|
|||
if leftover != nil {
|
||||
*leftover = rd.end - rd.start
|
||||
}
|
||||
if len(cmds) > 0 {
|
||||
return cmds, nil
|
||||
if cmds.Len() > 0 {
|
||||
rcmds := make([]Command, cmds.Len())
|
||||
copy(rcmds, cmds.buf)
|
||||
return rcmds, nil
|
||||
}
|
||||
if rd.rd == nil {
|
||||
return nil, errIncompleteCommand
|
||||
|
@ -987,7 +1082,7 @@ func (rd *Reader) ReadCommands() ([]Command, error) {
|
|||
}
|
||||
cmds, err := rd.readCommands(nil)
|
||||
if err != nil {
|
||||
return []Command{}, err
|
||||
return nil, err
|
||||
}
|
||||
rd.cmds = cmds
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue