74 lines
1.4 KiB
Go
74 lines
1.4 KiB
Go
package reader
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
|
|
"src.dualinventive.com/go/influxdb-logger/internal/zmqtest"
|
|
)
|
|
|
|
type chanRPCWriter struct {
|
|
c chan *rpc.Msg
|
|
}
|
|
|
|
func (w chanRPCWriter) Send(m *rpc.Msg) error {
|
|
select {
|
|
case w.c <- m:
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w chanRPCWriter) SendBytes(b []byte) error {
|
|
return errors.New("not supported")
|
|
}
|
|
|
|
func TestMessage(t *testing.T) {
|
|
var wg sync.WaitGroup
|
|
w := chanRPCWriter{c: make(chan *rpc.Msg)}
|
|
pubConn, zmqURL := zmqtest.New(t)
|
|
reader, err := New(zmqURL, w)
|
|
require.Nil(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Serve the messages
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
reader.Serve(ctx)
|
|
}()
|
|
// Keep sending publish messages
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
time.Sleep(100 * time.Millisecond)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
require.Nil(t, pubConn.Send(&rpc.Msg{
|
|
DeviceUID: "01234567890123456789012345678901",
|
|
ID: 3,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypePublish,
|
|
ClassMethod: rpc.ClassMethodDevicePing,
|
|
}))
|
|
}
|
|
}()
|
|
// Wait until we have a single message because pub/sub connections may drop messages
|
|
<-w.c
|
|
// Stop sending messages & stop listening
|
|
cancel()
|
|
// Wait for the go routines to finish
|
|
wg.Wait()
|
|
}
|