src.dualinventive.com/go/redis-proxy/internal/reader/reader_test.go

75 lines
1.5 KiB
Go

package reader
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"src.dualinventive.com/go/dinet/ditime"
"src.dualinventive.com/go/dinet/rpc"
"src.dualinventive.com/go/lib/dilog"
"src.dualinventive.com/go/redis-proxy/internal/zmqtest"
)
// chanRPCWriter is a test helper that exposes the messages handed to its Send
// method on a channel, so a test can observe them. TestMessage passes it to
// New as the writer argument.
type chanRPCWriter struct {
// c is the channel onto which Send offers each message.
c chan *rpc.Msg
}
// Send offers msg to the writer's channel without blocking. If the channel
// send cannot proceed immediately (for example, nobody is currently receiving
// on an unbuffered channel) the message is discarded. Send always returns nil.
func (w chanRPCWriter) Send(msg *rpc.Msg) error {
	select {
	default:
		// The send would block: drop the message instead of stalling the caller.
	case w.c <- msg:
		// The message was handed over to a receiver.
	}
	return nil
}
// SendBytes is not implemented by this test writer: the payload is ignored and
// a "not supported" error is returned on every call.
func (w chanRPCWriter) SendBytes(_ []byte) error {
	errNotSupported := errors.New("not supported")
	return errNotSupported
}
// TestMessage verifies that a message published on the ZMQ test connection is
// delivered by the reader to its RPC writer.
//
// A publisher goroutine keeps sending the same publish message every 100ms
// because pub/sub connections may drop messages; the test passes as soon as a
// single message reaches the writer. Failures are only reported from the test
// goroutine itself, and the wait is bounded so a broken pipeline fails the test
// instead of hanging it.
func TestMessage(t *testing.T) {
	// Upper bound on how long to wait for the first message to come through.
	const receiveTimeout = 10 * time.Second

	logger := dilog.NewTestLogger(t)
	w := chanRPCWriter{c: make(chan *rpc.Msg)}
	pubConn, zmqURL := zmqtest.New(t)
	reader, err := New(logger, zmqURL, w)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup

	// Serve the messages
	wg.Add(1)
	go func() {
		defer wg.Done()
		reader.Serve(ctx)
	}()

	// Keep sending publish messages. Errors are handed to the test goroutine
	// through publishFailed: t.FailNow (and therefore require.*) must not be
	// called from any goroutine other than the one running the test.
	// The buffer of one lets the publisher exit without waiting for a reader.
	publishFailed := make(chan error, 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			if perr := pubConn.Send(&rpc.Msg{
				DeviceUID:   "01234567890123456789012345678901",
				ID:          3,
				Time:        ditime.Now(),
				Type:        rpc.MsgTypePublish,
				ClassMethod: rpc.ClassMethodDevicePing,
			}); perr != nil {
				publishFailed <- perr
				return
			}
		}
	}()

	// Wait until we have a single message because pub/sub connections may drop
	// messages. Use t.Error rather than t.Fatal so the goroutines are still shut
	// down cleanly below.
	select {
	case <-w.c:
	case perr := <-publishFailed:
		t.Errorf("publishing message: %v", perr)
	case <-time.After(receiveTimeout):
		t.Errorf("no message received within %s", receiveTimeout)
	}

	// Stop sending messages & stop listening
	cancel()
	// Wait for the go routines to finish
	wg.Wait()
}