src.dualinventive.com/go/redis-proxy/vendor/github.com/mna/redisc/retry_conn.go

134 lines
3.3 KiB
Go

package redisc
import (
"errors"
"time"
"github.com/gomodule/redigo/redis"
)
// RetryConn wraps the connection c (which must be a *Conn)
// into a connection that automatically handles cluster redirections
// (MOVED and ASK replies) and retries for TRYAGAIN errors.
// Only Do, Close and Err can be called on that connection,
// all other methods return an error.
//
// The maxAtt parameter indicates the maximum number of attempts
// to successfully execute the command. The tryAgainDelay is the
// duration to wait before retrying a TRYAGAIN error.
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
cc, ok := c.(*Conn)
if !ok {
return nil, errors.New("redisc: connection is not a *Conn")
}
return &retryConn{c: cc, maxAttempts: maxAtt, tryAgainDelay: tryAgainDelay}, nil
}
type retryConn struct {
c *Conn
maxAttempts int
tryAgainDelay time.Duration
}
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
return rc.do(cmd, args...)
}
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
var att int
var asking bool
cluster := rc.c.cluster
for rc.maxAttempts <= 0 || att < rc.maxAttempts {
if asking {
if err := rc.c.Send("ASKING"); err != nil {
return nil, err
}
asking = false
}
v, err := rc.c.Do(cmd, args...)
re := ParseRedir(err)
if re == nil {
if IsTryAgain(err) {
// handle retry
time.Sleep(rc.tryAgainDelay)
att++
continue
}
// not a retry error nor a redirection, return result
return v, err
}
// handle redirection
rc.c.mu.Lock()
readOnly := rc.c.readOnly
connAddr := rc.c.boundAddr
rc.c.mu.Unlock()
if readOnly {
// check if the connection was already made to that slot, meaning
// that the redirection is because the command can't be served
// by the replica and a non-readonly connection must be made to
// the slot's master. If that's not the case, then keep the
// readonly flag to true, meaning that it will attempt a connection
// to a replica for the new slot.
cluster.mu.Lock()
slotMappings := cluster.mapping[re.NewSlot]
cluster.mu.Unlock()
if isIn(slotMappings, connAddr) {
readOnly = false
}
}
// forceDial doesn't require locking (immutable)
conn, addr, err := cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
if err != nil {
// could not get connection to that node, return that error
return nil, err
}
rc.c.mu.Lock()
// close and replace the old connection (close must come before assignments)
rc.c.closeLocked()
rc.c.rc = conn
rc.c.boundAddr = addr
rc.c.readOnly = readOnly
rc.c.mu.Unlock()
asking = re.Type == "ASK"
att++
}
return nil, errors.New("redisc: too many attempts")
}
func (rc *retryConn) Err() error {
return rc.c.Err()
}
func (rc *retryConn) Close() error {
return rc.c.Close()
}
func (rc *retryConn) Send(cmd string, args ...interface{}) error {
return errors.New("redisc: unsupported call to Send")
}
func (rc *retryConn) Receive() (interface{}, error) {
return nil, errors.New("redisc: unsupported call to Receive")
}
func (rc *retryConn) Flush() error {
return errors.New("redisc: unsupported call to Flush")
}
func isIn(list []string, v string) bool {
for _, vv := range list {
if v == vv {
return true
}
}
return false
}