package rediskv import ( "fmt" "time" "github.com/gomodule/redigo/redis" "github.com/mna/redisc" ) type clusterConn struct { rc *redisc.Cluster } func clusterConnCreatePool(addr string, opts ...redis.DialOption) (*redis.Pool, error) { return &redis.Pool{ MaxIdle: 5, MaxActive: 10, IdleTimeout: time.Minute, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr, opts...) }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, }, nil } func newClusterConn(hosts []string) (redis.Conn, error) { rc := redisc.Cluster{ StartupNodes: hosts, DialOptions: []redis.DialOption{redis.DialConnectTimeout(3 * time.Second), redis.DialReadTimeout(5 * time.Second)}, CreatePool: clusterConnCreatePool, } // Update the cluster nodes inventory based on the first node if err := rc.Refresh(); err != nil { return nil, err } return &clusterConn{rc: &rc}, nil } func (cc *clusterConn) Close() error { return cc.rc.Close() } func (cc *clusterConn) Err() error { return nil } func (cc *clusterConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { c := cc.rc.Get() defer c.Close() //nolint: errcheck return c.Do(commandName, args...) } func (cc *clusterConn) Send(commandName string, args ...interface{}) error { return fmt.Errorf("rediskv: Send not supported for clusterConn") } func (cc *clusterConn) Flush() error { return fmt.Errorf("rediskv: Flush not supported for clusterConn") } func (cc *clusterConn) Receive() (reply interface{}, err error) { return nil, fmt.Errorf("rediskv: Receive not supported for clusterConn") }