package redis

import (
	"context"
	"errors"
	"hash/crc64"
	"sync"
	"time"

	"github.com/dualinventive/go-lruttl"
	"github.com/gomodule/redigo/redis"
	"github.com/mna/redisc"
	"src.dualinventive.com/go/dinet/rpc"
	"src.dualinventive.com/go/lib/dilog"
)

// Redis realtime status subscriber
type Redis struct {
	logger dilog.Logger
	// rc is the cluster client all connections are taken from.
	rc *redisc.Cluster
	// c is an unbuffered channel that hands messages from Send to Process.
	c chan *rpc.Msg

	// NOTE(review): cacheMu presumably guards cache; its users are not
	// visible in this part of the file.
	cacheMu sync.Mutex
	// cache keeps track of the last update of every Redis record
	cache *lruttl.Cache

	// crc64Table is the CRC-64 table built with the ISO polynomial.
	crc64Table *crc64.Table
}

// createPool builds the connection pool for a single cluster node. It is
// installed as the CreatePool callback of the cluster in New.
//
// Every borrowed connection is checked with a PING before it is handed out.
// The returned error is always nil.
func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	return &redis.Pool{
		MaxIdle:     5,
		MaxActive:   10,
		IdleTimeout: time.Minute,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", addr, opts...)
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}, nil
}

// New creates a new Redis realtime status instance.
//
// uris are the startup nodes of the cluster and connectionTimeout is used as
// the dial connect timeout. cacheMaxItems and cacheExpiry are passed on to
// lruttl.New for the last-update cache.
//
// An error is returned when the cluster node inventory cannot be refreshed;
// in that case the cluster is closed again before returning.
func New(logger dilog.Logger, uris []string, connectionTimeout time.Duration, cacheMaxItems int,
	cacheExpiry time.Duration) (*Redis, error) {
	rc := redisc.Cluster{
		StartupNodes: uris,
		DialOptions:  []redis.DialOption{redis.DialConnectTimeout(connectionTimeout)},
		CreatePool:   createPool,
	}

	// Update the cluster nodes inventory based on the first node
	if err := rc.Refresh(); err != nil {
		// Refresh may already have created pools for the startup nodes it
		// tried. Release them instead of abandoning them with the cluster.
		// The Close error is deliberately dropped: the Refresh error is the
		// one the caller needs to see.
		_ = rc.Close()
		return nil, err
	}

	r := &Redis{
		logger:     logger,
		rc:         &rc,
		c:          make(chan *rpc.Msg),
		cache:      lruttl.New(cacheMaxItems, cacheExpiry),
		crc64Table: crc64.MakeTable(crc64.ISO),
	}
	return r, nil
}

// Send sends a message to redis. It is processed internally by another go routine.
//
// The hand-over channel is unbuffered, so Send blocks until Process receives
// the message. The returned error is always nil.
func (r *Redis) Send(m *rpc.Msg) error {
	r.c <- m
	return nil
}

// Process processes all incoming write messages. This method blocks until the context is cancelled.
func (r *Redis) Process(ctx context.Context) { for { select { case <-ctx.Done(): return case m := <-r.c: logger := r.logger.WithFields(dilog.Fields{ "device:uid": m.DeviceUID, "project:id": m.ProjectID, "classmethod": m.ClassMethod, "type": m.Type, }) if err := r.process(logger, m); err != nil { logger.WithError(err).Error("Unable to process RPC message") } } } } // process processes a single rpc message func (r *Redis) process(logger dilog.Logger, msg *rpc.Msg) error { // Ignore error messages if msg.Error != nil { return nil } if ok, err := r.shouldProcess(msg); !ok { return err } // Sensor data if msg.ClassMethod == rpc.ClassMethodSensorData { var resValueItems []rpc.ResultValueItem if err := msg.Result.Unmarshal(&resValueItems); err != nil { return err } if err := r.processSensorData(logger, msg, resValueItems); err != nil { return err } return r.publish(msg) } var valueItems []map[string]interface{} // All other class:method versions with result if err := msg.Result.Unmarshal(&valueItems); err == nil { if err = r.processGeneralClassMethod(logger, msg, valueItems); err != nil { return err } return r.publish(msg) } // All other class:method versions without result result := make(map[string]interface{}) result["time"] = msg.Time mKeyName := keyName(msg) if mKeyName == "" { logger.Warning("Redis.process keyName is empty") return nil } if err := r.processMsgResult(logger, mKeyName, fieldName(msg, 0), result); err != nil { return err } return r.publish(msg) } func (r *Redis) shouldProcess(msg *rpc.Msg) (bool, error) { switch msg.Class() { case rpc.ClassLog, rpc.ClassMessage: // Drop log and message messages return false, nil case rpc.ClassProject: // Project class must contain a project ID if !rpc.ValidProjectID(msg.ProjectID) { return false, errors.New("project contains no projectID") } // Project class must contain a result if msg.Result.IsEmpty() { return false, errors.New("project contains no result") } default: // All other methods must contain a device UID or 
project ID if !rpc.ValidDeviceUID(msg.DeviceUID) && !rpc.ValidProjectID(msg.ProjectID) { return false, errors.New("message has invalid deviceUID or projectID") } } // Drop request messages if msg.Type == rpc.MsgTypeRequest { return false, nil } return true, nil } // getConn gets a new connection from the cluster // it configures it as a retry connection. With 3 retries and 1s timeout delay. func (r *Redis) getConn() (redis.Conn, error) { conn := r.rc.Get() // create the retry connection - only Do, Close and Err are // supported on that connection. It will make up to 3 attempts // to get a valid response, and will wait 100ms before a retry // in case of a TRYAGAIN redis error. return redisc.RetryConn(conn, 3, time.Second) } // Close the connection to redis func (r *Redis) Close() error { return r.rc.Close() }