src.dualinventive.com/go/websocketserver/internal/redis/redis.go

159 lines
3.7 KiB
Go

package redis
import (
"sync"
"time"
"github.com/gomodule/redigo/redis"
"github.com/mna/redisc"
"github.com/sirupsen/logrus"
)
// Redis is the realtime status subscriber. It owns the Redis cluster client,
// the background pubsub goroutine (servePublishEvents) and the cache that
// goroutine fills with received publish events.
type Redis struct {
// isClosed is closed by Close; servePublishEvents checks it after a receive
// error to decide whether to stop or to reconnect.
isClosed chan bool
// mu is locked by Close while the cluster is shut down.
mu sync.RWMutex
// wg tracks the servePublishEvents goroutine so Close can wait for it to exit.
wg sync.WaitGroup
// rc is the cluster client from which every connection is obtained.
rc *redisc.Cluster
evcache eventCache // Publish event cache: written by servePublishEvents, read through ReadEvents
}
// getPubSubConn gets a new connection from the cluster and subscribes it to
// ChanDevice and ChanProject.
//
// The method has no error return, so a failed SUBSCRIBE is logged here rather
// than silently discarded. The connection is returned in either case; the
// caller is expected to notice a broken connection through the error that
// Receive reports on it.
func (r *Redis) getPubSubConn() redis.PubSubConn {
	psc := redis.PubSubConn{Conn: r.rc.Get()}
	if err := psc.Subscribe(ChanDevice, ChanProject); err != nil {
		logrus.Warn("redis pubsub subscribe error: ", err)
	}
	return psc
}
// getConn gets a new connection from the cluster, marks it READONLY and wraps
// it in a retry connection that makes up to 3 attempts, waiting 1 second
// before retrying after a TRYAGAIN redis error.
//
// Only Do, Close and Err are supported on the returned connection. The caller
// must Close it. On error no connection is returned and the underlying
// cluster connection has already been closed.
func (r *Redis) getConn() (redis.Conn, error) {
	conn := r.rc.Get()

	// READONLY has to be requested on the cluster connection itself, before
	// it is wrapped: the retry wrapper is a different type, and asking
	// redisc.ReadOnlyConn to act on it fails (that error used to be dropped,
	// leaving the connection in read-write mode).
	if err := redisc.ReadOnlyConn(conn); err != nil {
		_ = conn.Close() // best effort; the ReadOnlyConn error is the one to report
		return nil, err
	}

	c, err := redisc.RetryConn(conn, 3, time.Second)
	if err != nil {
		_ = conn.Close() // do not leak the cluster connection on failure
		return nil, err
	}
	return c, nil
}
// servePublishEvents receives pubsub messages and adds every published
// message to the event cache. It runs until a receive error occurs after
// isClosed has been closed, and signals r.wg when it returns.
//
// On a receive error the pubsub connection is closed and, unless shutdown was
// requested, a new one is created after a one second backoff. The backoff
// also watches isClosed so that a shutdown requested while waiting is honored
// immediately instead of after one more reconnect attempt.
func (r *Redis) servePublishEvents() {
	defer r.wg.Done()

	psc := r.getPubSubConn()
	for {
		switch msg := psc.Receive().(type) {
		case redis.Message:
			r.evcache.Add(msg.Channel, string(msg.Data))

		case error:
			// The connection is unusable after an error; drop it before
			// deciding whether to reconnect.
			psc.Close()

			// Stop if the error was caused by (or coincides with) Close.
			select {
			case <-r.isClosed:
				logrus.Info("redis: closing pubsub event loop")
				return
			default:
			}

			logrus.Warn("redis pubsub error:", msg, " (reconnecting...)")

			// Back off before reconnecting, but wake up early on shutdown.
			select {
			case <-r.isClosed:
				logrus.Info("redis: closing pubsub event loop")
				return
			case <-time.After(time.Second):
			}
			psc = r.getPubSubConn()

		case redis.Subscription, redis.Pong:
			// Subscription confirmations and pongs carry no event data; drop them.

		default:
			logrus.Warn("redis unexpected pubsub msg:", msg)
		}
	}
}
// createPool builds the connection pool for the node at addr. It is installed
// as the CreatePool hook of the cluster in New.
//
// The pool keeps at most 5 idle and 10 active connections, expires idle
// connections after one minute, and sends a PING to every connection before
// handing it out. The returned error is always nil.
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	// dial opens a fresh TCP connection to the node with the given options.
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", addr, opts...)
	}
	// ping verifies a pooled connection is still alive before it is reused.
	ping := func(c redis.Conn, _ time.Time) error {
		_, err := c.Do("PING")
		return err
	}

	pool := &redis.Pool{
		Dial:         dial,
		TestOnBorrow: ping,
		MaxIdle:      5,
		MaxActive:    10,
		IdleTimeout:  time.Minute,
	}
	return pool, nil
}
// New creates a new Redis realtime status instance for the cluster reachable
// through the startup nodes in URIs. Connections use a 3 second connect
// timeout and a 5 second read timeout.
//
// The cluster mapping is refreshed once up front; if that fails the error is
// returned and no instance is created. On success a background goroutine is
// started that subscribes to publishes on ChanDevice and ChanProject; call
// Close to stop it.
func New(URIs []string) (*Redis, error) {
	dialOpts := []redis.DialOption{
		redis.DialConnectTimeout(3 * time.Second),
		redis.DialReadTimeout(5 * time.Second),
	}
	cluster := &redisc.Cluster{
		StartupNodes: URIs,
		DialOptions:  dialOpts,
		CreatePool:   createPool,
	}
	if err := cluster.Refresh(); err != nil {
		return nil, err
	}

	r := &Redis{
		isClosed: make(chan bool),
		rc:       cluster,
	}
	// NOTE(review): presumably puts the event cache into its initial empty
	// state; eventCache is defined elsewhere - confirm.
	r.evcache.Flush()

	// Register the goroutine before starting it so Close can wait for it.
	r.wg.Add(1)
	go r.servePublishEvents()
	return r, nil
}
// ReadEvents returns the events produced by flushing the publish event cache.
// NOTE(review): presumably this drains the cache so each event is returned
// once; confirm against eventCache.Flush.
func (r *Redis) ReadEvents() []Event {
	events := r.evcache.Flush()
	return events
}
// HGetAll runs HGETALL on key and returns the hash as a field-to-value map.
// It returns an error when no connection can be obtained or when the command
// or the reply conversion fails.
func (r *Redis) HGetAll(key string) (map[string]string, error) {
	conn, err := r.getConn()
	if err != nil {
		return nil, err
	}
	// Release the connection once the reply has been converted.
	defer conn.Close()

	reply, err := conn.Do("HGETALL", key)
	return redis.StringMap(reply, err)
}
// HMGet runs HMGET on key for the given fields and returns their values in
// the order the fields were passed.
// Fields which don't exist in Redis result in empty string entries.
func (r *Redis) HMGet(key string, fields []string) ([]string, error) {
	conn, err := r.getConn()
	if err != nil {
		return nil, err
	}
	// Release the connection once the reply has been converted.
	defer conn.Close()

	// Command arguments: the key followed by every requested field.
	args := redis.Args{}.Add(key).AddFlat(fields)
	return redis.Strings(conn.Do("HMGET", args...))
}
// Close signals the pubsub event loop to stop, closes the cluster and waits
// for the event loop goroutine to exit. It returns the error reported by
// closing the cluster.
//
// Close is safe to call more than once: only the first call performs the
// shutdown, later calls return nil. (Previously a second call panicked by
// closing the already-closed isClosed channel.)
func (r *Redis) Close() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	// The mutex serializes callers, so this check-then-close cannot race.
	select {
	case <-r.isClosed:
		return nil // already closed
	default:
	}

	close(r.isClosed)
	err := r.rc.Close()
	r.wg.Wait()
	return err
}