src.dualinventive.com/go/redis-proxy/vendor/github.com/mna/redisc/cluster_test.go

package redisc

import (
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/mna/redisc/redistest"
	"github.com/mna/redisc/redistest/resp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
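
// TestClusterRefreshNormalServer verifies that Refresh fails with
// "redisc: all nodes failed" when the only startup node is a standalone,
// non-cluster Redis server.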
func TestClusterRefreshNormalServer(t *testing.T) {
	cmd, port := redistest.StartServer(t, nil, "")
	defer cmd.Process.Kill()

	c := &Cluster{
		StartupNodes: []string{":" + port},
	}
	err := c.Refresh()
	if assert.Error(t, err, "Refresh") {
		assert.Contains(t, err.Error(), "redisc: all nodes failed", "expected error message")
	}
}
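
// assertMapping checks that every slot in the mapping has the expected number
// of addresses, that the first address is one of the master ports and, when
// replicaPorts is non-empty, that the second address is one of the replica ports.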
func assertMapping(t *testing.T, mapping [hashSlots][]string, masterPorts, replicaPorts []string) {
	var prev string
	pix := -1
	expectedMappingNodes := 1 // at least a master node
	if len(replicaPorts) > 0 {
		// if there are replicas, then we expect 2 mapping nodes (master+replica)
		expectedMappingNodes = 2
	}
	for ix, maps := range mapping {
		if assert.Equal(t, expectedMappingNodes, len(maps), "Mapping has %d node(s)", expectedMappingNodes) {
			if maps[0] != prev || ix == len(mapping)-1 {
				prev = maps[0]
				t.Logf("%5d: %s\n", ix, maps[0])
				pix++
			}
			if assert.NotEmpty(t, maps[0]) {
				split := strings.Index(maps[0], ":")
				assert.Contains(t, masterPorts, maps[0][split+1:], "expected master")
			}
			if len(maps) > 1 && assert.NotEmpty(t, maps[1]) {
				split := strings.Index(maps[1], ":")
				assert.Contains(t, replicaPorts, maps[1][split+1:], "expected replica")
			}
		}
	}
}

func TestClusterRefresh(t *testing.T) {
	fn, ports := redistest.StartCluster(t, nil)
	defer fn()

	c := &Cluster{
		StartupNodes: []string{":" + ports[0]},
	}
	err := c.Refresh()
	if assert.NoError(t, err, "Refresh") {
		assertMapping(t, c.mapping, ports, nil)
	}
}

func TestClusterRefreshStartWithReplica(t *testing.T) {
	fn, ports := redistest.StartClusterWithReplicas(t, nil)
	defer fn()

	c := &Cluster{
		StartupNodes: []string{":" + ports[len(ports)-1]}, // last port is a replica
	}
	err := c.Refresh()
	if assert.NoError(t, err, "Refresh") {
		assertMapping(t, c.mapping, ports[:redistest.NumClusterNodes], ports[redistest.NumClusterNodes:])
	}
}

func TestClusterRefreshAllFail(t *testing.T) {
	s := redistest.StartMockServer(t, func(cmd string, args ...string) interface{} {
		return resp.Error("nope")
	})
	defer s.Close()

	c := &Cluster{
		StartupNodes: []string{s.Addr},
	}
	if err := c.Refresh(); assert.Error(t, err, "Refresh") {
		assert.Contains(t, err.Error(), "all nodes failed", "expected message")
	}
	require.NoError(t, c.Close(), "Close")
}

func TestClusterNoNode(t *testing.T) {
	c := &Cluster{}
	conn := c.Get()
	_, err := conn.Do("A")
	if assert.Error(t, err, "Do") {
		assert.Contains(t, err.Error(), "failed to get a connection", "expected message")
	}
	if err := BindConn(conn); assert.Error(t, err, "Bind without key") {
		assert.Contains(t, err.Error(), "failed to get a connection", "expected message")
	}
	if err := BindConn(conn, "A"); assert.Error(t, err, "Bind with key") {
		assert.Contains(t, err.Error(), "failed to get a connection", "expected message")
	}
}
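
// TestClusterNeedsRefresh verifies that the slot mapping starts out empty and
// that issuing a command (which may hit a random node and return MOVED)
// triggers a background refresh that eventually populates every slot.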
func TestClusterNeedsRefresh(t *testing.T) {
	fn, ports := redistest.StartCluster(t, nil)
	defer fn()

	for i, p := range ports {
		ports[i] = ":" + p
	}
	c := &Cluster{
		StartupNodes: ports,
	}
	defer c.Close()

	conn := c.Get().(*Conn)
	defer conn.Close()

	// at this point, no mapping is stored
	c.mu.Lock()
	for i, v := range c.mapping {
		if !assert.Empty(t, v, "No addr for %d", i) {
			break
		}
	}
	c.mu.Unlock()

	// calling Do may or may not generate a MOVED error (it will get a
	// random node, because no mapping is known yet)
	conn.Do("GET", "b")

	// wait for refreshing to become false again
	c.mu.Lock()
	for c.refreshing {
		c.mu.Unlock()
		time.Sleep(100 * time.Millisecond)
		c.mu.Lock()
	}
	for i, v := range c.mapping {
		if !assert.NotEmpty(t, v, "Addr for %d", i) {
			break
		}
	}
	c.mu.Unlock()
}

func TestClusterClose(t *testing.T) {
	c := &Cluster{
		StartupNodes: []string{":6379"},
		DialOptions:  []redis.DialOption{redis.DialConnectTimeout(2 * time.Second)},
		CreatePool:   createPool,
	}
	assert.NoError(t, c.Close(), "Close")
	if err := c.Close(); assert.Error(t, err, "Close after Close") {
		assert.Contains(t, err.Error(), "redisc: closed", "expected message")
	}
	if conn := c.Get(); assert.Error(t, conn.Err(), "Get after Close") {
		assert.Contains(t, conn.Err().Error(), "redisc: closed", "expected message")
	}
	if _, err := c.Dial(); assert.Error(t, err, "Dial after Close") {
		assert.Contains(t, err.Error(), "redisc: closed", "expected message")
	}
	if err := c.Refresh(); assert.Error(t, err, "Refresh after Close") {
		assert.Contains(t, err.Error(), "redisc: closed", "expected message")
	}
}
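
// createPool is used as the Cluster.CreatePool function in these tests: it
// returns a redis.Pool with a few idle connections and a PING health check
// on borrow.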
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	return &redis.Pool{
		MaxIdle:     5,
		MaxActive:   10,
		IdleTimeout: time.Minute,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", addr, opts...)
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}, nil
}
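
// redisCmd describes a command to run against the cluster along with its
// arguments and either the expected response or the expected error message
// substring. A resp of type lenResult asserts a minimum length (or minimum
// value) instead of an exact result.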
type redisCmd struct {
	name   string
	args   redis.Args
	resp   interface{} // if resp is of type lenResult, asserts that there is a result at least this long
	errMsg string
}

type lenResult int
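
// TestCommands runs groups of commands concurrently against a test cluster,
// along with pub/sub, scripting, and transaction scenarios, and checks each
// result against the expected value or error.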
func TestCommands(t *testing.T) {
	fn, ports := redistest.StartCluster(t, nil)
	defer fn()

	cmdsPerGroup := map[string][]redisCmd{
		"cluster": {
			{"CLUSTER", redis.Args{"INFO"}, lenResult(10), ""},
			{"READONLY", nil, "OK", ""},
			{"READWRITE", nil, "OK", ""},
			{"CLUSTER", redis.Args{"COUNTKEYSINSLOT", 12345}, int64(0), ""},
			{"CLUSTER", redis.Args{"KEYSLOT", "a"}, int64(15495), ""},
			{"CLUSTER", redis.Args{"NODES"}, lenResult(100), ""},
		},
		"connection": {
			{"AUTH", redis.Args{"pwd"}, nil, "ERR Client sent AUTH, but no password is set"},
			{"ECHO", redis.Args{"a"}, []byte("a"), ""},
			{"PING", nil, "PONG", ""},
			{"SELECT", redis.Args{1}, nil, "ERR SELECT is not allowed in cluster mode"},
			{"QUIT", nil, "OK", ""},
		},
		"hashes": {
			{"HSET", redis.Args{"ha", "f1", "1"}, int64(1), ""},
			{"HLEN", redis.Args{"ha"}, int64(1), ""},
			{"HEXISTS", redis.Args{"ha", "f1"}, int64(1), ""},
			{"HDEL", redis.Args{"ha", "f1", "f2"}, int64(1), ""},
			{"HINCRBY", redis.Args{"hb", "f1", "1"}, int64(1), ""},
			{"HINCRBYFLOAT", redis.Args{"hb", "f2", "0.5"}, []byte("0.5"), ""},
			{"HKEYS", redis.Args{"hb"}, []interface{}{[]byte("f1"), []byte("f2")}, ""},
			{"HMGET", redis.Args{"hb", "f1", "f2"}, []interface{}{[]byte("1"), []byte("0.5")}, ""},
			{"HMSET", redis.Args{"hc", "f1", "a", "f2", "b"}, "OK", ""},
			{"HSET", redis.Args{"ha", "f1", "2"}, int64(1), ""},
			{"HGET", redis.Args{"ha", "f1"}, []byte("2"), ""},
			{"HGETALL", redis.Args{"ha"}, []interface{}{[]byte("f1"), []byte("2")}, ""},
			{"HSETNX", redis.Args{"ha", "f2", "3"}, int64(1), ""},
			//{"HSTRLEN", redis.Args{"hb", "f2"}, int64(3), ""}, // redis 3.2 only
			{"HVALS", redis.Args{"hb"}, []interface{}{[]byte("1"), []byte("0.5")}, ""},
			{"HSCAN", redis.Args{"hb", 0}, lenResult(2), ""},
		},
		"hyperloglog": {
			{"PFADD", redis.Args{"hll", "a", "b", "c"}, int64(1), ""},
			{"PFCOUNT", redis.Args{"hll"}, int64(3), ""},
			{"PFADD", redis.Args{"hll2", "d"}, int64(1), ""},
			{"PFMERGE", redis.Args{"hll", "hll2"}, nil, "CROSSSLOT"},
		},
		"keys": {
			{"SET", redis.Args{"k1", "z"}, "OK", ""},
			{"EXISTS", redis.Args{"k1"}, int64(1), ""},
			{"DUMP", redis.Args{"k1"}, lenResult(10), ""},
			{"EXPIRE", redis.Args{"k1", 10}, int64(1), ""},
			{"EXPIREAT", redis.Args{"k1", time.Now().Add(time.Hour).Unix()}, int64(1), ""},
			{"KEYS", redis.Args{"z*"}, []interface{}{}, ""}, // KEYS is supported, but uses a random node and returns only that node's keys (nondeterministic)
			{"MOVE", redis.Args{"k1", 2}, nil, "ERR MOVE is not allowed in cluster mode"},
			{"PERSIST", redis.Args{"k1"}, int64(1), ""},
			{"PEXPIRE", redis.Args{"k1", 10000}, int64(1), ""},
			{"PEXPIREAT", redis.Args{"k1", time.Now().Add(time.Hour).UnixNano() / int64(time.Millisecond)}, int64(1), ""},
			{"PTTL", redis.Args{"k1"}, lenResult(3500000), ""},
			// RANDOMKEY is not deterministic
			{"RENAME", redis.Args{"k1", "k2"}, nil, "CROSSSLOT"},
			{"RENAMENX", redis.Args{"k1", "k2"}, nil, "CROSSSLOT"},
			{"SCAN", redis.Args{0}, lenResult(2), ""}, // works, but only for the keys on that random node
			{"TTL", redis.Args{"k1"}, lenResult(3000), ""},
			{"TYPE", redis.Args{"k1"}, "string", ""},
			{"DEL", redis.Args{"k1"}, int64(1), ""},
			{"SADD", redis.Args{"k3", "a", "z", "d"}, int64(3), ""},
			{"SORT", redis.Args{"k3", "ALPHA"}, []interface{}{[]byte("a"), []byte("d"), []byte("z")}, ""},
		},
		"lists": {
			{"LPUSH", redis.Args{"l1", "a", "b", "c"}, int64(3), ""},
			{"LINDEX", redis.Args{"l1", 1}, []byte("b"), ""},
			{"LINSERT", redis.Args{"l1", "BEFORE", "b", "d"}, int64(4), ""},
			{"LLEN", redis.Args{"l1"}, int64(4), ""},
			{"LPOP", redis.Args{"l1"}, []byte("c"), ""},
			{"LPUSHX", redis.Args{"l1", "e"}, int64(4), ""},
			{"LRANGE", redis.Args{"l1", 0, 1}, []interface{}{[]byte("e"), []byte("d")}, ""},
			{"LREM", redis.Args{"l1", 0, "d"}, int64(1), ""},
			{"LSET", redis.Args{"l1", 0, "f"}, "OK", ""},
			{"LTRIM", redis.Args{"l1", 0, 3}, "OK", ""},
			{"RPOP", redis.Args{"l1"}, []byte("a"), ""},
			{"RPOPLPUSH", redis.Args{"l1", "l2"}, nil, "CROSSSLOT"},
			{"RPUSH", redis.Args{"l1", "g"}, int64(3), ""},
			{"RPUSH", redis.Args{"l1", "h"}, int64(4), ""},
			{"BLPOP", redis.Args{"l1", 1}, lenResult(2), ""},
			{"BRPOP", redis.Args{"l1", 1}, lenResult(2), ""},
			{"BRPOPLPUSH", redis.Args{"l1", "l2", 1}, nil, "CROSSSLOT"},
		},
		"pubsub": {
			{"PUBSUB", redis.Args{"NUMPAT"}, lenResult(0), ""},
			{"PUBLISH", redis.Args{"ev1", "a"}, lenResult(0), ""},
			// to actually subscribe to events, only Send must be called, with Receive used to listen (or redis.PubSubConn must be used)
		},
		"scripting": {
			{"SCRIPT", redis.Args{"FLUSH"}, "OK", ""},
			{"SCRIPT", redis.Args{"EXISTS", "return GET x"}, []interface{}{int64(0)}, ""},
			// to actually use scripts with keys, conn.Bind must be called to select the right node
		},
		"server": {
			{"CLIENT", redis.Args{"LIST"}, lenResult(10), ""},
			{"COMMAND", nil, lenResult(50), ""},
			{"INFO", nil, lenResult(100), ""},
			{"TIME", nil, lenResult(2), ""},
		},
		"sets": {
			{"SADD", redis.Args{"t1", "a", "b"}, int64(2), ""},
			{"SADD", redis.Args{"{t1}.b", "c", "b"}, int64(2), ""},
			{"SCARD", redis.Args{"t1"}, int64(2), ""},
			{"SDIFF", redis.Args{"t1", "t2"}, nil, "CROSSSLOT"},
			{"SDIFFSTORE", redis.Args{"{t1}.3", "t1", "{t1}.2"}, int64(2), ""},
			{"SINTER", redis.Args{"t1", "{t1}.b"}, []interface{}{[]byte("b")}, ""},
			{"SINTERSTORE", redis.Args{"{t1}.c", "t1", "{t1}.b"}, int64(1), ""},
			{"SISMEMBER", redis.Args{"t1", "a"}, int64(1), ""},
			{"SMEMBERS", redis.Args{"t1"}, lenResult(2), ""}, // order is not deterministic
			{"SMOVE", redis.Args{"t1", "{t1}.c", "a"}, int64(1), ""},
			{"SPOP", redis.Args{"t3"}, nil, ""},
			{"SRANDMEMBER", redis.Args{"t3"}, nil, ""},
			{"SREM", redis.Args{"t1", "b"}, int64(1), ""},
			{"SSCAN", redis.Args{"{t1}.b", 0}, lenResult(2), ""},
			{"SUNION", redis.Args{"{t1}.b", "{t1}.c"}, lenResult(3), ""},
			{"SUNIONSTORE", redis.Args{"{t1}.d", "{t1}.b", "{t1}.c"}, int64(3), ""},
		},
		"sortedsets": {
			{"ZADD", redis.Args{"z1", 1, "m1", 2, "m2", 3, "m3"}, int64(3), ""},
			{"ZCARD", redis.Args{"z1"}, int64(3), ""},
			{"ZCOUNT", redis.Args{"z1", "(1", "3"}, int64(2), ""},
			{"ZINCRBY", redis.Args{"z1", 1, "m1"}, []byte("2"), ""},
			{"ZINTERSTORE", redis.Args{"z2", 1, "z1"}, nil, "CROSSSLOT"},
			{"ZLEXCOUNT", redis.Args{"z1", "[m1", "[m2"}, int64(2), ""},
			{"ZRANGE", redis.Args{"z1", 0, 0}, []interface{}{[]byte("m1")}, ""},
			{"ZRANGEBYLEX", redis.Args{"z1", "[m1", "(m2"}, []interface{}{[]byte("m1")}, ""},
			{"ZRANGEBYSCORE", redis.Args{"z1", "(2", "3"}, []interface{}{[]byte("m3")}, ""},
			{"ZRANK", redis.Args{"z1", "m3"}, int64(2), ""},
			{"ZREM", redis.Args{"z1", "m1"}, int64(1), ""},
			// TODO: complete commands...
			{"ZSCORE", redis.Args{"z1", "m3"}, []byte("3"), ""},
		},
		"strings": {
			{"APPEND", redis.Args{"s1", "a"}, int64(1), ""},
			{"BITCOUNT", redis.Args{"s1"}, int64(3), ""},
			{"GET", redis.Args{"s1"}, []byte("a"), ""},
			{"MSET", redis.Args{"s2", "b", "s3", "c"}, "", "CROSSSLOT"},
			{"SET", redis.Args{"s{b}", "b"}, "OK", ""},
			{"SET", redis.Args{"s{bcd}", "c"}, "OK", ""},
			// keys "b" (slot 3300) and "bcd" (slot 1872) both map to slots < 5000, so they are on the
			// same node in this test, yet MGET across the two slots still fails with CROSSSLOT.
			{"MGET", redis.Args{"s{b}", "s{bcd}"}, "", "CROSSSLOT"},
		},
		"transactions": {
			{"DISCARD", nil, "", "ERR DISCARD without MULTI"},
			{"EXEC", nil, "", "ERR EXEC without MULTI"},
			{"MULTI", nil, "OK", ""},
			{"SET", redis.Args{"tr1", 1}, "OK", ""},
			{"WATCH", redis.Args{"tr1"}, "OK", ""},
			{"UNWATCH", nil, "OK", ""},
			// to actually use transactions, conn.Bind must be called to select the right node
		},
	}

	for i, p := range ports {
		ports[i] = ":" + p
	}
	c := &Cluster{
		StartupNodes: ports,
		DialOptions:  []redis.DialOption{redis.DialConnectTimeout(2 * time.Second)},
		CreatePool:   createPool,
	}
	require.NoError(t, c.Refresh(), "Refresh")

	var wg sync.WaitGroup
	// start a goroutine that subscribes and listens to events
	ok := make(chan int)
	done := make(chan int)
	go runPubSubCommands(t, c, ok, done)
	<-ok

	wg.Add(len(cmdsPerGroup))
	for _, cmds := range cmdsPerGroup {
		go runCommands(t, c, cmds, &wg)
	}
	wg.Add(2)
	go runScriptCommands(t, c, &wg)
	go runTransactionsCommands(t, c, &wg)

	wg.Wait()
	close(done)
	<-ok
	assert.NoError(t, c.Close(), "Cluster Close")
}
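
// runTransactionsCommands exercises WATCH/MULTI/EXEC on a connection bound to
// keys that hash to the same slot, then verifies the written values with MGET.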
func runTransactionsCommands(t *testing.T, c *Cluster, wg *sync.WaitGroup) {
	defer wg.Done()

	conn := c.Get()
	defer conn.Close()
	if conn, ok := conn.(*Conn); ok {
		require.NoError(t, conn.Bind("tr{a}1", "tr{a}2"), "Bind")
	}

	_, err := conn.Do("WATCH", "tr{a}1")
	assert.NoError(t, err, "WATCH")
	_, err = conn.Do("MULTI")
	assert.NoError(t, err, "MULTI")
	_, err = conn.Do("SET", "tr{a}1", "a")
	assert.NoError(t, err, "SET 1")
	_, err = conn.Do("SET", "tr{a}2", "b")
	assert.NoError(t, err, "SET 2")
	_, err = conn.Do("EXEC")
	assert.NoError(t, err, "EXEC")

	v, err := redis.Strings(conn.Do("MGET", "tr{a}1", "tr{a}2"))
	assert.NoError(t, err, "MGET")
	if assert.Equal(t, 2, len(v), "Number of MGET results") {
		assert.Equal(t, "a", v[0], "MGET[0]")
		assert.Equal(t, "b", v[1], "MGET[1]")
	}
}
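
// runPubSubCommands subscribes to a pattern and a channel, signals readiness
// on steps, waits for a published message (or the stop signal), then
// unsubscribes and signals on steps again.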
func runPubSubCommands(t *testing.T, c *Cluster, steps, stop chan int) {
	conn, err := c.Dial()
	require.NoError(t, err, "Dial for PubSub")

	psc := redis.PubSubConn{Conn: conn}
	assert.NoError(t, psc.PSubscribe("ev*"), "PSubscribe")
	assert.NoError(t, psc.Subscribe("e1"), "Subscribe")

	// allow commands to start running
	steps <- 1

	var received bool
loop:
	for {
		select {
		case <-stop:
			break loop
		default:
		}

		v := psc.Receive()
		switch v := v.(type) {
		case redis.Message:
			if !assert.Equal(t, []byte("a"), v.Data, "Received value") {
				t.Logf("%T", v)
			}
			received = true
			break loop
		}
	}

	<-stop
	assert.NoError(t, psc.Unsubscribe("e1"), "Unsubscribe")
	assert.NoError(t, psc.PUnsubscribe("ev*"), "PUnsubscribe")
	assert.NoError(t, psc.Close(), "Close for PubSub")
	assert.True(t, received, "Did receive event")
	steps <- 1
}
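
// runScriptCommands runs a redis.Script against a bound connection, re-runs
// it via SendHash, and verifies that keys from different slots fail with
// CROSSSLOT.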
func runScriptCommands(t *testing.T, c *Cluster, wg *sync.WaitGroup) {
	defer wg.Done()

	var script = redis.NewScript(2, `
		redis.call("SET", KEYS[1], ARGV[1])
		redis.call("SET", KEYS[2], ARGV[2])
		return 1
	`)
	conn := c.Get()
	defer conn.Close()
	require.NoError(t, BindConn(conn, "scr{a}1", "scr{a}2"), "Bind")

	// script.Do, send the whole script
	v, err := script.Do(conn, "scr{a}1", "scr{a}2", "x", "y")
	assert.NoError(t, err, "Do script")
	assert.Equal(t, int64(1), v, "Script result")

	// send only the hash, should work because the script is now loaded on this node
	assert.NoError(t, script.SendHash(conn, "scr{a}1", "scr{a}2", "x", "y"), "SendHash")
	assert.NoError(t, conn.Flush(), "Flush")
	v, err = conn.Receive()
	assert.NoError(t, err, "SendHash Receive")
	assert.Equal(t, int64(1), v, "SendHash Script result")

	// do with keys from different slots
	v, err = script.Do(conn, "scr{a}1", "scr{b}2", "x", "y")
	if assert.Error(t, err, "Do script invalid keys") {
		assert.Contains(t, err.Error(), "CROSSSLOT", "Do script invalid keys")
	}
}
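
// runCommands executes each command in cmds on a fresh connection from the
// cluster and asserts the response against the expected value, minimum
// length/value (lenResult), or error message.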
func runCommands(t *testing.T, c *Cluster, cmds []redisCmd, wg *sync.WaitGroup) {
	defer wg.Done()

	for _, cmd := range cmds {
		conn := c.Get()
		res, err := conn.Do(cmd.name, cmd.args...)
		if cmd.errMsg != "" {
			if assert.Error(t, err, cmd.name) {
				assert.Contains(t, err.Error(), cmd.errMsg, cmd.name)
			}
		} else {
			assert.NoError(t, err, cmd.name)
			if lr, ok := cmd.resp.(lenResult); ok {
				switch res := res.(type) {
				case []byte:
					assert.True(t, len(res) >= int(lr), "result has at least %d bytes, has %d", lr, len(res))
				case []interface{}:
					assert.True(t, len(res) >= int(lr), "result array has at least %d items, has %d", lr, len(res))
				case int64:
					assert.True(t, res >= int64(lr), "result is at least %d, is %d", lr, res)
				default:
					t.Errorf("unexpected result type %T", res)
				}
			} else {
				if !assert.Equal(t, cmd.resp, res, cmd.name) {
					t.Logf("%T vs %T", cmd.resp, res)
				}
			}
		}
		require.NoError(t, conn.Close(), "Close")
	}
}