src.dualinventive.com/go/influxdb-logger/internal/device/cache.go

143 lines
3.7 KiB
Go

package device
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/dualinventive/go-lruttl"
"github.com/sirupsen/logrus"
"src.dualinventive.com/go/dinet/rpc"
"src.dualinventive.com/go/dinet/rts"
"src.dualinventive.com/go/lib/kv"
)
// Cache keeps devices in a lruttl cache and falls back to a KV store: when a
// requested device is not in the cache, it is loaded from the store and added
// to the cache.
type Cache struct {
// c holds the cached *Device values, keyed by device UID.
c *lruttl.Cache
// r is the backing store that is queried when a device is not cached.
r kv.KV
}
// New returns a Cache backed by store. The underlying lruttl cache is created
// with maxEntries as its size limit and expiry as its entry lifetime; devices
// that are not (or no longer) cached are requested from store again.
func New(store kv.KV, maxEntries int, expiry time.Duration) *Cache {
	cache := new(Cache)
	cache.c = lruttl.New(maxEntries, expiry)
	cache.r = store
	return cache
}
// Device looks up deviceUID in the cache and returns the cached device when
// present. On a miss the device is loaded from the store, which also adds it
// to the cache.
func (c *Cache) Device(deviceUID string) (*Device, error) {
	// Fast path: the device is cached.
	if cached, ok := c.c.Get(deviceUID); ok {
		return cached.(*Device), nil
	}

	logrus.Debugf("Device %s not found. Requesting Redis", deviceUID)
	return c.loadFromRedis(deviceUID)
}
// loadFromRedis builds the device for deviceUID from the store, adds it to the
// cache and returns it.
//
// Only hash fields named "<class>:<uid>:info" whose class is rpc.ClassSensor
// or rpc.ClassNotify are used; <uid> must be a decimal number that fits in a
// uint16. Fields that cannot be parsed or fetched are logged and skipped, and a
// missing device type is logged but does not fail the load. An error is only
// returned when the field names of the device cannot be listed.
func (c *Cache) loadFromRedis(deviceUID string) (*Device, error) {
	fields, err := c.r.HKeys(rts.KeyPrefixDevice + deviceUID)
	if err != nil {
		return nil, err
	}

	d := Device{
		SensorFields: make(map[uint16]*FieldInfo),
		NotifyFields: make(map[uint16]*FieldInfo),
	}
	for _, field := range fields {
		// Optimisation: no need to split when the suffix is not :info.
		if !strings.HasSuffix(field, ":info") {
			continue
		}
		fieldParts := strings.Split(field, ":")
		if len(fieldParts) != 3 {
			continue
		}

		// Decide where the field belongs before talking to the store, so
		// fields of other classes do not cost a round trip.
		var target map[uint16]*FieldInfo
		switch rpc.Class(fieldParts[0]) {
		case rpc.ClassSensor:
			target = d.SensorFields
		case rpc.ClassNotify:
			target = d.NotifyFields
		default:
			continue
		}

		// The UID is used as a uint16 map key, so parse it as unsigned 16 bit:
		// this accepts the full 0..65535 range and rejects negative values
		// instead of letting them wrap around in the conversion below.
		uid, err := strconv.ParseUint(fieldParts[1], 10, 16)
		if err != nil {
			logrus.Warnf("cannot get UID from %s, device: %s", field, deviceUID)
			continue
		}

		// Get the information from redis.
		fi, err := c.getFieldInfoFromRedis(deviceUID, field)
		if err != nil {
			logrus.Warnf("cannot get label from redis: %s, device: %s", err, deviceUID)
			continue
		}
		target[uint16(uid)] = fi
	}

	// The device type is best effort: without it the device is still usable.
	if t, err := c.getDeviceTypeFromRedis(deviceUID); err != nil {
		logrus.Warnf("cannot get device type from redis: %s", err)
	} else {
		d.Type = t
	}

	// Add the device to the cache.
	c.c.Add(deviceUID, &d)
	return &d, nil
}
// getDeviceTypeFromRedis returns the "type" entry of the JSON document stored
// in the rpc.ClassMethodDeviceInfo hash field of the device.
//
// A string type is returned unchanged. Any other JSON value (for example a
// number) is rendered with its default formatting. An error is returned when
// the field cannot be read, is not valid JSON, or has no (or a null) "type".
func (c *Cache) getDeviceTypeFromRedis(deviceUID string) (string, error) {
	info, err := c.r.HGet(rts.KeyPrefixDevice+deviceUID, string(rpc.ClassMethodDeviceInfo))
	if err != nil {
		return "", err
	}

	var data map[string]interface{}
	if err = json.Unmarshal([]byte(info), &data); err != nil {
		return "", err
	}

	switch t := data["type"].(type) {
	case string:
		return t, nil
	case nil:
		// Covers both an absent key and an explicit JSON null.
		return "", errors.New("no type in device:info")
	default:
		// Formatting a non-string with %s would produce "%!s(...)" noise;
		// fmt.Sprint renders the value itself.
		return fmt.Sprint(t), nil
	}
}
// getFieldInfoFromRedis reads the JSON document stored under field in the
// device's hash and extracts its "label" and "type" entries. Both entries must
// be present as non-empty strings, otherwise an error is returned.
func (c *Cache) getFieldInfoFromRedis(deviceUID, field string) (*FieldInfo, error) {
	raw, err := c.r.HGet(rts.KeyPrefixDevice+deviceUID, field)
	if err != nil {
		return nil, err
	}

	var payload map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &payload); err != nil {
		return nil, err
	}

	// The label is validated before the type, so a document that lacks both
	// reports the label problem.
	label, isString := payload["label"].(string)
	switch {
	case !isString:
		return nil, errors.New("no label")
	case label == "":
		return nil, errors.New("empty label")
	}

	kind, isString := payload["type"].(string)
	switch {
	case !isString:
		return nil, errors.New("no type")
	case kind == "":
		return nil, errors.New("empty type")
	}

	return &FieldInfo{Label: label, Type: kind}, nil
}