src.dualinventive.com/go/lib/kv/rediskv/rediskv_cluster.go

70 lines
1.6 KiB
Go

package rediskv
import (
"fmt"
"time"
"github.com/gomodule/redigo/redis"
"github.com/mna/redisc"
)
// clusterConn wraps a redisc.Cluster. newClusterConn returns it as a
// redis.Conn; its methods in this file are Close, Err, Do, Send, Flush and
// Receive.
type clusterConn struct {
// rc is the cluster handle that Do and Close operate on.
rc *redisc.Cluster
}
// clusterConnCreatePool builds a redigo connection pool for the node at addr.
// newClusterConn installs it as the cluster's CreatePool callback.
//
// Connections are dialed over TCP with the supplied dial options. The pool
// keeps at most 5 idle and 10 active connections, drops connections that have
// been idle for a minute, and checks every borrowed connection with a PING.
// The returned error is always nil.
func clusterConnCreatePool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", addr, opts...)
	}

	// The idle timestamp is ignored: every borrow is verified with a PING.
	ping := func(c redis.Conn, _ time.Time) error {
		if _, err := c.Do("PING"); err != nil {
			return err
		}
		return nil
	}

	pool := &redis.Pool{
		Dial:         dial,
		TestOnBorrow: ping,
		MaxIdle:      5,
		MaxActive:    10,
		IdleTimeout:  time.Minute,
	}
	return pool, nil
}
// newClusterConn creates a cluster-backed redis.Conn using hosts as the
// startup nodes.
//
// The cluster dials with a 3 second connect timeout and a 5 second read
// timeout, and creates its per-node pools through clusterConnCreatePool.
// Refresh is called once before returning; if it fails, the cluster is closed
// and the Refresh error is returned unchanged.
func newClusterConn(hosts []string) (redis.Conn, error) {
	rc := &redisc.Cluster{
		StartupNodes: hosts,
		DialOptions: []redis.DialOption{
			redis.DialConnectTimeout(3 * time.Second),
			redis.DialReadTimeout(5 * time.Second),
		},
		CreatePool: clusterConnCreatePool,
	}

	// Populate the cluster's node inventory from the startup nodes.
	if err := rc.Refresh(); err != nil {
		// Refresh may already have created pools through CreatePool while
		// contacting the startup nodes (see the redisc documentation).
		// Close the cluster so they are not left behind; the Refresh error
		// is the one worth reporting, so a Close error is ignored.
		_ = rc.Close()
		return nil, err
	}
	return &clusterConn{rc: rc}, nil
}
// Close closes the underlying cluster and returns the error it reports.
func (cc *clusterConn) Close() error {
	err := cc.rc.Close()
	return err
}
// Err always returns nil; this type does not record a connection-level error.
func (cc *clusterConn) Err() error {
return nil
}
// Do runs a single command. It obtains a connection from the cluster with
// Get, executes the command on it, and closes that connection before
// returning. The reply and error are those of the command itself.
func (cc *clusterConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
	conn := cc.rc.Get()
	defer func() {
		// The command's own result is what the caller gets; an error from
		// closing the connection is deliberately dropped.
		_ = conn.Close()
	}()

	reply, err = conn.Do(commandName, args...)
	return reply, err
}
// Send is not implemented for clusterConn. It ignores its arguments and
// always returns an error.
func (cc *clusterConn) Send(commandName string, args ...interface{}) error {
	err := fmt.Errorf("rediskv: Send not supported for clusterConn")
	return err
}
// Flush is not implemented for clusterConn and always returns an error.
func (cc *clusterConn) Flush() error {
	err := fmt.Errorf("rediskv: Flush not supported for clusterConn")
	return err
}
// Receive is not implemented for clusterConn. It always returns a nil reply
// together with an error.
func (cc *clusterConn) Receive() (reply interface{}, err error) {
	err = fmt.Errorf("rediskv: Receive not supported for clusterConn")
	return nil, err
}