ledisdb/ledis/sort.go

234 lines
4.7 KiB
Go

package ledis
import (
"bytes"
"fmt"
"sort"
"strconv"
"github.com/siddontang/ledisdb/store"
)
type Limit struct {
Offset int
Size int
}
func getSortRange(values [][]byte, offset int, size int) (int, int) {
var start = 0
if offset > 0 {
start = offset
}
valueLen := len(values)
var end = valueLen - 1
if size > 0 {
end = start + size - 1
}
if start >= valueLen {
start = valueLen - 1
end = valueLen - 2
}
if end >= valueLen {
end = valueLen - 1
}
return start, end
}
var hashPattern = []byte("*->")
func (db *DB) lookupKeyByPattern(pattern []byte, subKey []byte) []byte {
// If the pattern is #, return the substitution key itself
if bytes.Equal(pattern, []byte{'#'}) {
return subKey
}
// If we can't find '*' in the pattern, return nil
if !bytes.Contains(pattern, []byte{'*'}) {
return nil
}
key := pattern
var field []byte = nil
// Find out if we're dealing with a hash dereference
if n := bytes.Index(pattern, hashPattern); n > 0 && n+3 < len(pattern) {
key = pattern[0 : n+1]
field = pattern[n+3:]
}
// Perform the '*' substitution
key = bytes.Replace(key, []byte{'*'}, subKey, 1)
var value []byte
if field == nil {
value, _ = db.Get(key)
} else {
value, _ = db.HGet(key, field)
}
return value
}
type sortItem struct {
value []byte
cmpValue []byte
score float64
}
type sortItemSlice struct {
alpha bool
sortByPattern bool
items []sortItem
}
func (s *sortItemSlice) Len() int {
return len(s.items)
}
func (s *sortItemSlice) Swap(i, j int) {
s.items[i], s.items[j] = s.items[j], s.items[i]
}
func (s *sortItemSlice) Less(i, j int) bool {
s1 := s.items[i]
s2 := s.items[j]
if !s.alpha {
if s1.score < s2.score {
return true
} else if s1.score > s2.score {
return false
} else {
return bytes.Compare(s1.value, s2.value) < 0
}
} else {
if s.sortByPattern {
if s1.cmpValue == nil || s2.cmpValue == nil {
if s1.cmpValue == nil {
return true
} else {
return false
}
} else {
// Unlike redis, we only use bytes compare
return bytes.Compare(s1.cmpValue, s2.cmpValue) < 0
}
} else {
// Unlike redis, we only use bytes compare
return bytes.Compare(s1.value, s2.value) < 0
}
}
}
func (db *DB) xsort(values [][]byte, offset int, size int, alpha bool, desc bool, sortBy []byte, sortGet [][]byte) ([][]byte, error) {
if len(values) == 0 {
return [][]byte{}, nil
}
start, end := getSortRange(values, offset, size)
dontsort := 0
if sortBy != nil {
if !bytes.Contains(sortBy, []byte{'*'}) {
dontsort = 1
}
}
items := &sortItemSlice{
alpha: alpha,
sortByPattern: sortBy != nil,
items: make([]sortItem, len(values)),
}
for i, value := range values {
items.items[i].value = value
items.items[i].score = 0
items.items[i].cmpValue = nil
if dontsort == 0 {
var cmpValue []byte
if sortBy != nil {
cmpValue = db.lookupKeyByPattern(sortBy, value)
} else {
// use value iteself to sort by
cmpValue = value
}
if cmpValue == nil {
continue
}
if alpha {
if sortBy != nil {
items.items[i].cmpValue = cmpValue
}
} else {
score, err := strconv.ParseFloat(string(cmpValue), 64)
if err != nil {
return nil, fmt.Errorf("%s scores can't be converted into double", cmpValue)
}
items.items[i].score = score
}
}
}
if dontsort == 0 {
if !desc {
sort.Sort(items)
} else {
sort.Sort(sort.Reverse(items))
}
}
var resLen int = end - start + 1
if len(sortGet) > 0 {
resLen = len(sortGet) * (end - start + 1)
}
res := make([][]byte, 0, resLen)
for i := start; i <= end; i++ {
if len(sortGet) == 0 {
res = append(res, items.items[i].value)
} else {
for _, getPattern := range sortGet {
v := db.lookupKeyByPattern(getPattern, items.items[i].value)
res = append(res, v)
}
}
}
return res, nil
}
func (db *DB) XLSort(key []byte, offset int, size int, alpha bool, desc bool, sortBy []byte, sortGet [][]byte) ([][]byte, error) {
values, err := db.LRange(key, 0, -1)
if err != nil {
return nil, err
}
return db.xsort(values, offset, size, alpha, desc, sortBy, sortGet)
}
func (db *DB) XSSort(key []byte, offset int, size int, alpha bool, desc bool, sortBy []byte, sortGet [][]byte) ([][]byte, error) {
values, err := db.SMembers(key)
if err != nil {
return nil, err
}
return db.xsort(values, offset, size, alpha, desc, sortBy, sortGet)
}
func (db *DB) XZSort(key []byte, offset int, size int, alpha bool, desc bool, sortBy []byte, sortGet [][]byte) ([][]byte, error) {
values, err := db.ZRangeByLex(key, nil, nil, store.RangeClose, 0, -1)
if err != nil {
return nil, err
}
return db.xsort(values, offset, size, alpha, desc, sortBy, sortGet)
}