package redisc import ( "io" "strings" "testing" "time" "github.com/gomodule/redigo/redis" "github.com/mna/redisc/redistest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // Test the conn.ReadOnly behaviour in a cluster setup with 1 replica per // node. Runs multiple tests in the same function because setting up // such a cluster is slow. func TestConnReadOnlyWithReplicas(t *testing.T) { fn, ports := redistest.StartClusterWithReplicas(t, nil) defer fn() c := &Cluster{} testWithReplicaBindRandomWithoutNode(t, c) c = &Cluster{StartupNodes: []string{":" + ports[0]}} testWithReplicaBindEmptySlot(t, c) c = &Cluster{StartupNodes: []string{":" + ports[0]}} testWithReplicaClusterRefresh(t, c, ports) // at this point the cluster has refreshed its mapping testReadWriteFromReplica(t, c, ports[redistest.NumClusterNodes:]) testReadOnlyWithRandomConn(t, c, ports[redistest.NumClusterNodes:]) testRetryReadOnlyConn(t, c, ports[:redistest.NumClusterNodes], ports[redistest.NumClusterNodes:]) } func testRetryReadOnlyConn(t *testing.T, c *Cluster, masters []string, replicas []string) { conn := c.Get().(*Conn) defer conn.Close() assert.NoError(t, ReadOnlyConn(conn), "ReadOnly") rc, _ := RetryConn(conn, 4, time.Second) // keys "a" and "b" are not in the same slot - bind to "a" and // then ask for "b" to force a redirect. assert.NoError(t, BindConn(conn, "a"), "Bind") addr1 := assertBoundTo(t, conn, replicas) if _, err := rc.Do("GET", "b"); assert.NoError(t, err, "GET b") { addr2 := assertBoundTo(t, conn, replicas) assert.NotEqual(t, addr1, addr2, "Bound to different replica") // conn is now bound to the node serving slot "b". Send a READWRITE // command and get "b" again, should re-bind to the same slot, but to // the master. _, err := rc.Do("READWRITE") assert.NoError(t, err, "READWRITE") if _, err := rc.Do("GET", "b"); assert.NoError(t, err, "GET b") { addr3 := assertBoundTo(t, conn, masters) assert.NotEqual(t, addr2, addr3, "Bound to the master") } } } // assert that conn is bound to one of the specified ports. func assertBoundTo(t *testing.T, conn *Conn, ports []string) string { conn.mu.Lock() addr := conn.boundAddr conn.mu.Unlock() found := false for _, port := range ports { if strings.HasSuffix(addr, ":"+port) { found = true break } } assert.True(t, found, "Bound address") return addr } func testReadOnlyWithRandomConn(t *testing.T, c *Cluster, replicas []string) { conn := c.Get().(*Conn) defer conn.Close() assert.NoError(t, ReadOnlyConn(conn), "ReadOnlyConn") assert.NoError(t, BindConn(conn), "BindConn") // it should now be bound to a random replica assertBoundTo(t, conn, replicas) } func testReadWriteFromReplica(t *testing.T, c *Cluster, replicas []string) { conn1 := c.Get() defer conn1.Close() _, err := conn1.Do("SET", "k1", "a") assert.NoError(t, err, "SET on master") conn2 := c.Get().(*Conn) defer conn2.Close() ReadOnlyConn(conn2) // can read the key from the replica (may take a moment to replicate, // so retry a few times) var got string deadline := time.Now().Add(100 * time.Millisecond) for time.Now().Before(deadline) { got, err = redis.String(conn2.Do("GET", "k1")) if err != nil && got == "a" { break } time.Sleep(10 * time.Millisecond) } if assert.NoError(t, err, "GET from replica") { assert.Equal(t, "a", got, "expected value") } // bound address should be a replica assertBoundTo(t, conn2, replicas) // write command should fail with a MOVED if _, err = conn2.Do("SET", "k1", "b"); assert.Error(t, err, "SET on ReadOnly conn") { assert.Contains(t, err.Error(), "MOVED", "MOVED error") } // sending READWRITE switches the connection back to read from master _, err = conn2.Do("READWRITE") assert.NoError(t, err, "READWRITE") // now even a GET fails with a MOVED if _, err = conn2.Do("GET", "k1"); assert.Error(t, err, "GET on replica conn after READWRITE") { assert.Contains(t, err.Error(), "MOVED", "MOVED error") } } func testWithReplicaBindEmptySlot(t *testing.T, c *Cluster) { conn := c.Get() defer conn.Close() // key "a" is not in node at [0], so will generate a refresh and connect // to a random node (to node at [0]). assert.NoError(t, conn.(*Conn).Bind("a"), "Bind to missing slot") if _, err := conn.Do("GET", "a"); assert.Error(t, err, "GET") { assert.Contains(t, err.Error(), "MOVED", "MOVED error") } // wait for refreshing to become false again c.mu.Lock() for c.refreshing { c.mu.Unlock() time.Sleep(100 * time.Millisecond) c.mu.Lock() } for i, v := range c.mapping { if !assert.NotEmpty(t, v, "Addr for %d", i) { break } } c.mu.Unlock() } func testWithReplicaBindRandomWithoutNode(t *testing.T, c *Cluster) { conn := c.Get() defer conn.Close() if err := conn.(*Conn).Bind(); assert.Error(t, err, "Bind fails") { assert.Contains(t, err.Error(), "failed to get a connection", "expected message") } } func testWithReplicaClusterRefresh(t *testing.T, c *Cluster, ports []string) { err := c.Refresh() if assert.NoError(t, err, "Refresh") { var prev string pix := -1 for ix, node := range c.mapping { if assert.Equal(t, 2, len(node), "Mapping for slot %d must have 2 nodes", ix) { if node[0] != prev || ix == len(c.mapping)-1 { prev = node[0] t.Logf("%5d: %s\n", ix, node[0]) pix++ } if assert.NotEmpty(t, node[0]) { split0, split1 := strings.Index(node[0], ":"), strings.Index(node[1], ":") assert.Contains(t, ports, node[0][split0+1:], "expected address") assert.Contains(t, ports, node[1][split1+1:], "expected address") } } else { break } } } } func TestConnReadOnly(t *testing.T) { fn, ports := redistest.StartCluster(t, nil) defer fn() c := &Cluster{ StartupNodes: []string{":" + ports[0]}, } require.NoError(t, c.Refresh(), "Refresh") conn := c.Get() defer conn.Close() cc := conn.(*Conn) assert.NoError(t, cc.ReadOnly(), "ReadOnly") // both get and set work, because the connection is on a master _, err := cc.Do("SET", "b", 1) assert.NoError(t, err, "SET") v, err := redis.Int(cc.Do("GET", "b")) if assert.NoError(t, err, "GET") { assert.Equal(t, 1, v, "expected result") } conn2 := c.Get() defer conn2.Close() cc2 := conn2.(*Conn) assert.NoError(t, cc2.Bind(), "Bind") assert.Error(t, cc2.ReadOnly(), "ReadOnly after Bind") } func TestConnBind(t *testing.T) { fn, ports := redistest.StartCluster(t, nil) defer fn() for i, p := range ports { ports[i] = ":" + p } c := &Cluster{ StartupNodes: ports, DialOptions: []redis.DialOption{redis.DialConnectTimeout(2 * time.Second)}, } require.NoError(t, c.Refresh(), "Refresh") conn := c.Get() defer conn.Close() if err := BindConn(conn, "A", "B"); assert.Error(t, err, "Bind with different keys") { assert.Contains(t, err.Error(), "keys do not belong to the same slot", "expected message") } assert.NoError(t, BindConn(conn, "A"), "Bind") if err := BindConn(conn, "B"); assert.Error(t, err, "Bind after Bind") { assert.Contains(t, err.Error(), "connection already bound", "expected message") } conn2 := c.Get() defer conn2.Close() assert.NoError(t, BindConn(conn2), "Bind without key") } func TestConnClose(t *testing.T) { c := &Cluster{ StartupNodes: []string{":6379"}, } conn := c.Get() require.NoError(t, conn.Close(), "Close") _, err := conn.Do("A") if assert.Error(t, err, "Do after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } if assert.Error(t, conn.Err(), "Err after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } if assert.Error(t, conn.Close(), "Close after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } if assert.Error(t, conn.Flush(), "Flush after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } if assert.Error(t, conn.Send("A"), "Send after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } _, err = conn.Receive() if assert.Error(t, err, "Receive after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } cc := conn.(*Conn) if assert.Error(t, cc.Bind("A"), "Bind after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } if assert.Error(t, cc.ReadOnly(), "ReadOnly after Close") { assert.Contains(t, err.Error(), "redisc: closed", "expected message") } } func TestIsRedisError(t *testing.T) { err := error(redis.Error("CROSSSLOT some message")) assert.True(t, IsCrossSlot(err), "CrossSlot") assert.False(t, IsTryAgain(err), "CrossSlot") err = redis.Error("TRYAGAIN some message") assert.False(t, IsCrossSlot(err), "TryAgain") assert.True(t, IsTryAgain(err), "TryAgain") err = io.EOF assert.False(t, IsCrossSlot(err), "EOF") assert.False(t, IsTryAgain(err), "EOF") err = redis.Error("ERR some error") assert.False(t, IsCrossSlot(err), "ERR") assert.False(t, IsTryAgain(err), "ERR") }