package dinet import ( "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "src.dualinventive.com/go/dinet/ditime" "src.dualinventive.com/go/dinet/rpc" ) const repHostPort = "tcp://127.0.0.1:61234" func TestZMQTransportPollCloser(t *testing.T) { replier, err := NewConn(TransportZmq) require.Nil(t, err) require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort))) requester, err := NewConn(TransportZmq) require.Nil(t, err) require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort))) recvErrChan := make(chan error) go func() { req, err := replier.Recv() recvErrChan <- err require.Nil(t, replier.Send(req)) _, err = replier.Recv() recvErrChan <- err }() require.Nil(t, requester.Send(&rpc.Msg{ ID: 1, UserID: 1, ClassMethod: rpc.ClassMethodUserData, Type: rpc.MsgTypeRequest})) assert.Nil(t, <-recvErrChan) assert.Nil(t, replier.Close()) assert.NotNil(t, <-recvErrChan) assert.Nil(t, requester.Close()) } func TestZMQReconnect(t *testing.T) { replier, err := NewConn(TransportZmq) require.Nil(t, err) require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort))) err = replier.Reconnect() if err != nil { fmt.Println(err) } require.Nil(t, err) require.Nil(t, replier.Close()) } // TestZMQTransportErrFSM tests the implementation of the lazy-pirate-pattern func TestZMQTransportErrFSM(t *testing.T) { replier, err := NewConn(TransportZmq) require.Nil(t, err) require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort))) requester, err := NewConn(TransportZmq) require.Nil(t, err) require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort))) // send request require.Nil(t, requester.Send(&rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeRequest, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, })) // receive request _, err = replier.Recv() require.Nil(t, err) // reconnect replier to recover it from the invalid state err = replier.Reconnect() if err != nil { t.Logf("Error from reconnect: %s", err.Error()) // sometimes this fails, retry err = replier.Reconnect() } require.Nil(t, err) // no reply, we send another request which forces the invalid state of zmq require.Nil(t, requester.Send(&rpc.Msg{ Dinetrpc: rpc.CurrentDinetrpc, ID: 1, Time: ditime.Now(), Type: rpc.MsgTypeRequest, ClassMethod: rpc.ClassMethodDevicePing, DeviceUID: testUID, })) // receive request m, err := replier.Recv() require.Nil(t, err) // send reply require.Nil(t, replier.Send(m)) // receive reply _, err = requester.Recv() require.Nil(t, err) // all ok, :-) require.Nil(t, requester.Close()) require.Nil(t, replier.Close()) }