src.dualinventive.com/go/redis-proxy/internal/redis/redis.go

185 lines
4.7 KiB
Go

package redis
import (
"context"
"errors"
"hash/crc64"
"sync"
"time"
"github.com/dualinventive/go-lruttl"
"github.com/gomodule/redigo/redis"
"github.com/mna/redisc"
"src.dualinventive.com/go/dinet/rpc"
"src.dualinventive.com/go/lib/dilog"
)
// Redis is the realtime status subscriber. It bundles the Redis cluster
// client, the channel on which RPC messages are handed from Send to Process,
// and a cache of the last update per Redis record.
type Redis struct {
// logger is the base logger; Process derives a per-message logger from it.
logger dilog.Logger
// rc is the Redis cluster client that connections are taken from.
rc *redisc.Cluster
// c carries messages from Send to Process. New creates it unbuffered.
c chan *rpc.Msg
// cacheMu presumably guards cache; the locking code is not visible in this
// part of the file — TODO confirm.
cacheMu sync.Mutex
// cache keeps track of the last update of every Redis record
cache *lruttl.Cache
// crc64Table is the CRC-64 table (ISO polynomial) built once in New.
// NOTE(review): presumably used to checksum record contents — confirm.
crc64Table *crc64.Table
}
// createPool builds the connection pool for a single cluster node; New
// installs it as the cluster's CreatePool hook.
//
// The pool dials addr over TCP with the given dial options, allows at most 5
// idle and 10 active connections, uses an idle timeout of one minute, and
// verifies every borrowed connection with a PING. The returned error is
// always nil; it only exists to satisfy the hook's signature.
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", addr, opts...)
	}

	// The time the connection was last returned to the pool is not used: the
	// connection is always pinged.
	ping := func(conn redis.Conn, _ time.Time) error {
		_, pingErr := conn.Do("PING")
		return pingErr
	}

	pool := &redis.Pool{
		Dial:         dial,
		TestOnBorrow: ping,
		MaxIdle:      5,
		MaxActive:    10,
		IdleTimeout:  time.Minute,
	}
	return pool, nil
}
// New creates a new Redis realtime status instance.
//
// uris are the startup nodes of the cluster and connectionTimeout is applied
// as the dial connect timeout of every connection. cacheMaxItems and
// cacheExpiry are passed to lruttl.New to configure the record cache.
//
// New refreshes the cluster's node inventory before returning. When that
// refresh fails the cluster is closed again, so pools that were created while
// trying to reach the startup nodes are released, and the refresh error is
// returned.
func New(logger dilog.Logger, uris []string, connectionTimeout time.Duration, cacheMaxItems int,
	cacheExpiry time.Duration) (*Redis, error) {
	rc := &redisc.Cluster{
		StartupNodes: uris,
		DialOptions:  []redis.DialOption{redis.DialConnectTimeout(connectionTimeout)},
		CreatePool:   createPool,
	}

	// Update the cluster nodes inventory based on the startup nodes.
	if err := rc.Refresh(); err != nil {
		// Best-effort cleanup: the refresh error is the one the caller needs
		// to see, so a failure to close is deliberately not reported.
		_ = rc.Close()
		return nil, err
	}

	return &Redis{
		logger:     logger,
		rc:         rc,
		c:          make(chan *rpc.Msg),
		cache:      lruttl.New(cacheMaxItems, cacheExpiry),
		crc64Table: crc64.MakeTable(crc64.ISO),
	}, nil
}
// Send queues a message for writing to Redis. The message itself is handled
// by the goroutine that runs Process.
//
// New creates the internal channel unbuffered, so Send blocks until Process
// receives the message. A nil message is rejected with an error: Process
// dereferences every message it receives, so passing nil through would panic
// the processing goroutine.
func (r *Redis) Send(m *rpc.Msg) error {
	if m == nil {
		return errors.New("message is nil")
	}
	r.c <- m
	return nil
}
// Process handles all incoming write messages. It blocks until ctx is done.
//
// Every message received from Send is processed with a logger that carries
// the message's device UID, project ID, class:method and type. A processing
// failure is logged and does not stop the loop.
func (r *Redis) Process(ctx context.Context) {
	stop := ctx.Done()
	for {
		select {
		case msg := <-r.c:
			fields := dilog.Fields{
				"device:uid":  msg.DeviceUID,
				"project:id":  msg.ProjectID,
				"classmethod": msg.ClassMethod,
				"type":        msg.Type,
			}
			msgLogger := r.logger.WithFields(fields)

			err := r.process(msgLogger, msg)
			if err == nil {
				continue
			}
			msgLogger.WithError(err).Error("Unable to process RPC message")
		case <-stop:
			return
		}
	}
}
// process handles a single RPC message.
//
// Messages that carry an error, or that shouldProcess rejects, are not
// stored; in the latter case the error from shouldProcess (possibly nil) is
// returned. Accepted messages are stored in one of three ways and then
// published:
//   - sensor data: the result is decoded into rpc.ResultValueItem values;
//   - any other class:method whose result decodes into a list of maps;
//   - anything else: only the message time is stored.
func (r *Redis) process(logger dilog.Logger, msg *rpc.Msg) error {
	// Error messages are ignored.
	if msg.Error != nil {
		return nil
	}

	if accepted, reason := r.shouldProcess(msg); !accepted {
		return reason
	}

	// Sensor data has its own result layout.
	if msg.ClassMethod == rpc.ClassMethodSensorData {
		var sensorItems []rpc.ResultValueItem
		if err := msg.Result.Unmarshal(&sensorItems); err != nil {
			return err
		}
		if err := r.processSensorData(logger, msg, sensorItems); err != nil {
			return err
		}
		return r.publish(msg)
	}

	var items []map[string]interface{}
	if decodeErr := msg.Result.Unmarshal(&items); decodeErr != nil {
		// No usable result: store just the time of the message.
		timeOnly := map[string]interface{}{"time": msg.Time}

		key := keyName(msg)
		if key == "" {
			logger.Warning("Redis.process keyName is empty")
			return nil
		}
		if err := r.processMsgResult(logger, key, fieldName(msg, 0), timeOnly); err != nil {
			return err
		}
		return r.publish(msg)
	}

	// Every other class:method that does carry a result.
	if err := r.processGeneralClassMethod(logger, msg, items); err != nil {
		return err
	}
	return r.publish(msg)
}
// shouldProcess reports whether msg has to be processed.
//
// It returns (false, nil) for messages that are dropped silently: the log and
// message classes, and requests. It returns (false, err) for messages that
// are invalid: a project message without a valid project ID or without a
// result, or any other message that has neither a valid device UID nor a
// valid project ID. Validation runs before the request check, so an invalid
// request yields an error rather than a silent drop.
func (r *Redis) shouldProcess(msg *rpc.Msg) (bool, error) {
	class := msg.Class()

	// Log and message messages are dropped.
	if class == rpc.ClassLog || class == rpc.ClassMessage {
		return false, nil
	}

	if class == rpc.ClassProject {
		// A project message needs both a project ID and a result.
		if !rpc.ValidProjectID(msg.ProjectID) {
			return false, errors.New("project contains no projectID")
		}
		if msg.Result.IsEmpty() {
			return false, errors.New("project contains no result")
		}
	} else if !(rpc.ValidDeviceUID(msg.DeviceUID) || rpc.ValidProjectID(msg.ProjectID)) {
		// Everything else needs a device UID or a project ID.
		return false, errors.New("message has invalid deviceUID or projectID")
	}

	// Request messages are dropped; everything that got here is processed.
	return msg.Type != rpc.MsgTypeRequest, nil
}
// getConn takes a connection from the cluster and wraps it in a retry
// connection.
//
// Only Do, Close and Err are supported on the returned connection. It makes
// up to 3 attempts to get a valid response and waits 1 second before a retry
// in case of a TRYAGAIN redis error. Callers are expected to Close the
// returned connection when they are done with it.
func (r *Redis) getConn() (redis.Conn, error) {
	conn := r.rc.Get()

	retryConn, err := redisc.RetryConn(conn, 3, time.Second)
	if err != nil {
		// The cluster connection is of no use without the retry wrapper, so
		// release it instead of leaking it. The wrap error is the one worth
		// reporting; a close failure is deliberately ignored.
		_ = conn.Close()
		return nil, err
	}
	return retryConn, nil
}
// Close closes the Redis cluster client and returns the error reported by
// the cluster's Close, if any.
func (r *Redis) Close() error {
return r.rc.Close()
}