597 lines
14 KiB
Go
597 lines
14 KiB
Go
package redis
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
"src.dualinventive.com/go/dinet/rts"
|
|
"src.dualinventive.com/go/redis-proxy/internal/redistest"
|
|
)
|
|
|
|
type cleanupFunc func()
|
|
|
|
func setup(t *testing.T, msg string) (cleanup cleanupFunc, redisTest *redistest.Redis) {
|
|
redisTest = redistest.New(t)
|
|
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 1, time.Second)
|
|
require.Nil(t, err)
|
|
|
|
var rpcMsg rpc.Msg
|
|
err = json.Unmarshal([]byte(msg), &rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.Process(ctx)
|
|
}()
|
|
|
|
err = r.Send(&rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
// Message is delivered. We can close the process task
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
return func() {
|
|
require.Nil(t, redisTest.Close())
|
|
require.Nil(t, r.Close())
|
|
}, redisTest
|
|
}
|
|
|
|
func TestDeviceSensorDataMsg(t *testing.T) {
|
|
strMsg := `{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":1,
|
|
"value":129873412,
|
|
"time":1234571
|
|
},{
|
|
"uid":2,
|
|
"value":"string value",
|
|
"time":1234572
|
|
},{
|
|
"uid":3,
|
|
"value":{"a":"b"},
|
|
"time":1234573
|
|
}]}`
|
|
cleanup, redisTest := setup(t, strMsg)
|
|
defer cleanup()
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
redisTest.FieldUint64(t, key, "last_update")
|
|
redisTest.FieldExist(t, key, "sensor:1:data")
|
|
redisTest.FieldExist(t, key, "sensor:2:data")
|
|
redisTest.FieldExist(t, key, "sensor:3:data")
|
|
require.False(t, redisTest.GPSRecordExists(t, "00133410270056001251343236363736"))
|
|
}
|
|
|
|
func TestDeviceSensorGPSDataMsg(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
msg string
|
|
expect interface{}
|
|
GPSExists bool
|
|
}{
|
|
{
|
|
"valid GPS",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":13,
|
|
"value":{
|
|
"fix": true,
|
|
"hdop": 2.799999952316284,
|
|
"latitude": 51.5856781,
|
|
"longitude": 5.1952634,
|
|
"siv": 0
|
|
},
|
|
"time":1234573
|
|
}]
|
|
}`,
|
|
map[string]interface{}{
|
|
"fix": true,
|
|
"hdop": 2.799999952316284,
|
|
"latitude": 51.5856781,
|
|
"longitude": 5.1952634,
|
|
"siv": float64(0),
|
|
},
|
|
true,
|
|
},
|
|
{
|
|
"invalid GPS",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":13,
|
|
"value":5,
|
|
"time":1234573
|
|
}]
|
|
}`,
|
|
float64(5),
|
|
false,
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
cleanup, redisTest := setup(t, tc.msg)
|
|
defer cleanup()
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
redisTest.FieldUint64(t, key, "last_update")
|
|
data := redisTest.FieldJSONExist(t, key, "sensor:13:data", "value")
|
|
require.Equal(t, tc.expect, data)
|
|
require.Equal(t, tc.GPSExists, redisTest.GPSRecordExists(t, "00133410270056001251343236363736"))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestDeviceDataInvalidMsg(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
msg string
|
|
}{
|
|
{
|
|
"GPS msg invalid device UID",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"1337",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":13,
|
|
"value":{
|
|
"fix": true,
|
|
"hdop": 2.799999952316284,
|
|
"latitude": 51.5856781,
|
|
"longitude": 5.1952634,
|
|
"siv": 0
|
|
},
|
|
"time":1234573
|
|
}]
|
|
}`,
|
|
},
|
|
{
|
|
"invalid device UID",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"device:info",
|
|
"device:uid":"1337",
|
|
"time":1234570,
|
|
"result":[{
|
|
"revision": 0,
|
|
"time": 1511865759485,
|
|
"type": "zkl-3000-rc",
|
|
"version":
|
|
"wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181"
|
|
}]
|
|
}`,
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
cleanup, redisTest := setup(t, tc.msg)
|
|
defer cleanup()
|
|
require.False(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestDeviceDevicePingMsg(t *testing.T) {
|
|
strMsg := `{
|
|
"dinetrpc":1,
|
|
"pub":"device:ping",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570
|
|
}`
|
|
cleanup, redisTest := setup(t, strMsg)
|
|
defer cleanup()
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
redisTest.FieldUint64(t, key, "last_update")
|
|
redisTest.FieldExist(t, key, "device:ping")
|
|
}
|
|
|
|
func TestDeviceDeviceInfoMsg(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
msg string
|
|
field string
|
|
json string
|
|
expect interface{}
|
|
}{
|
|
{
|
|
"contains time",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"device:info",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"revision": 0,
|
|
"time": 1511865759485,
|
|
"type": "zkl-3000-rc",
|
|
"version":
|
|
"wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181"
|
|
}]
|
|
}`,
|
|
"device:info",
|
|
"version",
|
|
`wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181`,
|
|
},
|
|
{
|
|
"missing time",
|
|
`{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:info",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"label": "bat1-voltage",
|
|
"type": "number",
|
|
"uid": 1
|
|
}]
|
|
}`,
|
|
"sensor:1:info",
|
|
"time",
|
|
float64(1234570),
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
cleanup, redisTest := setup(t, tc.msg)
|
|
defer cleanup()
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
data := redisTest.FieldJSONExist(t, key, tc.field, tc.json)
|
|
require.Equal(t, tc.expect, data)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestRemoveInfoFields(t *testing.T) {
|
|
redisTest := redistest.New(t)
|
|
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
|
|
require.Nil(t, err)
|
|
|
|
msg := `{
|
|
"dinetrpc":1,
|
|
"rep":"config:info",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"default": "di-tcp.dualinventive.com",
|
|
"label": "dncm-tcp-host",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 100
|
|
}, {
|
|
"default": "4030",
|
|
"label": "dncm-tcp-port",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 101
|
|
}, {
|
|
"default": "40",
|
|
"label": "test2",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 102
|
|
}, {
|
|
"default": "30",
|
|
"label": "test3",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 103
|
|
}, {
|
|
"default": "32",
|
|
"label": "test4",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 104
|
|
}]
|
|
}`
|
|
var rpcMsg rpc.Msg
|
|
err = json.Unmarshal([]byte(msg), &rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.Process(ctx)
|
|
}()
|
|
|
|
// Send first config
|
|
err = r.Send(&rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
redisTest.FieldExist(t, key, "config:100:info")
|
|
redisTest.FieldExist(t, key, "config:101:info")
|
|
redisTest.FieldExist(t, key, "config:102:info")
|
|
redisTest.FieldExist(t, key, "config:103:info")
|
|
redisTest.FieldExist(t, key, "config:104:info")
|
|
|
|
msg = `{
|
|
"dinetrpc":1,
|
|
"rep":"sensor:info",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"label": "bat1-voltage",
|
|
"type": "number",
|
|
"uid": 1
|
|
}]
|
|
}`
|
|
|
|
var rpcMsg2 rpc.Msg
|
|
err = json.Unmarshal([]byte(msg), &rpcMsg2)
|
|
require.Nil(t, err)
|
|
|
|
// Send sensor info (this data should stay persistent)
|
|
err = r.Send(&rpcMsg2)
|
|
require.Nil(t, err)
|
|
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
redisTest.FieldExist(t, key, "config:100:info")
|
|
redisTest.FieldExist(t, key, "config:101:info")
|
|
redisTest.FieldExist(t, key, "config:102:info")
|
|
redisTest.FieldExist(t, key, "config:103:info")
|
|
redisTest.FieldExist(t, key, "config:104:info")
|
|
redisTest.FieldExist(t, key, "sensor:1:info")
|
|
|
|
msg = `{
|
|
"dinetrpc":1,
|
|
"rep":"config:info",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"default": "di-tcp.dualinventive.com",
|
|
"label": "dncm-tcp-host",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 100
|
|
}, {
|
|
"default": "40",
|
|
"label": "test2",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 102
|
|
}, {
|
|
"default": "30",
|
|
"label": "test3",
|
|
"time": 92715,
|
|
"type": "string",
|
|
"uid": 103
|
|
}]
|
|
}`
|
|
|
|
var rpcMsg3 rpc.Msg
|
|
err = json.Unmarshal([]byte(msg), &rpcMsg3)
|
|
require.Nil(t, err)
|
|
|
|
// Send second config (this request is missing config:101:info and config:104:info)
|
|
err = r.Send(&rpcMsg3)
|
|
require.Nil(t, err)
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
redisTest.FieldExist(t, key, "config:100:info")
|
|
// This key should be removed now
|
|
redisTest.FieldNotExist(t, key, "config:101:info")
|
|
redisTest.FieldExist(t, key, "config:102:info")
|
|
redisTest.FieldExist(t, key, "config:103:info")
|
|
// This key should be removed now
|
|
redisTest.FieldNotExist(t, key, "config:104:info")
|
|
// This is another class, so this should still exist
|
|
redisTest.FieldExist(t, key, "sensor:1:info")
|
|
|
|
require.Nil(t, redisTest.Close())
|
|
require.Nil(t, r.Close())
|
|
}
|
|
|
|
func TestIgnoreOldData(t *testing.T) {
|
|
redisTest := redistest.New(t)
|
|
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
|
|
require.Nil(t, err)
|
|
|
|
strMsg := `{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":1,
|
|
"value":129873412,
|
|
"time":1234571
|
|
},{
|
|
"uid":2,
|
|
"value":"string value"
|
|
}]}`
|
|
var rpcMsg rpc.Msg
|
|
err = json.Unmarshal([]byte(strMsg), &rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.Process(ctx)
|
|
}()
|
|
|
|
// Send first sensor data
|
|
err = r.Send(&rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
r.cacheMu.Lock()
|
|
v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
|
|
v2, ok2 := r.cache.Get(r.cacheKey(key, "sensor:2:data"))
|
|
r.cacheMu.Unlock()
|
|
require.True(t, ok1)
|
|
require.True(t, ok2)
|
|
|
|
require.Equal(t, ditime.Time(1234571), v1)
|
|
require.Equal(t, ditime.Time(1234570), v2)
|
|
val := redisTest.FieldJSONExist(t, key, "sensor:1:data", "value")
|
|
require.InDelta(t, float64(129873412), val, 0.1)
|
|
redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")
|
|
|
|
strMsg = `{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234569,
|
|
"result":[{
|
|
"uid":1,
|
|
"value":12,
|
|
"time":1234572
|
|
},{
|
|
"uid":2,
|
|
"value":"a new value"
|
|
}]}`
|
|
|
|
var rpcMsg2 rpc.Msg
|
|
err = json.Unmarshal([]byte(strMsg), &rpcMsg2)
|
|
require.Nil(t, err)
|
|
|
|
// Send new device data (sensor 1 contains a newer value, sensor 2 contains a old value)
|
|
err = r.Send(&rpcMsg2)
|
|
require.Nil(t, err)
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
r.cacheMu.Lock()
|
|
v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
|
|
v2, ok2 = r.cache.Get(r.cacheKey(key, "sensor:2:data"))
|
|
r.cacheMu.Unlock()
|
|
require.True(t, ok1)
|
|
require.True(t, ok2)
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
// Updated
|
|
require.Equal(t, ditime.Time(1234572), v1)
|
|
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))
|
|
// Not updated
|
|
require.Equal(t, ditime.Time(1234570), v2)
|
|
redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")
|
|
|
|
v := redisTest.FieldJSONExist(t, key, "sensor:2:data", "last_update")
|
|
f, okf := v.(float64)
|
|
require.True(t, okf)
|
|
require.InDelta(t, f, float64(ditime.Now()), 50000)
|
|
|
|
require.Nil(t, redisTest.Close())
|
|
require.Nil(t, r.Close())
|
|
}
|
|
|
|
func TestOverwriteSameTimestampData(t *testing.T) {
|
|
redisTest := redistest.New(t)
|
|
r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
|
|
require.Nil(t, err)
|
|
|
|
strMsg := `{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":1,
|
|
"value":129873412,
|
|
"time":1234571
|
|
}]}`
|
|
var rpcMsg rpc.Msg
|
|
err = json.Unmarshal([]byte(strMsg), &rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
r.Process(ctx)
|
|
}()
|
|
|
|
// Send first sensor data
|
|
err = r.Send(&rpcMsg)
|
|
require.Nil(t, err)
|
|
|
|
key := rts.KeyPrefixDevice + "00133410270056001251343236363736"
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
r.cacheMu.Lock()
|
|
v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
|
|
r.cacheMu.Unlock()
|
|
require.True(t, ok1)
|
|
|
|
require.Equal(t, ditime.Time(1234571), v1)
|
|
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(129873412))
|
|
|
|
strMsg = `{
|
|
"dinetrpc":1,
|
|
"pub":"sensor:data",
|
|
"device:uid":"00133410270056001251343236363736",
|
|
"time":1234570,
|
|
"result":[{
|
|
"uid":1,
|
|
"value":12,
|
|
"time":1234571
|
|
}]}`
|
|
|
|
var rpcMsg2 rpc.Msg
|
|
err = json.Unmarshal([]byte(strMsg), &rpcMsg2)
|
|
require.Nil(t, err)
|
|
|
|
// Send new device data. The value is changed, but the time is the same
|
|
err = r.Send(&rpcMsg2)
|
|
require.Nil(t, err)
|
|
require.True(t, redisTest.WaitForChange(500*time.Millisecond))
|
|
|
|
r.cacheMu.Lock()
|
|
v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
|
|
r.cacheMu.Unlock()
|
|
require.True(t, ok1)
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
// Updated
|
|
require.Equal(t, ditime.Time(1234571), v1)
|
|
redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))
|
|
|
|
require.Nil(t, redisTest.Close())
|
|
require.Nil(t, r.Close())
|
|
}
|