package reader import ( "context" "errors" "sync" "testing" "time" "github.com/stretchr/testify/require" "src.dualinventive.com/go/dinet/ditime" "src.dualinventive.com/go/dinet/rpc" "src.dualinventive.com/go/lib/dilog" "src.dualinventive.com/go/redis-proxy/internal/zmqtest" ) type chanRPCWriter struct { c chan *rpc.Msg } func (w chanRPCWriter) Send(m *rpc.Msg) error { select { case w.c <- m: default: } return nil } func (w chanRPCWriter) SendBytes(b []byte) error { return errors.New("not supported") } func TestMessage(t *testing.T) { var wg sync.WaitGroup logger := dilog.NewTestLogger(t) w := chanRPCWriter{c: make(chan *rpc.Msg)} pubConn, zmqURL := zmqtest.New(t) reader, err := New(logger, zmqURL, w) require.Nil(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Serve the messages wg.Add(1) go func() { defer wg.Done() reader.Serve(ctx) }() // Keep sending publish messages wg.Add(1) go func() { defer wg.Done() for { time.Sleep(100 * time.Millisecond) if ctx.Err() != nil { return } require.Nil(t, pubConn.Send(&rpc.Msg{ DeviceUID: "01234567890123456789012345678901", ID: 3, Time: ditime.Now(), Type: rpc.MsgTypePublish, ClassMethod: rpc.ClassMethodDevicePing, })) } }() // Wait until we have a single message because pub/sub connections may drop messages <-w.c // Stop sending messages & stop listening cancel() // Wait for the go routines to finish wg.Wait() }