src.dualinventive.com/go/dinet/router_test.go

248 lines
5.8 KiB
Go

package dinet
import (
"sync"
"testing"
"time"
"src.dualinventive.com/go/dinet/ditime"
"src.dualinventive.com/go/dinet/rpc"
"src.dualinventive.com/go/lib/dilog"
"github.com/stretchr/testify/require"
)
// testUID is the 32-character hexadecimal device UID shared by the router
// tests in this file: it is set as DeviceUID on the test messages and passed
// as the first argument to rtr.Subscribe.
const testUID = "DEADBEEFCAFEBABEDEADBEEFCAFEBABE"
// TestRouterSend checks that a message handed to the router's Send method
// comes out of the test transport's Read channel with its Dinetrpc, ID, Time,
// Type, ClassMethod and DeviceUID fields unchanged.
func TestRouterSend(t *testing.T) {
	logger := dilog.NewTestLogger(t)
	conn, err := NewDeviceConn(TransportTest)
	require.Nil(t, err)
	tt := conn.(*TestTransport)
	rtr, err := NewRouter(logger, conn)
	require.Nil(t, err)

	msg := rpc.Msg{
		Dinetrpc:    rpc.CurrentDinetrpc,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypePublish,
		ClassMethod: rpc.ClassMethodDevicePing,
		DeviceUID:   testUID,
	}
	require.Nil(t, conn.Connect("dont care"))

	// Send runs on its own goroutine (presumably because it blocks until the
	// transport is read). Its result travels back over a buffered channel so
	// that every assertion runs on the test goroutine: the require helpers
	// stop the test through t.FailNow, which must not be called from any
	// other goroutine.
	sendErr := make(chan error, 1)
	go func() {
		sendErr <- rtr.Send(&msg)
	}()

	select {
	case rx := <-tt.Read:
		require.Equal(t, msg.Dinetrpc, rx.Dinetrpc)
		require.Equal(t, msg.ID, rx.ID)
		require.Equal(t, msg.Time, rx.Time)
		require.Equal(t, msg.Type, rx.Type)
		require.Equal(t, msg.ClassMethod, rx.ClassMethod)
		require.Equal(t, msg.DeviceUID, rx.DeviceUID)
	case <-time.After(500 * time.Millisecond):
		// Nothing arrived. If Send has already returned, report its error
		// first, since that is the most useful explanation.
		select {
		case err := <-sendErr:
			require.Nil(t, err)
		default:
		}
		require.Fail(t, "got no message on the test transport")
	}

	// Wait for Send to return before closing the connection.
	require.Nil(t, <-sendErr)
	require.Nil(t, conn.Close())
}
// TestRouterSubscribeDevice checks that a callback subscribed with
// rpc.MsgTypeAny for rpc.ClassMethodDevicePing is invoked exactly once when a
// matching publish message is written to the test transport, and that the
// message returned by the callback is what appears on the transport's Read
// channel. It also checks that ListenAndServe returns ErrDisconnected once the
// connection is closed.
func TestRouterSubscribeDevice(t *testing.T) {
	logger := dilog.NewTestLogger(t)
	// testCbcounter counts callback invocations; the mutex guards it because
	// the callback is invoked by the router rather than by this function.
	testCbcounter := struct {
		mu    sync.Mutex
		count int
	}{}

	conn, err := NewDeviceConn(TransportTest)
	require.Nil(t, err)
	tt := conn.(*TestTransport)
	rtr, err := NewRouter(logger, conn)
	require.Nil(t, err)

	msg := &rpc.Msg{
		Dinetrpc:    rpc.CurrentDinetrpc,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypePublish,
		ClassMethod: rpc.ClassMethodDevicePing,
		DeviceUID:   testUID,
	}
	msg2 := &rpc.Msg{
		Dinetrpc:    rpc.CurrentDinetrpc,
		ID:          1,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypeReply,
		ClassMethod: rpc.ClassMethodDeviceInfo,
		DeviceUID:   testUID,
	}
	require.Nil(t, conn.Connect("dont care"))

	rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg {
		testCbcounter.mu.Lock()
		testCbcounter.count++
		testCbcounter.mu.Unlock()
		// t.Error instead of require: the router calls this callback
		// (presumably off the test goroutine), where t.FailNow is not allowed.
		if in == nil {
			t.Error("callback received a nil message")
		}
		return msg2
	})

	// ListenAndServe's result is handed back over a buffered channel so it
	// can be asserted on the test goroutine after the connection is closed.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- rtr.ListenAndServe()
	}()

	tt.Write <- msg
	rx := <-tt.Read

	// Copy the counter out first so no assertion runs while the lock is held.
	testCbcounter.mu.Lock()
	calls := testCbcounter.count
	testCbcounter.mu.Unlock()
	require.Equal(t, 1, calls)

	require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc)
	require.Equal(t, msg2.ID, rx.ID)
	require.Equal(t, msg2.Time, rx.Time)
	require.Equal(t, msg2.Type, rx.Type)
	require.Equal(t, msg2.ClassMethod, rx.ClassMethod)
	require.Equal(t, msg2.DeviceUID, rx.DeviceUID)

	require.Nil(t, conn.Close())

	// Closing the connection is expected to make ListenAndServe return.
	listenErr := <-serveErr
	require.NotNil(t, listenErr)
	require.Equal(t, ErrDisconnected, listenErr)
}
// TestRouterSkipInvalidMessage writes an invalid request (one without a
// device UID) to the test transport and checks that the router answers within
// 500 ms with a message whose Error is set and carries the code rpc.EProto.
// It also checks that ListenAndServe returns ErrDisconnected once the
// connection is closed.
func TestRouterSkipInvalidMessage(t *testing.T) {
	logger := dilog.NewTestLogger(t)
	conn, err := NewDeviceConn(TransportTest)
	require.Nil(t, err)
	tt := conn.(*TestTransport)
	rtr, err := NewRouter(logger, conn)
	require.Nil(t, err)

	// Invalid msg (missing device:uid)
	msg := &rpc.Msg{
		Type:        rpc.MsgTypeRequest,
		ClassMethod: rpc.ClassMethodDevicePing,
	}
	require.Nil(t, conn.Connect("dont care"))

	rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg {
		return &rpc.Msg{
			Dinetrpc:    rpc.CurrentDinetrpc,
			ID:          1,
			Time:        ditime.Now(),
			Type:        rpc.MsgTypeReply,
			ClassMethod: rpc.ClassMethodDeviceInfo,
			DeviceUID:   testUID,
		}
	})

	// ListenAndServe's result is handed back over a buffered channel so it
	// can be asserted on the test goroutine: the require helpers stop the
	// test through t.FailNow, which must not be called from other goroutines.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- rtr.ListenAndServe()
	}()

	// Send request and read an implicit reply with error code EProto
	tt.Write <- msg
	select {
	case m := <-tt.Read:
		require.NotNil(t, m.Error)
		require.Equal(t, rpc.EProto, m.Error.Code)
	case <-time.After(500 * time.Millisecond):
		require.Fail(t, "got no EProto error reply")
	}

	require.Nil(t, conn.Close())

	// Closing the connection is expected to make ListenAndServe return.
	listenErr := <-serveErr
	require.NotNil(t, listenErr)
	require.Equal(t, ErrDisconnected, listenErr)
}
// TestRouterSubscribeRequest covers two request paths through the router:
//
//  1. A request for rpc.ClassMethodDevicePing, for which a callback is
//     subscribed: the callback must be invoked once and the message it
//     returns must appear on the test transport's Read channel.
//  2. A request for rpc.ClassMethodSensorInfo, for which nothing is
//     subscribed: the callback must not be invoked and the router must answer
//     with a reply carrying the error code rpc.EOpnotsupp.
//
// It also checks that ListenAndServe returns ErrDisconnected once the
// connection is closed.
func TestRouterSubscribeRequest(t *testing.T) {
	logger := dilog.NewTestLogger(t)
	// testCbcounter counts callback invocations; the mutex guards it because
	// the callback is invoked by the router rather than by this function.
	testCbcounter := struct {
		mu    sync.Mutex
		count int
	}{}

	conn, err := NewDeviceConn(TransportTest)
	require.Nil(t, err)
	tt := conn.(*TestTransport)
	rtr, err := NewRouter(logger, conn)
	require.Nil(t, err)

	msg := &rpc.Msg{
		Dinetrpc:    rpc.CurrentDinetrpc,
		ID:          1,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypeRequest,
		ClassMethod: rpc.ClassMethodDevicePing,
		DeviceUID:   testUID,
	}
	msg2 := &rpc.Msg{
		Dinetrpc:    rpc.CurrentDinetrpc,
		ID:          1,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypeReply,
		ClassMethod: rpc.ClassMethodDevicePing,
		DeviceUID:   testUID,
	}
	require.Nil(t, conn.Connect("dont care"))

	rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeRequest, func(in *rpc.Msg) *rpc.Msg {
		// t.Error instead of require: the router calls this callback
		// (presumably off the test goroutine), where t.FailNow is not allowed.
		if in == nil {
			t.Error("callback received a nil message")
		}
		testCbcounter.mu.Lock()
		defer testCbcounter.mu.Unlock()
		testCbcounter.count++
		return msg2
	})

	// ListenAndServe's result is handed back over a buffered channel so it
	// can be asserted on the test goroutine after the connection is closed.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- rtr.ListenAndServe()
	}()

	tt.Write <- msg
	rx := <-tt.Read

	// Read and reset the counter under the lock, then assert outside it.
	testCbcounter.mu.Lock()
	calls := testCbcounter.count
	testCbcounter.count = 0
	testCbcounter.mu.Unlock()
	require.Equal(t, 1, calls)

	require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc)
	require.Equal(t, msg2.ID, rx.ID)
	require.Equal(t, msg2.Time, rx.Time)
	require.Equal(t, msg2.Type, rx.Type)
	require.Equal(t, msg2.ClassMethod, rx.ClassMethod)
	require.Equal(t, msg2.DeviceUID, rx.DeviceUID)

	// There is no sensor:info callback, but a response is still expected.
	msg.ClassMethod = rpc.ClassMethodSensorInfo
	tt.Write <- msg
	rxerr := <-tt.Read

	// The callback must not have been called for the unsubscribed method.
	testCbcounter.mu.Lock()
	calls = testCbcounter.count
	testCbcounter.mu.Unlock()
	require.Equal(t, 0, calls)

	require.Equal(t, msg.ClassMethod, rxerr.ClassMethod)
	require.Equal(t, msg.ID, rxerr.ID)
	require.Equal(t, rpc.MsgTypeReply, rxerr.Type)
	// Guard the dereference below so a reply without an error fails the test
	// with a clear message instead of panicking.
	require.NotNil(t, rxerr.Error)
	require.Equal(t, rpc.EOpnotsupp, rxerr.Error.Code)

	require.Nil(t, conn.Close())

	// Closing the connection is expected to make ListenAndServe return.
	listenErr := <-serveErr
	require.NotNil(t, listenErr)
	require.Equal(t, ErrDisconnected, listenErr)
}