185 lines
4.7 KiB
Go
185 lines
4.7 KiB
Go
package redis
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"hash/crc64"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/dualinventive/go-lruttl"
|
|
"github.com/gomodule/redigo/redis"
|
|
"github.com/mna/redisc"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
"src.dualinventive.com/go/lib/dilog"
|
|
)
|
|
|
|
// Redis realtime status subscriber
type Redis struct {
	// logger is the base logger; per-message fields are attached in Process.
	logger dilog.Logger
	// rc is the Redis cluster client used for all commands.
	rc *redisc.Cluster
	// c carries incoming RPC messages from Send to Process (unbuffered).
	c chan *rpc.Msg
	// cacheMu guards access to cache.
	cacheMu sync.Mutex
	// cache keeps track of the last update of every Redis record
	cache *lruttl.Cache
	// crc64Table is a precomputed CRC-64 (ISO) table; presumably used to
	// fingerprint record contents for the dedup cache — confirm in callers.
	crc64Table *crc64.Table
}
|
|
|
|
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
|
|
return &redis.Pool{
|
|
MaxIdle: 5,
|
|
MaxActive: 10,
|
|
IdleTimeout: time.Minute,
|
|
Dial: func() (redis.Conn, error) {
|
|
return redis.Dial("tcp", addr, opts...)
|
|
},
|
|
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
_, err := c.Do("PING")
|
|
return err
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// New creates a new Redis realtime status instance.
|
|
func New(logger dilog.Logger, uris []string, connectionTimeout time.Duration, cacheMaxItems int,
|
|
cacheExpiry time.Duration) (*Redis, error) {
|
|
rc := redisc.Cluster{
|
|
StartupNodes: uris,
|
|
DialOptions: []redis.DialOption{redis.DialConnectTimeout(connectionTimeout)},
|
|
CreatePool: createPool,
|
|
}
|
|
|
|
// Update the cluster nodes inventory based on the first node
|
|
if err := rc.Refresh(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := &Redis{
|
|
logger: logger,
|
|
rc: &rc,
|
|
c: make(chan *rpc.Msg),
|
|
cache: lruttl.New(cacheMaxItems, cacheExpiry),
|
|
crc64Table: crc64.MakeTable(crc64.ISO),
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Send sends a message to redis. It is processed internally by another go routine.
func (r *Redis) Send(m *rpc.Msg) error {
	// NOTE(review): r.c is unbuffered, so this blocks until Process receives
	// the message; if Process has already returned (context cancelled) this
	// blocks forever — confirm callers stop sending before cancellation.
	r.c <- m
	// Always nil; the error return exists to satisfy the caller's interface.
	return nil
}
|
|
|
|
// Process processes all incomming write messages. This method blocks until the context is invalid.
|
|
func (r *Redis) Process(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case m := <-r.c:
|
|
logger := r.logger.WithFields(dilog.Fields{
|
|
"device:uid": m.DeviceUID,
|
|
"project:id": m.ProjectID,
|
|
"classmethod": m.ClassMethod,
|
|
"type": m.Type,
|
|
})
|
|
if err := r.process(logger, m); err != nil {
|
|
logger.WithError(err).Error("Unable to process RPC message")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// process processes a single rpc message: after filtering via shouldProcess,
// it dispatches on the message shape — sensor data, generic messages whose
// result unmarshals to a list of maps, or messages without such a result —
// stores the record, and publishes the message on success.
func (r *Redis) process(logger dilog.Logger, msg *rpc.Msg) error {
	// Ignore error messages
	if msg.Error != nil {
		return nil
	}

	// Dropped messages may still carry a validation error worth returning.
	if ok, err := r.shouldProcess(msg); !ok {
		return err
	}

	// Sensor data: the result is a typed list of value items.
	if msg.ClassMethod == rpc.ClassMethodSensorData {
		var resValueItems []rpc.ResultValueItem
		if err := msg.Result.Unmarshal(&resValueItems); err != nil {
			return err
		}
		if err := r.processSensorData(logger, msg, resValueItems); err != nil {
			return err
		}
		return r.publish(msg)
	}

	var valueItems []map[string]interface{}
	// All other class:method versions with result. Note: a successful
	// Unmarshal (err == nil) selects this path; failure falls through to
	// the no-result handling below rather than being treated as fatal.
	if err := msg.Result.Unmarshal(&valueItems); err == nil {
		if err = r.processGeneralClassMethod(logger, msg, valueItems); err != nil {
			return err
		}
		return r.publish(msg)
	}

	// All other class:method versions without result: store only the
	// message timestamp under the derived key/field.
	result := make(map[string]interface{})
	result["time"] = msg.Time

	mKeyName := keyName(msg)
	if mKeyName == "" {
		// No derivable key — log and drop rather than fail the pipeline.
		logger.Warning("Redis.process keyName is empty")
		return nil
	}

	if err := r.processMsgResult(logger, mKeyName, fieldName(msg, 0), result); err != nil {
		return err
	}
	return r.publish(msg)
}
|
|
|
|
func (r *Redis) shouldProcess(msg *rpc.Msg) (bool, error) {
|
|
switch msg.Class() {
|
|
case rpc.ClassLog, rpc.ClassMessage:
|
|
// Drop log and message messages
|
|
return false, nil
|
|
case rpc.ClassProject:
|
|
// Project class must contain a project ID
|
|
if !rpc.ValidProjectID(msg.ProjectID) {
|
|
return false, errors.New("project contains no projectID")
|
|
}
|
|
|
|
// Project class must contain a result
|
|
if msg.Result.IsEmpty() {
|
|
return false, errors.New("project contains no result")
|
|
}
|
|
default:
|
|
// All other methods must contain a device UID or project ID
|
|
if !rpc.ValidDeviceUID(msg.DeviceUID) && !rpc.ValidProjectID(msg.ProjectID) {
|
|
return false, errors.New("message has invalid deviceUID or projectID")
|
|
}
|
|
}
|
|
// Drop request messages
|
|
if msg.Type == rpc.MsgTypeRequest {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// getConn gets a new connection from the cluster
// it configures it as a retry connection. With 3 retries and 1s timeout delay.
func (r *Redis) getConn() (redis.Conn, error) {
	conn := r.rc.Get()

	// create the retry connection - only Do, Close and Err are
	// supported on that connection. It will make up to 3 attempts
	// to get a valid response, and will wait 1s before a retry
	// in case of a TRYAGAIN redis error.
	return redisc.RetryConn(conn, 3, time.Second)
}
|
|
|
|
// Close the connection to redis
func (r *Redis) Close() error {
	// Delegates to the cluster, which releases every node's pool.
	return r.rc.Close()
}
|