108 lines
2.8 KiB
Go
108 lines
2.8 KiB
Go
package dinet
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
)
|
|
|
|
const repHostPort = "tcp://127.0.0.1:61234"
|
|
|
|
func TestZMQTransportPollCloser(t *testing.T) {
|
|
replier, err := NewConn(TransportZmq)
|
|
require.Nil(t, err)
|
|
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
|
|
|
|
requester, err := NewConn(TransportZmq)
|
|
require.Nil(t, err)
|
|
require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort)))
|
|
|
|
recvErrChan := make(chan error)
|
|
go func() {
|
|
req, err := replier.Recv()
|
|
recvErrChan <- err
|
|
require.Nil(t, replier.Send(req))
|
|
_, err = replier.Recv()
|
|
recvErrChan <- err
|
|
}()
|
|
|
|
require.Nil(t, requester.Send(&rpc.Msg{
|
|
ID: 1,
|
|
UserID: 1,
|
|
ClassMethod: rpc.ClassMethodUserData,
|
|
Type: rpc.MsgTypeRequest}))
|
|
assert.Nil(t, <-recvErrChan)
|
|
assert.Nil(t, replier.Close())
|
|
assert.NotNil(t, <-recvErrChan)
|
|
assert.Nil(t, requester.Close())
|
|
}
|
|
|
|
func TestZMQReconnect(t *testing.T) {
|
|
replier, err := NewConn(TransportZmq)
|
|
require.Nil(t, err)
|
|
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
|
|
err = replier.Reconnect()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
require.Nil(t, err)
|
|
require.Nil(t, replier.Close())
|
|
}
|
|
|
|
// TestZMQTransportErrFSM tests the implementation of the lazy-pirate-pattern
|
|
func TestZMQTransportErrFSM(t *testing.T) {
|
|
replier, err := NewConn(TransportZmq)
|
|
require.Nil(t, err)
|
|
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
|
|
|
|
requester, err := NewConn(TransportZmq)
|
|
require.Nil(t, err)
|
|
require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort)))
|
|
|
|
// send request
|
|
require.Nil(t, requester.Send(&rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeRequest,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}))
|
|
// receive request
|
|
_, err = replier.Recv()
|
|
require.Nil(t, err)
|
|
// reconnect replier to recover it from the invalid state
|
|
err = replier.Reconnect()
|
|
if err != nil {
|
|
t.Logf("Error from reconnect: %s", err.Error())
|
|
// sometimes this fails, retry
|
|
err = replier.Reconnect()
|
|
}
|
|
|
|
require.Nil(t, err)
|
|
// no reply, we send another request which forces the invalid state of zmq
|
|
require.Nil(t, requester.Send(&rpc.Msg{
|
|
Dinetrpc: rpc.CurrentDinetrpc,
|
|
ID: 1,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeRequest,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
DeviceUID: testUID,
|
|
}))
|
|
// receive request
|
|
m, err := replier.Recv()
|
|
require.Nil(t, err)
|
|
// send reply
|
|
require.Nil(t, replier.Send(m))
|
|
// receive reply
|
|
_, err = requester.Recv()
|
|
require.Nil(t, err)
|
|
// all ok, :-)
|
|
require.Nil(t, requester.Close())
|
|
require.Nil(t, replier.Close())
|
|
}
|