src.dualinventive.com/go/redis-proxy/internal/redis/redis_test.go

597 lines
14 KiB
Go

package redis
import (
"context"
"encoding/json"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"src.dualinventive.com/go/dinet/ditime"
"src.dualinventive.com/go/dinet/rpc"
"src.dualinventive.com/go/dinet/rts"
"src.dualinventive.com/go/redis-proxy/internal/redistest"
)
type cleanupFunc func()
func setup(t *testing.T, msg string) (cleanup cleanupFunc, redisTest *redistest.Redis) {
redisTest = redistest.New(t)
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 1, time.Second)
require.Nil(t, err)
var rpcMsg rpc.Msg
err = json.Unmarshal([]byte(msg), &rpcMsg)
require.Nil(t, err)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
r.Process(ctx)
}()
err = r.Send(&rpcMsg)
require.Nil(t, err)
// Message is delivered. We can close the process task
cancel()
wg.Wait()
return func() {
require.Nil(t, redisTest.Close())
require.Nil(t, r.Close())
}, redisTest
}
func TestDeviceSensorDataMsg(t *testing.T) {
strMsg := `{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":1,
"value":129873412,
"time":1234571
},{
"uid":2,
"value":"string value",
"time":1234572
},{
"uid":3,
"value":{"a":"b"},
"time":1234573
}]}`
cleanup, redisTest := setup(t, strMsg)
defer cleanup()
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
redisTest.FieldUint64(t, key, "last_update")
redisTest.FieldExist(t, key, "sensor:1:data")
redisTest.FieldExist(t, key, "sensor:2:data")
redisTest.FieldExist(t, key, "sensor:3:data")
require.False(t, redisTest.GPSRecordExists(t, "00133410270056001251343236363736"))
}
func TestDeviceSensorGPSDataMsg(t *testing.T) {
testCases := []struct {
name string
msg string
expect interface{}
GPSExists bool
}{
{
"valid GPS",
`{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":13,
"value":{
"fix": true,
"hdop": 2.799999952316284,
"latitude": 51.5856781,
"longitude": 5.1952634,
"siv": 0
},
"time":1234573
}]
}`,
map[string]interface{}{
"fix": true,
"hdop": 2.799999952316284,
"latitude": 51.5856781,
"longitude": 5.1952634,
"siv": float64(0),
},
true,
},
{
"invalid GPS",
`{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":13,
"value":5,
"time":1234573
}]
}`,
float64(5),
false,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cleanup, redisTest := setup(t, tc.msg)
defer cleanup()
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
redisTest.FieldUint64(t, key, "last_update")
data := redisTest.FieldJSONExist(t, key, "sensor:13:data", "value")
require.Equal(t, tc.expect, data)
require.Equal(t, tc.GPSExists, redisTest.GPSRecordExists(t, "00133410270056001251343236363736"))
})
}
}
func TestDeviceDataInvalidMsg(t *testing.T) {
testCases := []struct {
name string
msg string
}{
{
"GPS msg invalid device UID",
`{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"1337",
"time":1234570,
"result":[{
"uid":13,
"value":{
"fix": true,
"hdop": 2.799999952316284,
"latitude": 51.5856781,
"longitude": 5.1952634,
"siv": 0
},
"time":1234573
}]
}`,
},
{
"invalid device UID",
`{
"dinetrpc":1,
"pub":"device:info",
"device:uid":"1337",
"time":1234570,
"result":[{
"revision": 0,
"time": 1511865759485,
"type": "zkl-3000-rc",
"version":
"wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181"
}]
}`,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cleanup, redisTest := setup(t, tc.msg)
defer cleanup()
require.False(t, redisTest.WaitForChange(500*time.Millisecond))
})
}
}
func TestDeviceDevicePingMsg(t *testing.T) {
strMsg := `{
"dinetrpc":1,
"pub":"device:ping",
"device:uid":"00133410270056001251343236363736",
"time":1234570
}`
cleanup, redisTest := setup(t, strMsg)
defer cleanup()
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
redisTest.FieldUint64(t, key, "last_update")
redisTest.FieldExist(t, key, "device:ping")
}
func TestDeviceDeviceInfoMsg(t *testing.T) {
testCases := []struct {
name string
msg string
field string
json string
expect interface{}
}{
{
"contains time",
`{
"dinetrpc":1,
"pub":"device:info",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"revision": 0,
"time": 1511865759485,
"type": "zkl-3000-rc",
"version":
"wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181"
}]
}`,
"device:info",
"version",
`wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181`,
},
{
"missing time",
`{
"dinetrpc":1,
"pub":"sensor:info",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"label": "bat1-voltage",
"type": "number",
"uid": 1
}]
}`,
"sensor:1:info",
"time",
float64(1234570),
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cleanup, redisTest := setup(t, tc.msg)
defer cleanup()
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
data := redisTest.FieldJSONExist(t, key, tc.field, tc.json)
require.Equal(t, tc.expect, data)
})
}
}
func TestRemoveInfoFields(t *testing.T) {
redisTest := redistest.New(t)
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
require.Nil(t, err)
msg := `{
"dinetrpc":1,
"rep":"config:info",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"default": "di-tcp.dualinventive.com",
"label": "dncm-tcp-host",
"time": 92715,
"type": "string",
"uid": 100
}, {
"default": "4030",
"label": "dncm-tcp-port",
"time": 92715,
"type": "string",
"uid": 101
}, {
"default": "40",
"label": "test2",
"time": 92715,
"type": "string",
"uid": 102
}, {
"default": "30",
"label": "test3",
"time": 92715,
"type": "string",
"uid": 103
}, {
"default": "32",
"label": "test4",
"time": 92715,
"type": "string",
"uid": 104
}]
}`
var rpcMsg rpc.Msg
err = json.Unmarshal([]byte(msg), &rpcMsg)
require.Nil(t, err)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
r.Process(ctx)
}()
// Send first config
err = r.Send(&rpcMsg)
require.Nil(t, err)
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
redisTest.FieldExist(t, key, "config:100:info")
redisTest.FieldExist(t, key, "config:101:info")
redisTest.FieldExist(t, key, "config:102:info")
redisTest.FieldExist(t, key, "config:103:info")
redisTest.FieldExist(t, key, "config:104:info")
msg = `{
"dinetrpc":1,
"rep":"sensor:info",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"label": "bat1-voltage",
"type": "number",
"uid": 1
}]
}`
var rpcMsg2 rpc.Msg
err = json.Unmarshal([]byte(msg), &rpcMsg2)
require.Nil(t, err)
// Send sensor info (this data should stay persistent)
err = r.Send(&rpcMsg2)
require.Nil(t, err)
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
redisTest.FieldExist(t, key, "config:100:info")
redisTest.FieldExist(t, key, "config:101:info")
redisTest.FieldExist(t, key, "config:102:info")
redisTest.FieldExist(t, key, "config:103:info")
redisTest.FieldExist(t, key, "config:104:info")
redisTest.FieldExist(t, key, "sensor:1:info")
msg = `{
"dinetrpc":1,
"rep":"config:info",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"default": "di-tcp.dualinventive.com",
"label": "dncm-tcp-host",
"time": 92715,
"type": "string",
"uid": 100
}, {
"default": "40",
"label": "test2",
"time": 92715,
"type": "string",
"uid": 102
}, {
"default": "30",
"label": "test3",
"time": 92715,
"type": "string",
"uid": 103
}]
}`
var rpcMsg3 rpc.Msg
err = json.Unmarshal([]byte(msg), &rpcMsg3)
require.Nil(t, err)
// Send second config (this request is missing config:101:info and config:104:info)
err = r.Send(&rpcMsg3)
require.Nil(t, err)
cancel()
wg.Wait()
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
redisTest.FieldExist(t, key, "config:100:info")
// This key should be removed now
redisTest.FieldNotExist(t, key, "config:101:info")
redisTest.FieldExist(t, key, "config:102:info")
redisTest.FieldExist(t, key, "config:103:info")
// This key should be removed now
redisTest.FieldNotExist(t, key, "config:104:info")
// This is another class, so this should still exist
redisTest.FieldExist(t, key, "sensor:1:info")
require.Nil(t, redisTest.Close())
require.Nil(t, r.Close())
}
func TestIgnoreOldData(t *testing.T) {
redisTest := redistest.New(t)
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
require.Nil(t, err)
strMsg := `{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":1,
"value":129873412,
"time":1234571
},{
"uid":2,
"value":"string value"
}]}`
var rpcMsg rpc.Msg
err = json.Unmarshal([]byte(strMsg), &rpcMsg)
require.Nil(t, err)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
r.Process(ctx)
}()
// Send first sensor data
err = r.Send(&rpcMsg)
require.Nil(t, err)
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
r.cacheMu.Lock()
v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
v2, ok2 := r.cache.Get(r.cacheKey(key, "sensor:2:data"))
r.cacheMu.Unlock()
require.True(t, ok1)
require.True(t, ok2)
require.Equal(t, ditime.Time(1234571), v1)
require.Equal(t, ditime.Time(1234570), v2)
val := redisTest.FieldJSONExist(t, key, "sensor:1:data", "value")
require.InDelta(t, float64(129873412), val, 0.1)
redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")
strMsg = `{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234569,
"result":[{
"uid":1,
"value":12,
"time":1234572
},{
"uid":2,
"value":"a new value"
}]}`
var rpcMsg2 rpc.Msg
err = json.Unmarshal([]byte(strMsg), &rpcMsg2)
require.Nil(t, err)
// Send new device data (sensor 1 contains a newer value, sensor 2 contains a old value)
err = r.Send(&rpcMsg2)
require.Nil(t, err)
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
r.cacheMu.Lock()
v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
v2, ok2 = r.cache.Get(r.cacheKey(key, "sensor:2:data"))
r.cacheMu.Unlock()
require.True(t, ok1)
require.True(t, ok2)
cancel()
wg.Wait()
// Updated
require.Equal(t, ditime.Time(1234572), v1)
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))
// Not updated
require.Equal(t, ditime.Time(1234570), v2)
redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")
v := redisTest.FieldJSONExist(t, key, "sensor:2:data", "last_update")
f, okf := v.(float64)
require.True(t, okf)
require.InDelta(t, f, float64(ditime.Now()), 50000)
require.Nil(t, redisTest.Close())
require.Nil(t, r.Close())
}
func TestOverwriteSameTimestampData(t *testing.T) {
redisTest := redistest.New(t)
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
require.Nil(t, err)
strMsg := `{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":1,
"value":129873412,
"time":1234571
}]}`
var rpcMsg rpc.Msg
err = json.Unmarshal([]byte(strMsg), &rpcMsg)
require.Nil(t, err)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
r.Process(ctx)
}()
// Send first sensor data
err = r.Send(&rpcMsg)
require.Nil(t, err)
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
r.cacheMu.Lock()
v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
r.cacheMu.Unlock()
require.True(t, ok1)
require.Equal(t, ditime.Time(1234571), v1)
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(129873412))
strMsg = `{
"dinetrpc":1,
"pub":"sensor:data",
"device:uid":"00133410270056001251343236363736",
"time":1234570,
"result":[{
"uid":1,
"value":12,
"time":1234571
}]}`
var rpcMsg2 rpc.Msg
err = json.Unmarshal([]byte(strMsg), &rpcMsg2)
require.Nil(t, err)
// Send new device data. The value is changed, but the time is the same
err = r.Send(&rpcMsg2)
require.Nil(t, err)
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
r.cacheMu.Lock()
v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
r.cacheMu.Unlock()
require.True(t, ok1)
cancel()
wg.Wait()
// Updated
require.Equal(t, ditime.Time(1234571), v1)
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))
require.Nil(t, redisTest.Close())
require.Nil(t, r.Close())
}