248 lines
5.8 KiB
Go
248 lines
5.8 KiB
Go
package dinet
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
"src.dualinventive.com/go/lib/dilog"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// testUID is the device UID shared by the router tests in this file. It is used
// as the DeviceUID of test messages and as the UID passed to Router.Subscribe.
const testUID = "DEADBEEFCAFEBABEDEADBEEFCAFEBABE"
|
|
|
|
func TestRouterSend(t *testing.T) {
|
|
logger := dilog.NewTestLogger(t)
|
|
conn, err := NewDeviceConn(TransportTest)
|
|
require.Nil(t, err)
|
|
tt := conn.(*TestTransport)
|
|
|
|
rtr, err := NewRouter(logger, conn)
|
|
require.Nil(t, err)
|
|
|
|
msg := rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypePublish,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}
|
|
require.Nil(t, conn.Connect("dont care"))
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := rtr.Send(&msg)
|
|
require.Nil(t, err)
|
|
}()
|
|
rx := <-tt.Read
|
|
require.Equal(t, msg.Dinetrpc, rx.Dinetrpc)
|
|
require.Equal(t, msg.ID, rx.ID)
|
|
require.Equal(t, msg.Time, rx.Time)
|
|
require.Equal(t, msg.Type, rx.Type)
|
|
require.Equal(t, msg.ClassMethod, rx.ClassMethod)
|
|
require.Equal(t, msg.DeviceUID, rx.DeviceUID)
|
|
wg.Wait()
|
|
|
|
require.Nil(t, conn.Close())
|
|
}
|
|
|
|
func TestRouterSubscribeDevice(t *testing.T) {
|
|
logger := dilog.NewTestLogger(t)
|
|
testCbcounter := struct {
|
|
mu sync.Mutex
|
|
count int
|
|
}{}
|
|
conn, err := NewDeviceConn(TransportTest)
|
|
require.Nil(t, err)
|
|
tt := conn.(*TestTransport)
|
|
|
|
rtr, err := NewRouter(logger, conn)
|
|
require.Nil(t, err)
|
|
|
|
msg := &rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypePublish,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}
|
|
msg2 := &rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeReply,
|
|
ClassMethod: rpc.ClassMethodDeviceInfo,
|
|
DeviceUID: testUID,
|
|
}
|
|
|
|
require.Nil(t, conn.Connect("dont care"))
|
|
|
|
rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg {
|
|
testCbcounter.mu.Lock()
|
|
testCbcounter.count++
|
|
testCbcounter.mu.Unlock()
|
|
require.NotNil(t, in)
|
|
return msg2
|
|
})
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := rtr.ListenAndServe()
|
|
require.NotNil(t, err)
|
|
require.Equal(t, ErrDisconnected, err)
|
|
}()
|
|
|
|
tt.Write <- msg
|
|
rx := <-tt.Read
|
|
testCbcounter.mu.Lock()
|
|
require.Equal(t, 1, testCbcounter.count)
|
|
testCbcounter.mu.Unlock()
|
|
require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc)
|
|
require.Equal(t, msg2.ID, rx.ID)
|
|
require.Equal(t, msg2.Time, rx.Time)
|
|
require.Equal(t, msg2.Type, rx.Type)
|
|
require.Equal(t, msg2.ClassMethod, rx.ClassMethod)
|
|
require.Equal(t, msg2.DeviceUID, rx.DeviceUID)
|
|
|
|
require.Nil(t, conn.Close())
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestRouterSkipInvalidMessage(t *testing.T) {
|
|
logger := dilog.NewTestLogger(t)
|
|
conn, err := NewDeviceConn(TransportTest)
|
|
require.Nil(t, err)
|
|
tt := conn.(*TestTransport)
|
|
|
|
rtr, err := NewRouter(logger, conn)
|
|
require.Nil(t, err)
|
|
|
|
// Invalid msg (missing device:uid)
|
|
msg := &rpc.Msg{
|
|
Type: rpc.MsgTypeRequest,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
}
|
|
|
|
require.Nil(t, conn.Connect("dont care"))
|
|
|
|
rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg {
|
|
return &rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeReply,
|
|
ClassMethod: rpc.ClassMethodDeviceInfo,
|
|
DeviceUID: testUID,
|
|
}
|
|
})
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := rtr.ListenAndServe()
|
|
require.NotNil(t, err)
|
|
require.Equal(t, ErrDisconnected, err)
|
|
}()
|
|
|
|
// Send request and read an implicit reply with error code EProto
|
|
tt.Write <- msg
|
|
select {
|
|
case m := <-tt.Read:
|
|
require.NotNil(t, m.Error)
|
|
require.Equal(t, rpc.EProto, m.Error.Code)
|
|
case <-time.After(time.Millisecond * 500):
|
|
require.Fail(t, "go no EProto error reply")
|
|
}
|
|
|
|
require.Nil(t, conn.Close())
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestRouterSubscribeRequest(t *testing.T) {
|
|
logger := dilog.NewTestLogger(t)
|
|
testCbcounter := struct {
|
|
mu sync.Mutex
|
|
count int
|
|
}{}
|
|
conn, err := NewDeviceConn(TransportTest)
|
|
require.Nil(t, err)
|
|
tt := conn.(*TestTransport)
|
|
rtr, err := NewRouter(logger, conn)
|
|
require.Nil(t, err)
|
|
|
|
msg := &rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeRequest,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}
|
|
msg2 := &rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeReply,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}
|
|
|
|
require.Nil(t, conn.Connect("dont care"))
|
|
|
|
rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeRequest, func(in *rpc.Msg) *rpc.Msg {
|
|
require.NotNil(t, in)
|
|
testCbcounter.mu.Lock()
|
|
defer testCbcounter.mu.Unlock()
|
|
testCbcounter.count++
|
|
return msg2
|
|
})
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := rtr.ListenAndServe()
|
|
require.NotNil(t, err)
|
|
require.Equal(t, ErrDisconnected, err)
|
|
}()
|
|
|
|
tt.Write <- msg
|
|
rx := <-tt.Read
|
|
testCbcounter.mu.Lock()
|
|
require.Equal(t, 1, testCbcounter.count)
|
|
testCbcounter.count = 0
|
|
testCbcounter.mu.Unlock()
|
|
require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc)
|
|
require.Equal(t, msg2.ID, rx.ID)
|
|
require.Equal(t, msg2.Time, rx.Time)
|
|
require.Equal(t, msg2.Type, rx.Type)
|
|
require.Equal(t, msg2.ClassMethod, rx.ClassMethod)
|
|
require.Equal(t, msg2.DeviceUID, rx.DeviceUID)
|
|
|
|
// There is no sensor:info callback, but we expect a response though
|
|
msg.ClassMethod = rpc.ClassMethodSensorInfo
|
|
tt.Write <- msg
|
|
rxerr := <-tt.Read
|
|
testCbcounter.mu.Lock()
|
|
// callback must not be called
|
|
require.Equal(t, 0, testCbcounter.count)
|
|
testCbcounter.mu.Unlock()
|
|
|
|
require.Equal(t, msg.ClassMethod, rxerr.ClassMethod)
|
|
require.Equal(t, msg.ID, rxerr.ID)
|
|
require.Equal(t, rpc.MsgTypeReply, rxerr.Type)
|
|
require.Equal(t, rpc.EOpnotsupp, rxerr.Error.Code)
|
|
|
|
require.Nil(t, conn.Close())
|
|
wg.Wait()
|
|
}
|