70 lines
1.6 KiB
Go
70 lines
1.6 KiB
Go
package rediskv
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
"github.com/mna/redisc"
|
|
)
|
|
|
|
type clusterConn struct {
|
|
rc *redisc.Cluster
|
|
}
|
|
|
|
func clusterConnCreatePool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
|
|
return &redis.Pool{
|
|
MaxIdle: 5,
|
|
MaxActive: 10,
|
|
IdleTimeout: time.Minute,
|
|
Dial: func() (redis.Conn, error) {
|
|
return redis.Dial("tcp", addr, opts...)
|
|
},
|
|
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
_, err := c.Do("PING")
|
|
return err
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func newClusterConn(hosts []string) (redis.Conn, error) {
|
|
rc := redisc.Cluster{
|
|
StartupNodes: hosts,
|
|
DialOptions: []redis.DialOption{redis.DialConnectTimeout(3 * time.Second), redis.DialReadTimeout(5 * time.Second)},
|
|
CreatePool: clusterConnCreatePool,
|
|
}
|
|
|
|
// Update the cluster nodes inventory based on the first node
|
|
if err := rc.Refresh(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &clusterConn{rc: &rc}, nil
|
|
}
|
|
|
|
func (cc *clusterConn) Close() error {
|
|
return cc.rc.Close()
|
|
}
|
|
|
|
func (cc *clusterConn) Err() error {
|
|
return nil
|
|
}
|
|
|
|
func (cc *clusterConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
|
c := cc.rc.Get()
|
|
defer c.Close() //nolint: errcheck
|
|
return c.Do(commandName, args...)
|
|
}
|
|
|
|
func (cc *clusterConn) Send(commandName string, args ...interface{}) error {
|
|
return fmt.Errorf("rediskv: Send not supported for clusterConn")
|
|
}
|
|
|
|
func (cc *clusterConn) Flush() error {
|
|
return fmt.Errorf("rediskv: Flush not supported for clusterConn")
|
|
}
|
|
|
|
func (cc *clusterConn) Receive() (reply interface{}, err error) {
|
|
return nil, fmt.Errorf("rediskv: Receive not supported for clusterConn")
|
|
}
|