134 lines
3.3 KiB
Go
134 lines
3.3 KiB
Go
package redisc
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
)
|
|
|
|
// RetryConn wraps the connection c (which must be a *Conn)
|
|
// into a connection that automatically handles cluster redirections
|
|
// (MOVED and ASK replies) and retries for TRYAGAIN errors.
|
|
// Only Do, Close and Err can be called on that connection,
|
|
// all other methods return an error.
|
|
//
|
|
// The maxAtt parameter indicates the maximum number of attempts
|
|
// to successfully execute the command. The tryAgainDelay is the
|
|
// duration to wait before retrying a TRYAGAIN error.
|
|
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
|
|
cc, ok := c.(*Conn)
|
|
if !ok {
|
|
return nil, errors.New("redisc: connection is not a *Conn")
|
|
}
|
|
return &retryConn{c: cc, maxAttempts: maxAtt, tryAgainDelay: tryAgainDelay}, nil
|
|
}
|
|
|
|
type retryConn struct {
|
|
c *Conn
|
|
|
|
maxAttempts int
|
|
tryAgainDelay time.Duration
|
|
}
|
|
|
|
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
|
return rc.do(cmd, args...)
|
|
}
|
|
|
|
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
|
|
var att int
|
|
var asking bool
|
|
|
|
cluster := rc.c.cluster
|
|
for rc.maxAttempts <= 0 || att < rc.maxAttempts {
|
|
if asking {
|
|
if err := rc.c.Send("ASKING"); err != nil {
|
|
return nil, err
|
|
}
|
|
asking = false
|
|
}
|
|
|
|
v, err := rc.c.Do(cmd, args...)
|
|
re := ParseRedir(err)
|
|
if re == nil {
|
|
if IsTryAgain(err) {
|
|
// handle retry
|
|
time.Sleep(rc.tryAgainDelay)
|
|
att++
|
|
continue
|
|
}
|
|
|
|
// not a retry error nor a redirection, return result
|
|
return v, err
|
|
}
|
|
|
|
// handle redirection
|
|
rc.c.mu.Lock()
|
|
readOnly := rc.c.readOnly
|
|
connAddr := rc.c.boundAddr
|
|
rc.c.mu.Unlock()
|
|
if readOnly {
|
|
// check if the connection was already made to that slot, meaning
|
|
// that the redirection is because the command can't be served
|
|
// by the replica and a non-readonly connection must be made to
|
|
// the slot's master. If that's not the case, then keep the
|
|
// readonly flag to true, meaning that it will attempt a connection
|
|
// to a replica for the new slot.
|
|
cluster.mu.Lock()
|
|
slotMappings := cluster.mapping[re.NewSlot]
|
|
cluster.mu.Unlock()
|
|
if isIn(slotMappings, connAddr) {
|
|
readOnly = false
|
|
}
|
|
}
|
|
|
|
// forceDial doesn't require locking (immutable)
|
|
conn, addr, err := cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
|
|
if err != nil {
|
|
// could not get connection to that node, return that error
|
|
return nil, err
|
|
}
|
|
|
|
rc.c.mu.Lock()
|
|
// close and replace the old connection (close must come before assignments)
|
|
rc.c.closeLocked()
|
|
rc.c.rc = conn
|
|
rc.c.boundAddr = addr
|
|
rc.c.readOnly = readOnly
|
|
rc.c.mu.Unlock()
|
|
|
|
asking = re.Type == "ASK"
|
|
att++
|
|
}
|
|
return nil, errors.New("redisc: too many attempts")
|
|
}
|
|
|
|
func (rc *retryConn) Err() error {
|
|
return rc.c.Err()
|
|
}
|
|
|
|
func (rc *retryConn) Close() error {
|
|
return rc.c.Close()
|
|
}
|
|
|
|
func (rc *retryConn) Send(cmd string, args ...interface{}) error {
|
|
return errors.New("redisc: unsupported call to Send")
|
|
}
|
|
|
|
func (rc *retryConn) Receive() (interface{}, error) {
|
|
return nil, errors.New("redisc: unsupported call to Receive")
|
|
}
|
|
|
|
func (rc *retryConn) Flush() error {
|
|
return errors.New("redisc: unsupported call to Flush")
|
|
}
|
|
|
|
func isIn(list []string, v string) bool {
|
|
for _, vv := range list {
|
|
if v == vv {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|