src.dualinventive.com/go/dinet/transport_zmq_linux_test.go

108 lines
2.8 KiB
Go

package dinet
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"src.dualinventive.com/go/dinet/ditime"
"src.dualinventive.com/go/dinet/rpc"
)
const repHostPort = "tcp://127.0.0.1:61234"
func TestZMQTransportPollCloser(t *testing.T) {
replier, err := NewConn(TransportZmq)
require.Nil(t, err)
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
requester, err := NewConn(TransportZmq)
require.Nil(t, err)
require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort)))
recvErrChan := make(chan error)
go func() {
req, err := replier.Recv()
recvErrChan <- err
require.Nil(t, replier.Send(req))
_, err = replier.Recv()
recvErrChan <- err
}()
require.Nil(t, requester.Send(&rpc.Msg{
ID: 1,
UserID: 1,
ClassMethod: rpc.ClassMethodUserData,
Type: rpc.MsgTypeRequest}))
assert.Nil(t, <-recvErrChan)
assert.Nil(t, replier.Close())
assert.NotNil(t, <-recvErrChan)
assert.Nil(t, requester.Close())
}
func TestZMQReconnect(t *testing.T) {
replier, err := NewConn(TransportZmq)
require.Nil(t, err)
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
err = replier.Reconnect()
if err != nil {
fmt.Println(err)
}
require.Nil(t, err)
require.Nil(t, replier.Close())
}
// TestZMQTransportErrFSM tests the implementation of the lazy-pirate-pattern
func TestZMQTransportErrFSM(t *testing.T) {
replier, err := NewConn(TransportZmq)
require.Nil(t, err)
require.Nil(t, replier.Connect(fmt.Sprintf("%s?type=rep&bind=true", repHostPort)))
requester, err := NewConn(TransportZmq)
require.Nil(t, err)
require.Nil(t, requester.Connect(fmt.Sprintf("%s?type=req", repHostPort)))
// send request
require.Nil(t, requester.Send(&rpc.Msg{
Dinetrpc: rpc.CurrentDinetrpc,
ID: 1,
Time: ditime.Now(),
Type: rpc.MsgTypeRequest,
ClassMethod: rpc.ClassMethodDevicePing,
DeviceUID: testUID,
}))
// receive request
_, err = replier.Recv()
require.Nil(t, err)
// reconnect replier to recover it from the invalid state
err = replier.Reconnect()
if err != nil {
t.Logf("Error from reconnect: %s", err.Error())
// sometimes this fails, retry
err = replier.Reconnect()
}
require.Nil(t, err)
// no reply, we send another request which forces the invalid state of zmq
require.Nil(t, requester.Send(&rpc.Msg{
Dinetrpc: rpc.CurrentDinetrpc,
ID: 1,
Time: ditime.Now(),
Type: rpc.MsgTypeRequest,
ClassMethod: rpc.ClassMethodDevicePing,
DeviceUID: testUID,
}))
// receive request
m, err := replier.Recv()
require.Nil(t, err)
// send reply
require.Nil(t, replier.Send(m))
// receive reply
_, err = requester.Recv()
require.Nil(t, err)
// all ok, :-)
require.Nil(t, requester.Close())
require.Nil(t, replier.Close())
}