package dinet import ( "sync" "testing" "time" "src.dualinventive.com/go/dinet/ditime" "src.dualinventive.com/go/dinet/rpc" "src.dualinventive.com/go/lib/dilog" "github.com/stretchr/testify/require" ) const testUID = "DEADBEEFCAFEBABEDEADBEEFCAFEBABE" func TestRouterSend(t *testing.T) { logger := dilog.NewTestLogger(t) conn, err := NewDeviceConn(TransportTest) require.Nil(t, err) tt := conn.(*TestTransport) rtr, err := NewRouter(logger, conn) require.Nil(t, err) msg := rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, Time: ditime.Now(), Type: rpc.MsgTypePublish, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, } require.Nil(t, conn.Connect("dont care")) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() err := rtr.Send(&msg) require.Nil(t, err) }() rx := <-tt.Read require.Equal(t, msg.Dinetrpc, rx.Dinetrpc) require.Equal(t, msg.ID, rx.ID) require.Equal(t, msg.Time, rx.Time) require.Equal(t, msg.Type, rx.Type) require.Equal(t, msg.ClassMethod, rx.ClassMethod) require.Equal(t, msg.DeviceUID, rx.DeviceUID) wg.Wait() require.Nil(t, conn.Close()) } func TestRouterSubscribeDevice(t *testing.T) { logger := dilog.NewTestLogger(t) testCbcounter := struct { mu sync.Mutex count int }{} conn, err := NewDeviceConn(TransportTest) require.Nil(t, err) tt := conn.(*TestTransport) rtr, err := NewRouter(logger, conn) require.Nil(t, err) msg := &rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, Time: ditime.Now(), Type: rpc.MsgTypePublish, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, } msg2 := &rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeReply, ClassMethod: rpc.ClassMethodDeviceInfo, DeviceUID: testUID, } require.Nil(t, conn.Connect("dont care")) rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg { testCbcounter.mu.Lock() testCbcounter.count++ testCbcounter.mu.Unlock() require.NotNil(t, in) return msg2 }) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() err := rtr.ListenAndServe() require.NotNil(t, err) require.Equal(t, ErrDisconnected, err) }() tt.Write <- msg rx := <-tt.Read testCbcounter.mu.Lock() require.Equal(t, 1, testCbcounter.count) testCbcounter.mu.Unlock() require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc) require.Equal(t, msg2.ID, rx.ID) require.Equal(t, msg2.Time, rx.Time) require.Equal(t, msg2.Type, rx.Type) require.Equal(t, msg2.ClassMethod, rx.ClassMethod) require.Equal(t, msg2.DeviceUID, rx.DeviceUID) require.Nil(t, conn.Close()) wg.Wait() } func TestRouterSkipInvalidMessage(t *testing.T) { logger := dilog.NewTestLogger(t) conn, err := NewDeviceConn(TransportTest) require.Nil(t, err) tt := conn.(*TestTransport) rtr, err := NewRouter(logger, conn) require.Nil(t, err) // Invalid msg (missing device:uid) msg := &rpc.Msg{ Type: rpc.MsgTypeRequest, ClassMethod: rpc.ClassMethodDevicePing, } require.Nil(t, conn.Connect("dont care")) rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeAny, func(in *rpc.Msg) *rpc.Msg { return &rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeReply, ClassMethod: rpc.ClassMethodDeviceInfo, DeviceUID: testUID, } }) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() err := rtr.ListenAndServe() require.NotNil(t, err) require.Equal(t, ErrDisconnected, err) }() // Send request and read an implicit reply with error code EProto tt.Write <- msg select { case m := <-tt.Read: require.NotNil(t, m.Error) require.Equal(t, rpc.EProto, m.Error.Code) case <-time.After(time.Millisecond * 500): require.Fail(t, "go no EProto error reply") } require.Nil(t, conn.Close()) wg.Wait() } func TestRouterSubscribeRequest(t *testing.T) { logger := dilog.NewTestLogger(t) testCbcounter := struct { mu sync.Mutex count int }{} conn, err := NewDeviceConn(TransportTest) require.Nil(t, err) tt := conn.(*TestTransport) rtr, err := NewRouter(logger, conn) require.Nil(t, err) msg := &rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeRequest, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, } msg2 := &rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeReply, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, } require.Nil(t, conn.Connect("dont care")) rtr.Subscribe(testUID, rpc.ClassMethodDevicePing, rpc.MsgTypeRequest, func(in *rpc.Msg) *rpc.Msg { require.NotNil(t, in) testCbcounter.mu.Lock() defer testCbcounter.mu.Unlock() testCbcounter.count++ return msg2 }) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() err := rtr.ListenAndServe() require.NotNil(t, err) require.Equal(t, ErrDisconnected, err) }() tt.Write <- msg rx := <-tt.Read testCbcounter.mu.Lock() require.Equal(t, 1, testCbcounter.count) testCbcounter.count = 0 testCbcounter.mu.Unlock() require.Equal(t, msg2.Dinetrpc, rx.Dinetrpc) require.Equal(t, msg2.ID, rx.ID) require.Equal(t, msg2.Time, rx.Time) require.Equal(t, msg2.Type, rx.Type) require.Equal(t, msg2.ClassMethod, rx.ClassMethod) require.Equal(t, msg2.DeviceUID, rx.DeviceUID) // There is no sensor:info callback, but we expect a response though msg.ClassMethod = rpc.ClassMethodSensorInfo tt.Write <- msg rxerr := <-tt.Read testCbcounter.mu.Lock() // callback must not be called require.Equal(t, 0, testCbcounter.count) testCbcounter.mu.Unlock() require.Equal(t, msg.ClassMethod, rxerr.ClassMethod) require.Equal(t, msg.ID, rxerr.ID) require.Equal(t, rpc.MsgTypeReply, rxerr.Type) require.Equal(t, rpc.EOpnotsupp, rxerr.Error.Code) require.Nil(t, conn.Close()) wg.Wait() }