Compare commits

...

5 Commits

Author SHA1 Message Date
Fusl 2f2825976d use a reusable slice for storing args in readcommands functions 2024-05-19 08:23:38 +00:00
Josh Baker 9f71787fcd
Create README.md 2023-01-29 16:34:09 -07:00
Josh Baker d05da895e5
Merge pull request #62 from tomarrell/master
fix: race condition in initialisation of server
2023-01-20 09:11:59 -07:00
Tom Arrell c746dbc05b
fix: race condition in initialisation of server 2023-01-20 15:12:37 +01:00
tidwall 8b671291b8 Fix race condition on server close
closes #61
2023-01-12 09:18:56 -07:00
3 changed files with 119 additions and 22 deletions

View File

@ -18,6 +18,8 @@ Features
- Compatible pub/sub support
- Multithreaded
*This library is also avaliable for [Rust](https://github.com/tidwall/redcon.rs) and [C](https://github.com/tidwall/redcon.c).*
Installing
----------

2
go.mod
View File

@ -1,6 +1,6 @@
module github.com/tidwall/redcon
go 1.15
go 1.19
require (
github.com/tidwall/btree v1.1.0

137
redcon.go
View File

@ -3,6 +3,7 @@ package redcon
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"fmt"
@ -235,12 +236,14 @@ func Serve(ln net.Listener,
closed func(conn Conn, err error),
) error {
s := newServer()
s.mu.Lock()
s.net = ln.Addr().Network()
s.laddr = ln.Addr().String()
s.ln = ln
s.handler = handler
s.accept = accept
s.closed = closed
s.mu.Unlock()
return serve(s)
}
@ -296,7 +299,9 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
}
return err
}
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
if signal != nil {
signal <- nil
}
@ -305,9 +310,11 @@ func (s *Server) ListenServeAndSignal(signal chan error) error {
// Serve serves incoming connections with the given net.Listener.
func (s *Server) Serve(ln net.Listener) error {
s.mu.Lock()
s.ln = ln
s.net = ln.Addr().Network()
s.laddr = ln.Addr().String()
s.mu.Unlock()
return serve(s)
}
@ -321,7 +328,9 @@ func (s *TLSServer) ListenServeAndSignal(signal chan error) error {
}
return err
}
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
if signal != nil {
signal <- nil
}
@ -335,7 +344,7 @@ func serve(s *Server) error {
s.mu.Lock()
defer s.mu.Unlock()
for c := range s.conns {
c.Close()
c.conn.Close()
}
s.conns = nil
}()
@ -602,9 +611,9 @@ func (w *Writer) WriteNull() {
// sub-responses to the client to complete the response.
// For example to write two strings:
//
// c.WriteArray(2)
// c.WriteBulkString("item 1")
// c.WriteBulkString("item 2")
// c.WriteArray(2)
// c.WriteBulkString("item 1")
// c.WriteBulkString("item 2")
func (w *Writer) WriteArray(count int) {
if w.err != nil {
return
@ -709,17 +718,18 @@ func (w *Writer) WriteRaw(data []byte) {
}
// WriteAny writes any type to client.
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// everything-else -> bulk-string representation using fmt.Sprint()
//
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// everything-else -> bulk-string representation using fmt.Sprint()
func (w *Writer) WriteAny(v interface{}) {
if w.err != nil {
return
@ -767,8 +777,91 @@ func parseInt(b []byte) (int, bool) {
return n, true
}
var cmdPool = sync.Pool{
New: func() any {
return &ReusableSlice[Command]{}
},
}
type ReusableSlice[T any] struct {
buf []T
}
func (c *ReusableSlice[T]) Append(cmd T) {
m, ok := c.tryGrowByReslice(1)
if !ok {
m = c.grow(1)
}
c.buf[m] = cmd
}
const smallBufferSize = 64
const maxInt = int(^uint(0) >> 1)
func (c *ReusableSlice[T]) tryGrowByReslice(n int) (int, bool) {
if l := len(c.buf); n <= cap(c.buf)-l {
c.buf = c.buf[:l+n]
return l, true
}
return 0, false
}
func (c *ReusableSlice[T]) grow(n int) int {
m := c.Len()
// If buffer is empty, reset to recover space.
if m == 0 {
c.Reset()
}
// Try to grow by means of a reslice.
if i, ok := c.tryGrowByReslice(n); ok {
return i
}
if c.buf == nil && n <= smallBufferSize {
c.buf = make([]T, n, smallBufferSize)
return 0
}
cs := cap(c.buf)
if cs > maxInt-cs-n {
panic(bytes.ErrTooLarge)
} else {
// Add c.off to account for c.buf[:c.off] being sliced off the front.
c.buf = growSlice(c.buf, n)
}
// Restore len(c.buf).
c.buf = c.buf[:m+n]
return m
}
func growSlice[T any](b []T, n int) []T {
defer func() {
if recover() != nil {
panic(bytes.ErrTooLarge)
}
}()
c := len(b) + n // ensure enough space for n elements
if c < 2*cap(b) {
// The growth rate has historically always been 2x. In the future,
// we could rely purely on append to determine the growth rate.
c = 2 * cap(b)
}
b2 := append(b, make([]T, c)...)
return b2[:len(b)]
}
func (c *ReusableSlice[T]) Len() int {
return len(c.buf)
}
func (c *ReusableSlice[T]) Reset() {
c.buf = c.buf[:0]
}
func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
var cmds []Command
cmds := cmdPool.Get().(*ReusableSlice[Command])
cmds.Reset()
defer cmdPool.Put(cmds)
b := rd.buf[rd.start:rd.end]
if rd.end-rd.start == 0 && len(rd.buf) > 4096 {
rd.buf = rd.buf[:4096]
@ -860,7 +953,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
cmd.Args[i] = append([]byte(nil), cmd.Args[i]...)
}
cmd.Raw = wr.b
cmds = append(cmds, cmd)
cmds.Append(cmd)
}
b = b[i+1:]
if len(b) > 0 {
@ -934,7 +1027,7 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
for h := 0; h < len(marks); h += 2 {
cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]]
}
cmds = append(cmds, cmd)
cmds.Append(cmd)
b = b[i+1:]
if len(b) > 0 {
goto next
@ -951,8 +1044,10 @@ func (rd *Reader) readCommands(leftover *int) ([]Command, error) {
if leftover != nil {
*leftover = rd.end - rd.start
}
if len(cmds) > 0 {
return cmds, nil
if cmds.Len() > 0 {
rcmds := make([]Command, cmds.Len())
copy(rcmds, cmds.buf)
return rcmds, nil
}
if rd.rd == nil {
return nil, errIncompleteCommand
@ -987,7 +1082,7 @@ func (rd *Reader) ReadCommands() ([]Command, error) {
}
cmds, err := rd.readCommands(nil)
if err != nil {
return []Command{}, err
return nil, err
}
rd.cmds = cmds
}