80 lines
2.2 KiB
Go
80 lines
2.2 KiB
Go
package redis
|
|
|
|
import (
|
|
"encoding/json"
|
|
"hash/crc64"
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/lib/dilog"
|
|
)
|
|
|
|
// IsNew checks if the data at key - field in Redis is older than time t. When this is the first call with key - field
|
|
// the last_update is requested from Redis and stored in a cache. The next calls with key - field uses the cached
|
|
// timestamp.
|
|
func (r *Redis) IsNew(logger dilog.Logger, key, field string, t ditime.Time) bool {
|
|
var timestamp ditime.Time
|
|
r.cacheMu.Lock()
|
|
v, ok := r.cache.Get(r.cacheKey(key, field))
|
|
r.cacheMu.Unlock()
|
|
// Entry is unknown. Grab timestamp from Redis
|
|
if !ok {
|
|
if timestamp, ok = r.redisLastUpdate(logger, key, field); ok {
|
|
r.UpdateCache(key, field, timestamp)
|
|
} else {
|
|
// Timestamp is not in the cache and could not fetched from redis. This value should be new.
|
|
return true
|
|
}
|
|
} else {
|
|
timestamp = v.(ditime.Time)
|
|
}
|
|
prevTime := timestamp.ToTime()
|
|
curTime := t.ToTime()
|
|
return prevTime.Before(curTime) || prevTime.Equal(curTime)
|
|
}
|
|
|
|
// UpdateCache inserts time t as last_update
|
|
func (r *Redis) UpdateCache(key, field string, t ditime.Time) {
|
|
r.cacheMu.Lock()
|
|
r.cache.Add(r.cacheKey(key, field), t)
|
|
r.cacheMu.Unlock()
|
|
}
|
|
|
|
func (r *Redis) redisLastUpdate(logger dilog.Logger, key, field string) (ditime.Time, bool) {
|
|
c, err := r.getConn()
|
|
if err != nil {
|
|
logger.WithError(err).Warning("cannot get time from redis")
|
|
return 0, false
|
|
}
|
|
defer func() {
|
|
closeerr := c.Close()
|
|
// we are closing the connection so it is too late for errors.
|
|
_ = closeerr
|
|
}()
|
|
|
|
record, err := redis.String(c.Do("HGET", key, field))
|
|
if err != nil {
|
|
logger.WithError(err).Warning("cannot get time from redis")
|
|
return 0, false
|
|
}
|
|
|
|
var data map[string]interface{}
|
|
if err = json.Unmarshal([]byte(record), &data); err != nil {
|
|
logger.WithError(err).Warning("cannot unmarshal redis data")
|
|
return 0, false
|
|
}
|
|
timestamp, ok := data["time"].(ditime.Time)
|
|
if !ok {
|
|
logger.WithFields(dilog.Fields{
|
|
"key": key,
|
|
"field": field,
|
|
}).Warning("record contains no time")
|
|
return 0, false
|
|
}
|
|
return timestamp, true
|
|
}
|
|
|
|
func (r *Redis) cacheKey(key, field string) uint64 {
|
|
return crc64.Checksum([]byte(key+field), r.crc64Table)
|
|
}
|