package redis

import (
	"context"
	"encoding/json"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"src.dualinventive.com/go/dinet/ditime"
	"src.dualinventive.com/go/dinet/rpc"
	"src.dualinventive.com/go/dinet/rts"
	"src.dualinventive.com/go/redis-proxy/internal/redistest"
)

// testDeviceUID is the device UID used by every message in this file that is
// expected to be accepted. It must stay identical to the "device:uid" value
// embedded in the JSON fixtures below.
const testDeviceUID = "00133410270056001251343236363736"

// cleanupFunc releases the resources created by setup.
type cleanupFunc func()

// mustUnmarshalMsg decodes raw into an rpc.Msg and fails the test immediately
// when the JSON cannot be decoded.
func mustUnmarshalMsg(t *testing.T, raw string) *rpc.Msg {
	t.Helper()

	var msg rpc.Msg
	require.NoError(t, json.Unmarshal([]byte(raw), &msg))
	return &msg
}

// startProcess runs process in its own goroutine with a cancellable context.
// The returned stop function cancels that context and blocks until the
// goroutine has returned. stop may be called more than once; later calls
// return immediately.
func startProcess(process func(ctx context.Context)) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		process(ctx)
	}()

	return func() {
		cancel()
		wg.Wait()
	}
}

// mustCloseAll calls every closer in the given order and only afterwards
// asserts that none of them returned an error, so a failing closer cannot
// prevent the remaining ones from being called.
func mustCloseAll(t *testing.T, closers ...func() error) {
	t.Helper()

	errs := make([]error, 0, len(closers))
	for _, closeFn := range closers {
		errs = append(errs, closeFn())
	}
	for _, err := range errs {
		require.NoError(t, err)
	}
}

// setup creates a Redis test instance and a proxy connected to it, decodes msg
// into an rpc.Msg and sends it through the proxy while Process runs in the
// background. Process is stopped again before setup returns.
//
// The returned cleanup closes the Redis test instance and the proxy; callers
// are expected to defer it. When setup itself aborts the test after the proxy
// has been created, it runs cleanup on its own because the caller never
// receives it.
func setup(t *testing.T, msg string) (cleanup cleanupFunc, redisTest *redistest.Redis) {
	t.Helper()

	redisTest = redistest.New(t)
	r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 1, time.Second)
	require.NoError(t, err)

	cleanup = func() {
		mustCloseAll(t, redisTest.Close, r.Close)
	}
	// A failing require below ends the test goroutine before cleanup is
	// returned, so it has to be run from here in that case.
	handedOver := false
	defer func() {
		if !handedOver {
			cleanup()
		}
	}()

	rpcMsg := mustUnmarshalMsg(t, msg)

	stop := startProcess(func(ctx context.Context) { r.Process(ctx) })
	err = r.Send(rpcMsg)
	// Send has returned, so the process task can be stopped. This happens
	// before the assertion so the goroutine is not left running on failure.
	stop()
	require.NoError(t, err)

	handedOver = true
	return cleanup, redisTest
}

// TestDeviceSensorDataMsg publishes sensor data for three non-GPS sensors and
// checks that a field is stored per sensor and that no GPS record is created.
func TestDeviceSensorDataMsg(t *testing.T) {
	strMsg := `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":1, "value":129873412, "time":1234571 },{ "uid":2, "value":"string value", "time":1234572 },{ "uid":3, "value":{"a":"b"}, "time":1234573 }]}`

	cleanup, redisTest := setup(t, strMsg)
	defer cleanup()

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	key := rts.KeyPrefixDevice + testDeviceUID
	redisTest.FieldUint64(t, key, "last_update")
	redisTest.FieldExist(t, key, "sensor:1:data")
	redisTest.FieldExist(t, key, "sensor:2:data")
	redisTest.FieldExist(t, key, "sensor:3:data")
	require.False(t, redisTest.GPSRecordExists(t, testDeviceUID))
}

// TestDeviceSensorGPSDataMsg publishes data for sensor 13 and checks the
// stored value as well as whether a GPS record exists afterwards: one is
// expected for an object value with GPS fields, none for a plain number.
func TestDeviceSensorGPSDataMsg(t *testing.T) {
	testCases := []struct {
		name      string
		msg       string
		expect    interface{}
		gpsExists bool
	}{
		{
			name: "valid GPS",
			msg: `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":13, 
"value":{ "fix": true, "hdop": 2.799999952316284, "latitude": 51.5856781, "longitude": 5.1952634, "siv": 0 }, "time":1234573 }] }`,
			expect: map[string]interface{}{
				"fix":       true,
				"hdop":      2.799999952316284,
				"latitude":  51.5856781,
				"longitude": 5.1952634,
				"siv":       float64(0),
			},
			gpsExists: true,
		},
		{
			name:      "invalid GPS",
			msg:       `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":13, "value":5, "time":1234573 }] }`,
			expect:    float64(5),
			gpsExists: false,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			cleanup, redisTest := setup(t, tc.msg)
			defer cleanup()

			require.True(t, redisTest.WaitForChange(500*time.Millisecond))

			key := rts.KeyPrefixDevice + testDeviceUID
			redisTest.FieldUint64(t, key, "last_update")
			data := redisTest.FieldJSONExist(t, key, "sensor:13:data", "value")
			require.Equal(t, tc.expect, data)
			require.Equal(t, tc.gpsExists, redisTest.GPSRecordExists(t, testDeviceUID))
		})
	}
}

// TestDeviceDataInvalidMsg sends messages carrying the device UID "1337" and
// checks that no change is observed in Redis within the wait period.
func TestDeviceDataInvalidMsg(t *testing.T) {
	testCases := []struct {
		name string
		msg  string
	}{
		{
			name: "GPS msg invalid device UID",
			msg:  `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"1337", "time":1234570, "result":[{ "uid":13, "value":{ "fix": true, "hdop": 2.799999952316284, "latitude": 51.5856781, "longitude": 5.1952634, "siv": 0 }, "time":1234573 }] }`,
		},
		{
			name: "invalid device UID",
			msg:  `{ "dinetrpc":1, "pub":"device:info", "device:uid":"1337", "time":1234570, "result":[{ "revision": 0, "time": 1511865759485, "type": "zkl-3000-rc", "version": "wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181" }] }`,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			cleanup, redisTest := setup(t, tc.msg)
			defer cleanup()

			require.False(t, redisTest.WaitForChange(500*time.Millisecond))
		})
	}
}

// TestDeviceDevicePingMsg publishes a device:ping message and checks that the
// device hash gets a "last_update" and a "device:ping" field.
func TestDeviceDevicePingMsg(t *testing.T) {
	strMsg := `{ "dinetrpc":1, "pub":"device:ping", "device:uid":"00133410270056001251343236363736", "time":1234570 }`

	cleanup, redisTest := setup(t, strMsg)
	defer cleanup()

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	key := rts.KeyPrefixDevice + testDeviceUID
	redisTest.FieldUint64(t, key, "last_update")
	redisTest.FieldExist(t, key, "device:ping")
}

// TestDeviceDeviceInfoMsg publishes info messages and checks one JSON member
// of the stored field. The "missing time" case expects the stored "time" to
// equal the message-level time because the result item carries none.
func TestDeviceDeviceInfoMsg(t *testing.T) {
	testCases := []struct {
		name    string
		msg     string
		field   string
		jsonKey string
		expect  interface{}
	}{
		{
			name:    "contains time",
			msg:     `{ "dinetrpc":1, "pub":"device:info", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "revision": 0, "time": 1511865759485, "type": "zkl-3000-rc", "version": "wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181" }] }`,
			field:   "device:info",
			jsonKey: "version",
			expect:  `wcpu:2.00-20150309-B900;mcu:2.10-20160425-0004;swd:4.00-20130513-0180;swm:4.00-20130513-0181`,
		},
		{
			name:    "missing time",
			msg:     `{ "dinetrpc":1, "pub":"sensor:info", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "label": "bat1-voltage", "type": "number", "uid": 1 }] }`,
			field:   "sensor:1:info",
			jsonKey: "time",
			expect:  float64(1234570),
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			cleanup, redisTest := setup(t, tc.msg)
			defer cleanup()

			require.True(t, redisTest.WaitForChange(500*time.Millisecond))

			key := rts.KeyPrefixDevice + testDeviceUID
			data := redisTest.FieldJSONExist(t, key, tc.field, tc.jsonKey)
			require.Equal(t, tc.expect, data)
		})
	}
}

// TestRemoveInfoFields sends a config:info reply with five entries, then a
// sensor:info reply, then a second config:info reply that omits entries 101
// and 104. It checks that the omitted config fields are removed while the
// remaining config fields and the sensor field are still present.
func TestRemoveInfoFields(t *testing.T) {
	redisTest := redistest.New(t)
	r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
	require.NoError(t, err)

	stop := startProcess(func(ctx context.Context) { r.Process(ctx) })
	// Deferred so that a failing assertion cannot leave the Process goroutine,
	// the proxy or the Redis test instance behind.
	defer func() {
		stop()
		mustCloseAll(t, redisTest.Close, r.Close)
	}()

	key := rts.KeyPrefixDevice + testDeviceUID
	allConfigFields := []string{
		"config:100:info",
		"config:101:info",
		"config:102:info",
		"config:103:info",
		"config:104:info",
	}

	// Send first config
	firstConfig := mustUnmarshalMsg(t, `{ "dinetrpc":1, "rep":"config:info", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "default": "di-tcp.dualinventive.com", "label": "dncm-tcp-host", "time": 92715, "type": "string", "uid": 100 }, { "default": "4030", "label": "dncm-tcp-port", "time": 92715, "type": "string", "uid": 101 }, { "default": "40", "label": "test2", "time": 
92715, "type": "string", "uid": 102 }, { "default": "30", "label": "test3", "time": 92715, "type": "string", "uid": 103 }, { "default": "32", "label": "test4", "time": 92715, "type": "string", "uid": 104 }] }`)
	require.NoError(t, r.Send(firstConfig))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))
	for _, field := range allConfigFields {
		redisTest.FieldExist(t, key, field)
	}

	// Send sensor info (this data should stay persistent)
	sensorInfo := mustUnmarshalMsg(t, `{ "dinetrpc":1, "rep":"sensor:info", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "label": "bat1-voltage", "type": "number", "uid": 1 }] }`)
	require.NoError(t, r.Send(sensorInfo))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))
	for _, field := range allConfigFields {
		redisTest.FieldExist(t, key, field)
	}
	redisTest.FieldExist(t, key, "sensor:1:info")

	// Send second config (this request is missing config:101:info and config:104:info)
	secondConfig := mustUnmarshalMsg(t, `{ "dinetrpc":1, "rep":"config:info", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "default": "di-tcp.dualinventive.com", "label": "dncm-tcp-host", "time": 92715, "type": "string", "uid": 100 }, { "default": "40", "label": "test2", "time": 92715, "type": "string", "uid": 102 }, { "default": "30", "label": "test3", "time": 92715, "type": "string", "uid": 103 }] }`)
	require.NoError(t, r.Send(secondConfig))

	stop()

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))
	redisTest.FieldExist(t, key, "config:100:info")
	// This key should be removed now
	redisTest.FieldNotExist(t, key, "config:101:info")
	redisTest.FieldExist(t, key, "config:102:info")
	redisTest.FieldExist(t, key, "config:103:info")
	// This key should be removed now
	redisTest.FieldNotExist(t, key, "config:104:info")
	// This is another class, so this should still exist
	redisTest.FieldExist(t, key, "sensor:1:info")
}

// TestIgnoreOldData sends two sensor:data messages. In the second message
// sensor 1 carries a newer timestamp than before, while sensor 2 has no
// timestamp of its own and the message-level time is older than the first
// message's. The test expects sensor 1 to be updated and sensor 2 to keep its
// first value and cached timestamp.
func TestIgnoreOldData(t *testing.T) {
	redisTest := redistest.New(t)
	r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
	require.NoError(t, err)

	stop := startProcess(func(ctx context.Context) { r.Process(ctx) })
	// Deferred so that a failing assertion cannot leave the Process goroutine,
	// the proxy or the Redis test instance behind.
	defer func() {
		stop()
		mustCloseAll(t, redisTest.Close, r.Close)
	}()

	key := rts.KeyPrefixDevice + testDeviceUID

	// Send first sensor data
	firstData := mustUnmarshalMsg(t, `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":1, "value":129873412, "time":1234571 },{ "uid":2, "value":"string value" }]}`)
	require.NoError(t, r.Send(firstData))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	r.cacheMu.Lock()
	v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
	v2, ok2 := r.cache.Get(r.cacheKey(key, "sensor:2:data"))
	r.cacheMu.Unlock()

	require.True(t, ok1)
	require.True(t, ok2)
	require.Equal(t, ditime.Time(1234571), v1)
	require.Equal(t, ditime.Time(1234570), v2)

	val := redisTest.FieldJSONExist(t, key, "sensor:1:data", "value")
	require.InDelta(t, float64(129873412), val, 0.1)
	redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")

	// Send new device data (sensor 1 contains a newer value, sensor 2 contains an old value)
	secondData := mustUnmarshalMsg(t, `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234569, "result":[{ "uid":1, "value":12, "time":1234572 },{ "uid":2, "value":"a new value" }]}`)
	require.NoError(t, r.Send(secondData))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	r.cacheMu.Lock()
	v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
	v2, ok2 = r.cache.Get(r.cacheKey(key, "sensor:2:data"))
	r.cacheMu.Unlock()

	require.True(t, ok1)
	require.True(t, ok2)

	stop()

	// Updated
	require.Equal(t, ditime.Time(1234572), v1)
	redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))

	// Not updated
	require.Equal(t, ditime.Time(1234570), v2)
	redisTest.FieldJSONKeyValue(t, key, "sensor:2:data", "value", "string value")

	v := redisTest.FieldJSONExist(t, key, "sensor:2:data", "last_update")
	f, okf := v.(float64)
	require.True(t, okf)
	// The expected value goes first so a failure message labels the two
	// numbers correctly; the delta check itself is symmetric.
	require.InDelta(t, float64(ditime.Now()), f, 50000)
}

// TestOverwriteSameTimestampData sends two sensor:data messages for sensor 1
// with the same timestamp but a different value and expects the second value
// to replace the first.
func TestOverwriteSameTimestampData(t *testing.T) {
	redisTest := redistest.New(t)
	r, err := New(redisTest.Logger, redisTest.RedisPorts(), 10*time.Second, 6, 10*time.Second)
	require.NoError(t, err)

	stop := startProcess(func(ctx context.Context) { r.Process(ctx) })
	// Deferred so that a failing assertion cannot leave the Process goroutine,
	// the proxy or the Redis test instance behind.
	defer func() {
		stop()
		mustCloseAll(t, redisTest.Close, r.Close)
	}()

	key := rts.KeyPrefixDevice + testDeviceUID

	// Send first sensor data
	firstData := mustUnmarshalMsg(t, `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":1, "value":129873412, "time":1234571 }]}`)
	require.NoError(t, r.Send(firstData))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	r.cacheMu.Lock()
	v1, ok1 := r.cache.Get(r.cacheKey(key, "sensor:1:data"))
	r.cacheMu.Unlock()

	require.True(t, ok1)
	require.Equal(t, ditime.Time(1234571), v1)
	redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(129873412))

	// Send new device data. The value is changed, but the time is the same
	secondData := mustUnmarshalMsg(t, `{ "dinetrpc":1, "pub":"sensor:data", "device:uid":"00133410270056001251343236363736", "time":1234570, "result":[{ "uid":1, "value":12, "time":1234571 }]}`)
	require.NoError(t, r.Send(secondData))

	require.True(t, redisTest.WaitForChange(500*time.Millisecond))

	r.cacheMu.Lock()
	v1, ok1 = r.cache.Get(r.cacheKey(key, "sensor:1:data"))
	r.cacheMu.Unlock()

	require.True(t, ok1)

	stop()

	// Updated
	require.Equal(t, ditime.Time(1234571), v1)
	redisTest.FieldJSONKeyValue(t, key, "sensor:1:data", "value", float64(12))
}