src.dualinventive.com/go/websocketserver/vendor/github.com/mna/redisc/redistest/server.go

412 lines
11 KiB
Go

// Package redistest provides test helpers to manage a redis server.
package redistest
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"errors"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/require"
)
// ErrTimeout is the error returned when a started redis-server does not
// accept TCP connections on its port before the startup wait expires.
var ErrTimeout = errors.New("timeout")
// ClusterConfig is the configuration template to use for servers started
// in redis-cluster mode. It is expanded with fmt.Sprintf and receives a
// single string argument, the port number, which the template references
// twice: once as %s (the listening port) and once as %[1]s (the name of
// the cluster nodes file).
var ClusterConfig = `
port %s
cluster-enabled yes
cluster-config-file nodes.%[1]s.conf
cluster-node-timeout 5000
appendonly no
`
// NumStartupRetries is the number of times starting a redis-server is
// retried when it times out, for example because of an
// "address already in use" error.
const NumStartupRetries = 100
// NumClusterNodes is the number of master nodes started in a test cluster.
// When a cluster is started with replicas, there is 1 replica per
// cluster node, so the total number of nodes is NumClusterNodes * 2.
const NumClusterNodes = 3
// StartServer starts a redis-server instance on a free port.
// It returns the started *exec.Cmd and the port used. The caller
// should make sure to stop the command. If the redis-server
// command is not found in the PATH, the test is skipped.
//
// If w is not nil, both stdout and stderr of the server are
// written to it. If a configuration is specified, it is supplied
// to the server via stdin.
//
// The test fails immediately if the server cannot be started, so the
// returned command is never nil.
func StartServer(t *testing.T, w io.Writer, conf string) (*exec.Cmd, string) {
	if _, err := exec.LookPath("redis-server"); err != nil {
		t.Skip("redis-server not found in $PATH")
	}
	port := getFreePort(t)
	cmd, err := startServerWithConfig(t, port, w, conf)
	// Do not hand a nil command back to the caller: it would panic as soon
	// as the caller tries to stop the server.
	require.NoError(t, err, "start redis-server")
	return cmd, port
}
// StartClusterWithReplicas starts a redis cluster of NumClusterNodes with
// 1 replica each. It returns the cleanup function to call after use
// (typically in a defer) and the list of ports for each node,
// masters first, then replicas.
//
// If the setup fails part-way, the servers started so far are stopped
// before the test is aborted.
func StartClusterWithReplicas(t *testing.T, w io.Writer) (func(), []string) {
	fn, ports := StartCluster(t, w)

	replicaPorts := make([]string, 0, len(ports))
	replicaCmds := make([]*exec.Cmd, 0, len(ports))

	// cleanup stops the replicas, removes their cluster nodes files and
	// then tears down the masters.
	cleanup := func() {
		for _, c := range replicaCmds {
			// Errors are ignored on purpose: the process may already be
			// gone, and Wait is only called to reap it.
			_ = c.Process.Kill()
			_ = c.Wait()
		}
		for _, port := range replicaPorts {
			port = strings.TrimPrefix(port, ":")
			// best-effort removal of the file written by the server
			_ = os.Remove(filepath.Join(os.TempDir(), fmt.Sprintf("nodes.%s.conf", port)))
		}
		fn()
	}

	// The require helpers abort the test goroutine on failure, which still
	// runs deferred calls: use that to avoid leaking running servers.
	ready := false
	defer func() {
		if !ready {
			cleanup()
		}
	}()

	mapping := getClusterNodeIDs(t, ports...)
	replicaMaster := make(map[string]string, len(ports))
	for _, master := range ports {
		port := getClusterFreePort(t)
		cmd, err := startServerWithConfig(t, port, w, fmt.Sprintf(ClusterConfig, port))
		require.NoError(t, err, "start replica redis-server")

		// record the replica before talking to it so that it gets cleaned
		// up even if joining the cluster fails
		replicaCmds = append(replicaCmds, cmd)
		replicaPorts = append(replicaPorts, port)
		replicaMaster[port] = master

		joinCluster(t, port, master)
	}

	// wait for the cluster to stabilize
	require.True(t, waitForCluster(t, 10*time.Second, replicaPorts...), "wait for cluster replicas")
	for _, port := range replicaPorts {
		setupReplica(t, port, mapping[replicaMaster[port]])
	}

	// masters first, then replicas, in a slice of its own
	allPorts := make([]string, 0, len(ports)+len(replicaPorts))
	allPorts = append(allPorts, ports...)
	allPorts = append(allPorts, replicaPorts...)

	// wait for replicas to join
	require.True(t, waitForReplicas(t, 10*time.Second, allPorts...), "wait for cluster replicas")

	ready = true
	return cleanup, allPorts
}
// StartCluster starts a redis cluster of NumClusterNodes using the
// ClusterConfig variable as configuration. If w is not nil,
// stdout and stderr of each node will be written to it.
//
// It returns a function that should be called after the test
// (typically in a defer), and the list of ports for all nodes
// in the cluster.
//
// Starting a node is retried up to NumStartupRetries times when it times
// out, each time on a newly selected port. If the setup fails part-way,
// the servers started so far are stopped before the test is aborted.
func StartCluster(t *testing.T, w io.Writer) (func(), []string) {
	if _, err := exec.LookPath("redis-server"); err != nil {
		t.Skip("redis-server not found in $PATH")
	}
	const hashSlots = 16384
	cmds := make([]*exec.Cmd, NumClusterNodes)
	ports := make([]string, NumClusterNodes)
	slotsPerNode := hashSlots / NumClusterNodes

	// nodesFile returns the path of the cluster nodes file that a server
	// started on port writes in the temporary directory.
	nodesFile := func(port string) string {
		port = strings.TrimPrefix(port, ":")
		return filepath.Join(os.TempDir(), fmt.Sprintf("nodes.%s.conf", port))
	}

	// cleanup stops every started node and removes its cluster nodes file.
	cleanup := func() {
		for _, c := range cmds {
			if c == nil || c.Process == nil {
				// node never started (setup aborted early)
				continue
			}
			// Errors are ignored on purpose: the process may already be
			// gone, and Wait is only called to reap it.
			_ = c.Process.Kill()
			_ = c.Wait()
		}
		for _, port := range ports {
			if port == "" {
				continue
			}
			// best-effort removal of the file written by the server
			_ = os.Remove(nodesFile(port))
		}
	}

	// The require helpers abort the test goroutine on failure, which still
	// runs deferred calls: use that to avoid leaking running servers.
	ready := false
	defer func() {
		if !ready {
			cleanup()
		}
	}()

	for i := 0; i < NumClusterNodes; i++ {
		retries := NumStartupRetries
		for {
			// Pick a new port on every attempt: retrying on a port that
			// just failed would most likely fail the same way.
			port := getClusterFreePort(t)
			cmd, err := startServerWithConfig(t, port, w, fmt.Sprintf(ClusterConfig, port))
			if err == nil {
				cmds[i], ports[i] = cmd, port
				break
			}
			if !errors.Is(err, ErrTimeout) {
				require.NoError(t, err, "start redis-server")
			}
			// the failed attempt may have left a nodes file behind
			_ = os.Remove(nodesFile(port))
			retries--
			require.NotZero(t, retries, "start redis-server failed all retries")
		}
		port := ports[i]

		// configure the cluster - add the slots and join
		countSlots := slotsPerNode
		if i == NumClusterNodes-1 {
			// add all remaining slots in the last node
			countSlots = hashSlots - (i * slotsPerNode)
		}
		setupClusterNode(t, port, i*slotsPerNode, countSlots)
		if i > 0 {
			// every node but the first one meets the previous node
			joinCluster(t, port, ports[i-1])
		}
	}

	// wait for the cluster to catch up
	require.True(t, waitForCluster(t, 10*time.Second, ports...), "wait for cluster")

	ready = true
	return cleanup, ports
}
// printClusterNodes prints to stdout the reply of CLUSTER NODES sent to the
// node listening on port. It is a debugging helper; any failure fails the
// test.
func printClusterNodes(t *testing.T, port string) {
	conn, err := redis.Dial("tcp", ":"+port)
	require.NoError(t, err, "Dial to cluster node")
	defer conn.Close()
	// redis.String reports an unexpected reply type as an error instead of
	// panicking like an unchecked type assertion would.
	nodes, err := redis.String(conn.Do("CLUSTER", "NODES"))
	require.NoError(t, err, "CLUSTER NODES")
	fmt.Println(nodes)
}
func printClusterSlots(t *testing.T, port string) {
conn, err := redis.Dial("tcp", ":"+port)
require.NoError(t, err, "Dial to cluster node")
defer conn.Close()
cmd := exec.Command("redis-cli", "-p", port, "CLUSTER", "SLOTS")
b, err := cmd.CombinedOutput()
require.NoError(t, err, "CLUSTER SLOTS via redis-cli")
fmt.Println(string(b))
}
// joinCluster connects to the node listening on nodePort and sends it
// CLUSTER MEET for 127.0.0.1 and clusterPort. Any failure fails the test.
func joinCluster(t *testing.T, nodePort, clusterPort string) {
	node, dialErr := redis.Dial("tcp", ":"+nodePort)
	require.NoError(t, dialErr, "Dial to node")
	defer node.Close()

	// introduce this node to the one already in the cluster
	_, meetErr := node.Do("CLUSTER", "MEET", "127.0.0.1", clusterPort)
	require.NoError(t, meetErr, "CLUSTER MEET")
}
// getClusterNodeIDs sends CLUSTER NODES to the node listening on the first
// port and returns a map of port to node ID for the given ports. A node is
// matched when its address, without any "@..." suffix, is
// "127.0.0.1:<port>". The test fails unless an ID is found for each port.
// It returns nil when no port is given.
func getClusterNodeIDs(t *testing.T, ports ...string) map[string]string {
	if len(ports) == 0 {
		return nil
	}
	conn, err := redis.Dial("tcp", ":"+ports[0])
	require.NoError(t, err, "Dial to node")
	defer conn.Close()
	nodes, err := redis.String(conn.Do("CLUSTER", "NODES"))
	require.NoError(t, err, "CLUSTER NODES")

	mapping := make(map[string]string, len(ports))
	s := bufio.NewScanner(strings.NewReader(nodes))
	for s.Scan() {
		fields := strings.Fields(s.Text())
		if len(fields) < 2 {
			// blank or malformed line: there is no address to look at
			continue
		}
		addrField := fields[1]
		if ix := strings.Index(addrField, "@"); ix >= 0 {
			addrField = addrField[:ix]
		}
		for _, port := range ports {
			if addrField == "127.0.0.1:"+port {
				mapping[port] = fields[0]
				break
			}
		}
	}
	require.NoError(t, s.Err(), "scan CLUSTER NODES")
	require.Equal(t, len(ports), len(mapping), "Find IDs for all ports")
	return mapping
}
// setupReplica connects to the node listening on replicaPort and sends it
// CLUSTER REPLICATE with masterID. Any failure fails the test.
func setupReplica(t *testing.T, replicaPort, masterID string) {
	replica, dialErr := redis.Dial("tcp", ":"+replicaPort)
	require.NoError(t, dialErr, "Dial to replica node")
	defer replica.Close()

	_, replErr := replica.Do("CLUSTER", "REPLICATE", masterID)
	require.NoError(t, replErr, "CLUSTER REPLICATE")
}
// setupClusterNode connects to the node listening on port and sends it a
// single CLUSTER ADDSLOTS command listing the count slots that begin at
// start. Any failure fails the test.
func setupClusterNode(t *testing.T, port string, start, count int) {
	node, dialErr := redis.Dial("tcp", ":"+port)
	require.NoError(t, dialErr, "Dial to cluster node")
	defer node.Close()

	// ADDSLOTS followed by every slot number in [start, start+count)
	cmdArgs := redis.Args{}.Add("ADDSLOTS")
	for slot, end := start, start+count; slot < end; slot++ {
		cmdArgs = cmdArgs.Add(slot)
	}

	_, addErr := node.Do("CLUSTER", cmdArgs...)
	require.NoError(t, addErr, "CLUSTER ADDSLOTS")
}
// waitForReplicas polls each node listening on ports until its CLUSTER
// NODES reply lists NumClusterNodes connected masters and NumClusterNodes
// connected non-master nodes. The timeout is shared by all ports. It
// returns false as soon as a node does not reach that state before the
// deadline, true otherwise. Dial or command errors fail the test.
func waitForReplicas(t *testing.T, timeout time.Duration, ports ...string) bool {
	deadline := time.Now().Add(timeout)
	for _, port := range ports {
		if !waitForNodeReplicas(t, port, deadline) {
			return false
		}
	}
	return true
}

// waitForNodeReplicas polls the node listening on port until it sees the
// expected number of connected masters and replicas, or until deadline.
func waitForNodeReplicas(t *testing.T, port string, deadline time.Time) bool {
	conn, err := redis.Dial("tcp", ":"+port)
	require.NoError(t, err, "Dial")
	// deferred so that the connection is released even when a require
	// below aborts the test
	defer conn.Close()

	for time.Now().Before(deadline) {
		v, err := redis.String(conn.Do("CLUSTER", "NODES"))
		require.NoError(t, err, "CLUSTER NODES")

		ms, rs := 0, 0
		s := bufio.NewScanner(strings.NewReader(v))
		for s.Scan() {
			fields := strings.Fields(s.Text())
			// fields[2] holds the flags, fields[7] the link state; skip
			// lines too short to carry both
			if len(fields) < 8 || fields[7] != "connected" {
				continue
			}
			if strings.Contains(fields[2], "master") {
				ms++
			} else {
				rs++
			}
		}
		if ms == NumClusterNodes && rs == NumClusterNodes {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}
// waitForCluster polls each node listening on ports until its CLUSTER INFO
// reply contains "cluster_state:ok". The timeout is shared by all ports.
// It returns false as soon as a node does not reach that state before the
// deadline, true otherwise. Dial or command errors fail the test.
func waitForCluster(t *testing.T, timeout time.Duration, ports ...string) bool {
	deadline := time.Now().Add(timeout)
	for _, port := range ports {
		if !waitForNodeClusterOK(t, port, deadline) {
			return false
		}
	}
	return true
}

// waitForNodeClusterOK polls the node listening on port until it reports
// cluster_state:ok, or until deadline.
func waitForNodeClusterOK(t *testing.T, port string, deadline time.Time) bool {
	conn, err := redis.Dial("tcp", ":"+port)
	require.NoError(t, err, "Dial")
	// deferred so that the connection is released even when a require
	// below aborts the test
	defer conn.Close()

	for time.Now().Before(deadline) {
		vals, err := redis.Bytes(conn.Do("CLUSTER", "INFO"))
		require.NoError(t, err, "CLUSTER INFO")
		if bytes.Contains(vals, []byte("cluster_state:ok")) {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}
// startServerWithConfig runs redis-server from the temporary directory and
// waits up to 10 seconds for it to accept TCP connections on port.
//
// When conf is empty the server is started with "--port <port>". Otherwise
// conf is fed to the server on stdin and the port is not passed on the
// command line, so conf has to make the server listen on port itself.
// If w is not nil, stdout and stderr of the server are written to it.
//
// It returns the running command, or ErrTimeout if the port never became
// reachable, in which case the process is stopped before returning. The
// test fails if the process cannot be started at all.
func startServerWithConfig(t *testing.T, port string, w io.Writer, conf string) (*exec.Cmd, error) {
	args := []string{"--port", port}
	if conf != "" {
		// "-" makes redis-server read its configuration from stdin
		args = []string{"-"}
	}
	c := exec.Command("redis-server", args...)
	c.Dir = os.TempDir()
	if w != nil {
		c.Stderr = w
		c.Stdout = w
	}
	if conf != "" {
		c.Stdin = strings.NewReader(conf)
	}

	// start the server
	require.NoError(t, c.Start(), "start redis-server")

	// wait for the server to start accepting connections
	if !waitForPort(port, 10*time.Second) {
		// The command is not handed back to the caller, so stop and reap
		// it here. Errors are ignored on purpose: the process may already
		// have exited on its own.
		_ = c.Process.Kill()
		_ = c.Wait()
		t.Logf("wait for redis-server timeout")
		return nil, ErrTimeout
	}
	t.Logf("redis-server started on port %s", port)
	return c, nil
}
// waitForPort reports whether a TCP connection to the given local port
// could be established before timeout elapsed. It tries every 10
// milliseconds, each dial being allowed up to one second.
func waitForPort(port string, timeout time.Duration) bool {
	addr := ":" + port
	for limit := time.Now().Add(timeout); time.Now().Before(limit); time.Sleep(10 * time.Millisecond) {
		c, dialErr := net.DialTimeout("tcp", addr, time.Second)
		if dialErr != nil {
			// not reachable yet, pause and try again
			continue
		}
		c.Close()
		return true
	}
	return false
}
// getClusterFreePort returns a port to use for a redis-cluster node.
//
// Cluster nodes talk to each other on their port + 10000, so the port has
// to stay below 55535. A free port is requested and, if it is too high,
// 10000 is subtracted from it in the hope that the resulting port is
// available too.
func getClusterFreePort(t *testing.T) string {
	const maxPort = 55535

	candidate := getFreePort(t)
	// a conversion error leaves num at 0, in which case the candidate is
	// returned as is
	num, _ := strconv.Atoi(candidate)
	if num < maxPort {
		return candidate
	}
	return strconv.Itoa(num - 10000)
}
// getFreePort returns a port picked by the system: it listens on TCP port
// 0, reads the port that was assigned and closes the listener on return.
// Any failure fails the test.
func getFreePort(t *testing.T) string {
	listener, listenErr := net.Listen("tcp", ":0")
	require.NoError(t, listenErr, "listen on port 0")
	defer listener.Close()

	assigned := listener.Addr().String()
	_, portNum, splitErr := net.SplitHostPort(assigned)
	require.NoError(t, splitErr, "parse host and port")
	return portNum
}
// NewPool returns a redis pool whose connections are dialed over TCP to
// addr. Connections are checked with a PING when they are borrowed from
// the pool.
func NewPool(t *testing.T, addr string) *redis.Pool {
	// dial opens a new connection to the configured address
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", addr)
	}
	// ping validates a pooled connection before it is handed out
	ping := func(c redis.Conn, _ time.Time) error {
		_, err := c.Do("PING")
		return err
	}

	pool := &redis.Pool{
		MaxIdle:      2,
		MaxActive:    10,
		IdleTimeout:  time.Minute,
		Dial:         dial,
		TestOnBorrow: ping,
	}
	return pool
}