src.dualinventive.com/go/influxdb-logger/vendor/github.com/mna/redisc/conn_test.go

306 lines
8.8 KiB
Go

package redisc
import (
"io"
"strings"
"testing"
"time"
"github.com/gomodule/redigo/redis"
"github.com/mna/redisc/redistest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Test the conn.ReadOnly behaviour in a cluster setup with 1 replica per
// node. Runs multiple tests in the same function because setting up
// such a cluster is slow.
func TestConnReadOnlyWithReplicas(t *testing.T) {
fn, ports := redistest.StartClusterWithReplicas(t, nil)
defer fn()
c := &Cluster{}
testWithReplicaBindRandomWithoutNode(t, c)
c = &Cluster{StartupNodes: []string{":" + ports[0]}}
testWithReplicaBindEmptySlot(t, c)
c = &Cluster{StartupNodes: []string{":" + ports[0]}}
testWithReplicaClusterRefresh(t, c, ports)
// at this point the cluster has refreshed its mapping
testReadWriteFromReplica(t, c, ports[redistest.NumClusterNodes:])
testReadOnlyWithRandomConn(t, c, ports[redistest.NumClusterNodes:])
testRetryReadOnlyConn(t, c, ports[:redistest.NumClusterNodes], ports[redistest.NumClusterNodes:])
}
func testRetryReadOnlyConn(t *testing.T, c *Cluster, masters []string, replicas []string) {
conn := c.Get().(*Conn)
defer conn.Close()
assert.NoError(t, ReadOnlyConn(conn), "ReadOnly")
rc, _ := RetryConn(conn, 4, time.Second)
// keys "a" and "b" are not in the same slot - bind to "a" and
// then ask for "b" to force a redirect.
assert.NoError(t, BindConn(conn, "a"), "Bind")
addr1 := assertBoundTo(t, conn, replicas)
if _, err := rc.Do("GET", "b"); assert.NoError(t, err, "GET b") {
addr2 := assertBoundTo(t, conn, replicas)
assert.NotEqual(t, addr1, addr2, "Bound to different replica")
// conn is now bound to the node serving slot "b". Send a READWRITE
// command and get "b" again, should re-bind to the same slot, but to
// the master.
_, err := rc.Do("READWRITE")
assert.NoError(t, err, "READWRITE")
if _, err := rc.Do("GET", "b"); assert.NoError(t, err, "GET b") {
addr3 := assertBoundTo(t, conn, masters)
assert.NotEqual(t, addr2, addr3, "Bound to the master")
}
}
}
// assert that conn is bound to one of the specified ports.
func assertBoundTo(t *testing.T, conn *Conn, ports []string) string {
conn.mu.Lock()
addr := conn.boundAddr
conn.mu.Unlock()
found := false
for _, port := range ports {
if strings.HasSuffix(addr, ":"+port) {
found = true
break
}
}
assert.True(t, found, "Bound address")
return addr
}
func testReadOnlyWithRandomConn(t *testing.T, c *Cluster, replicas []string) {
conn := c.Get().(*Conn)
defer conn.Close()
assert.NoError(t, ReadOnlyConn(conn), "ReadOnlyConn")
assert.NoError(t, BindConn(conn), "BindConn")
// it should now be bound to a random replica
assertBoundTo(t, conn, replicas)
}
func testReadWriteFromReplica(t *testing.T, c *Cluster, replicas []string) {
conn1 := c.Get()
defer conn1.Close()
_, err := conn1.Do("SET", "k1", "a")
assert.NoError(t, err, "SET on master")
conn2 := c.Get().(*Conn)
defer conn2.Close()
ReadOnlyConn(conn2)
// can read the key from the replica (may take a moment to replicate,
// so retry a few times)
var got string
deadline := time.Now().Add(100 * time.Millisecond)
for time.Now().Before(deadline) {
got, err = redis.String(conn2.Do("GET", "k1"))
if err != nil && got == "a" {
break
}
time.Sleep(10 * time.Millisecond)
}
if assert.NoError(t, err, "GET from replica") {
assert.Equal(t, "a", got, "expected value")
}
// bound address should be a replica
assertBoundTo(t, conn2, replicas)
// write command should fail with a MOVED
if _, err = conn2.Do("SET", "k1", "b"); assert.Error(t, err, "SET on ReadOnly conn") {
assert.Contains(t, err.Error(), "MOVED", "MOVED error")
}
// sending READWRITE switches the connection back to read from master
_, err = conn2.Do("READWRITE")
assert.NoError(t, err, "READWRITE")
// now even a GET fails with a MOVED
if _, err = conn2.Do("GET", "k1"); assert.Error(t, err, "GET on replica conn after READWRITE") {
assert.Contains(t, err.Error(), "MOVED", "MOVED error")
}
}
func testWithReplicaBindEmptySlot(t *testing.T, c *Cluster) {
conn := c.Get()
defer conn.Close()
// key "a" is not in node at [0], so will generate a refresh and connect
// to a random node (to node at [0]).
assert.NoError(t, conn.(*Conn).Bind("a"), "Bind to missing slot")
if _, err := conn.Do("GET", "a"); assert.Error(t, err, "GET") {
assert.Contains(t, err.Error(), "MOVED", "MOVED error")
}
// wait for refreshing to become false again
c.mu.Lock()
for c.refreshing {
c.mu.Unlock()
time.Sleep(100 * time.Millisecond)
c.mu.Lock()
}
for i, v := range c.mapping {
if !assert.NotEmpty(t, v, "Addr for %d", i) {
break
}
}
c.mu.Unlock()
}
func testWithReplicaBindRandomWithoutNode(t *testing.T, c *Cluster) {
conn := c.Get()
defer conn.Close()
if err := conn.(*Conn).Bind(); assert.Error(t, err, "Bind fails") {
assert.Contains(t, err.Error(), "failed to get a connection", "expected message")
}
}
func testWithReplicaClusterRefresh(t *testing.T, c *Cluster, ports []string) {
err := c.Refresh()
if assert.NoError(t, err, "Refresh") {
var prev string
pix := -1
for ix, node := range c.mapping {
if assert.Equal(t, 2, len(node), "Mapping for slot %d must have 2 nodes", ix) {
if node[0] != prev || ix == len(c.mapping)-1 {
prev = node[0]
t.Logf("%5d: %s\n", ix, node[0])
pix++
}
if assert.NotEmpty(t, node[0]) {
split0, split1 := strings.Index(node[0], ":"), strings.Index(node[1], ":")
assert.Contains(t, ports, node[0][split0+1:], "expected address")
assert.Contains(t, ports, node[1][split1+1:], "expected address")
}
} else {
break
}
}
}
}
func TestConnReadOnly(t *testing.T) {
fn, ports := redistest.StartCluster(t, nil)
defer fn()
c := &Cluster{
StartupNodes: []string{":" + ports[0]},
}
require.NoError(t, c.Refresh(), "Refresh")
conn := c.Get()
defer conn.Close()
cc := conn.(*Conn)
assert.NoError(t, cc.ReadOnly(), "ReadOnly")
// both get and set work, because the connection is on a master
_, err := cc.Do("SET", "b", 1)
assert.NoError(t, err, "SET")
v, err := redis.Int(cc.Do("GET", "b"))
if assert.NoError(t, err, "GET") {
assert.Equal(t, 1, v, "expected result")
}
conn2 := c.Get()
defer conn2.Close()
cc2 := conn2.(*Conn)
assert.NoError(t, cc2.Bind(), "Bind")
assert.Error(t, cc2.ReadOnly(), "ReadOnly after Bind")
}
func TestConnBind(t *testing.T) {
fn, ports := redistest.StartCluster(t, nil)
defer fn()
for i, p := range ports {
ports[i] = ":" + p
}
c := &Cluster{
StartupNodes: ports,
DialOptions: []redis.DialOption{redis.DialConnectTimeout(2 * time.Second)},
}
require.NoError(t, c.Refresh(), "Refresh")
conn := c.Get()
defer conn.Close()
if err := BindConn(conn, "A", "B"); assert.Error(t, err, "Bind with different keys") {
assert.Contains(t, err.Error(), "keys do not belong to the same slot", "expected message")
}
assert.NoError(t, BindConn(conn, "A"), "Bind")
if err := BindConn(conn, "B"); assert.Error(t, err, "Bind after Bind") {
assert.Contains(t, err.Error(), "connection already bound", "expected message")
}
conn2 := c.Get()
defer conn2.Close()
assert.NoError(t, BindConn(conn2), "Bind without key")
}
func TestConnClose(t *testing.T) {
c := &Cluster{
StartupNodes: []string{":6379"},
}
conn := c.Get()
require.NoError(t, conn.Close(), "Close")
_, err := conn.Do("A")
if assert.Error(t, err, "Do after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
if assert.Error(t, conn.Err(), "Err after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
if assert.Error(t, conn.Close(), "Close after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
if assert.Error(t, conn.Flush(), "Flush after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
if assert.Error(t, conn.Send("A"), "Send after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
_, err = conn.Receive()
if assert.Error(t, err, "Receive after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
cc := conn.(*Conn)
if assert.Error(t, cc.Bind("A"), "Bind after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
if assert.Error(t, cc.ReadOnly(), "ReadOnly after Close") {
assert.Contains(t, err.Error(), "redisc: closed", "expected message")
}
}
func TestIsRedisError(t *testing.T) {
err := error(redis.Error("CROSSSLOT some message"))
assert.True(t, IsCrossSlot(err), "CrossSlot")
assert.False(t, IsTryAgain(err), "CrossSlot")
err = redis.Error("TRYAGAIN some message")
assert.False(t, IsCrossSlot(err), "TryAgain")
assert.True(t, IsTryAgain(err), "TryAgain")
err = io.EOF
assert.False(t, IsCrossSlot(err), "EOF")
assert.False(t, IsTryAgain(err), "EOF")
err = redis.Error("ERR some error")
assert.False(t, IsCrossSlot(err), "ERR")
assert.False(t, IsTryAgain(err), "ERR")
}