// Package redis provides a realtime status subscriber backed by a Redis Cluster.
package redis
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
"github.com/mna/redisc"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Redis realtime status subscriber
|
|
type Redis struct {
|
|
isClosed chan bool
|
|
mu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
rc *redisc.Cluster
|
|
evcache eventCache // Publish event cache
|
|
}
|
|
|
|
// getPubSubConn gets a new connection from the cluster and subscribes to ChanDevice and ChanProject
|
|
func (r *Redis) getPubSubConn() redis.PubSubConn {
|
|
conn := r.rc.Get()
|
|
|
|
psc := redis.PubSubConn{Conn: conn}
|
|
psc.Subscribe(ChanDevice, ChanProject)
|
|
return psc
|
|
}
|
|
|
|
// getConn gets a new connection from the cluster and sets it to READONLY
|
|
// it configures it as a retry connection. With 3 retries and 1sec timeout delay.
|
|
func (r *Redis) getConn() (redis.Conn, error) {
|
|
conn := r.rc.Get()
|
|
|
|
// create the retry connection - only Do, Close and Err are
|
|
// supported on that connection. It will make up to 3 attempts
|
|
// to get a valid response, and will wait 1sec before a retry
|
|
// in case of a TRYAGAIN redis error.
|
|
c, err := redisc.RetryConn(conn, 3, time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
redisc.ReadOnlyConn(c)
|
|
return c, nil
|
|
}
|
|
|
|
// servePublishEvents processes all publish events and adds them to the cache
|
|
func (r *Redis) servePublishEvents() {
|
|
defer r.wg.Done()
|
|
|
|
psc := r.getPubSubConn()
|
|
|
|
for {
|
|
switch msg := psc.Receive().(type) {
|
|
case redis.Message:
|
|
r.evcache.Add(msg.Channel, string(msg.Data))
|
|
case error:
|
|
// Reconnect a new PUBSUB connection on error
|
|
psc.Close()
|
|
|
|
// Check if servePublishEvents loop must be closed due to Close call
|
|
select {
|
|
case <-r.isClosed:
|
|
logrus.Info("redis: closing pubsub event loop")
|
|
return
|
|
default:
|
|
}
|
|
|
|
logrus.Warn("redis pubsub error:", msg, " (reconnecting...)")
|
|
time.Sleep(time.Second)
|
|
psc = r.getPubSubConn()
|
|
case redis.Subscription:
|
|
// Drop message
|
|
case redis.Pong:
|
|
// Drop message
|
|
default:
|
|
logrus.Warn("redis unexpected pubsub msg:", msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
|
|
return &redis.Pool{
|
|
MaxIdle: 5,
|
|
MaxActive: 10,
|
|
IdleTimeout: time.Minute,
|
|
Dial: func() (redis.Conn, error) {
|
|
return redis.Dial("tcp", addr, opts...)
|
|
},
|
|
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
_, err := c.Do("PING")
|
|
return err
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// New creates a new Redis realtime status instance. It subscribes to publishes on ChanDevice and ChanProject.
|
|
func New(URIs []string) (*Redis, error) {
|
|
rc := redisc.Cluster{
|
|
StartupNodes: URIs,
|
|
DialOptions: []redis.DialOption{redis.DialConnectTimeout(3 * time.Second), redis.DialReadTimeout(5 * time.Second)},
|
|
CreatePool: createPool,
|
|
}
|
|
|
|
if err := rc.Refresh(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := &Redis{
|
|
rc: &rc,
|
|
isClosed: make(chan bool),
|
|
}
|
|
r.evcache.Flush()
|
|
|
|
r.wg.Add(1)
|
|
go r.servePublishEvents()
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// ReadEvents reads all current caches keyspace events
|
|
func (r *Redis) ReadEvents() []Event {
|
|
return r.evcache.Flush()
|
|
}
|
|
|
|
// HGetAll gets all keys with values
|
|
func (r *Redis) HGetAll(key string) (map[string]string, error) {
|
|
c, err := r.getConn()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res, err := redis.StringMap(c.Do("HGETALL", key))
|
|
c.Close()
|
|
return res, err
|
|
}
|
|
|
|
// HMGet gets all key values fields
|
|
// fields which don't exist in Redis will result in empty string entries
|
|
func (r *Redis) HMGet(key string, fields []string) ([]string, error) {
|
|
c, err := r.getConn()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res, err := redis.Strings(c.Do("HMGET", redis.Args{}.Add(key).AddFlat(fields)...))
|
|
c.Close()
|
|
return res, err
|
|
}
|
|
|
|
// Close closes all the connections to Redis
|
|
func (r *Redis) Close() error {
|
|
close(r.isClosed)
|
|
r.mu.Lock()
|
|
err := r.rc.Close()
|
|
r.wg.Wait()
|
|
r.mu.Unlock()
|
|
return err
|
|
}
|