188 lines
5.2 KiB
Go
188 lines
5.2 KiB
Go
package wsconn
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
"src.dualinventive.com/go/websocketserver/internal/redis"
|
|
wsclnt "src.dualinventive.com/go/websocketserver/internal/wsconn/testutil"
|
|
)
|
|
|
|
// mockWsconnServer bundles a httptest server together with the websocket URL
// under which it is reachable, for use as a MTIWss stub in tests.
type mockWsconnServer struct {
	// srv is the underlying HTTP test server; it is shut down by Close.
	srv *httptest.Server
	// url is the websocket address reported by URL.
	url string
}
|
|
|
|
// getFreeAddr gets a free TCP address from the kernel
|
|
func getFreeAddr(t *testing.T) string {
|
|
l, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err, "listen on port 0")
|
|
defer l.Close()
|
|
return l.Addr().String()
|
|
}
|
|
|
|
// New creates a stubbed mtiwss endpoint on URI with ep as endpoint path
|
|
func newWsconnMockServer(f http.HandlerFunc) *mockWsconnServer {
|
|
t := &mockWsconnServer{}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/", f)
|
|
|
|
t.srv = httptest.NewServer(mux)
|
|
t.url = strings.Replace(t.srv.URL+"/", "http", "ws", -1)
|
|
|
|
return t
|
|
}
|
|
|
|
// Close graceful stops the mtiwss endpoint server
|
|
func (t *mockWsconnServer) Close() error {
|
|
t.srv.Close()
|
|
return nil
|
|
}
|
|
|
|
// URL returns the current MTIWss URL with endpoint
|
|
func (t *mockWsconnServer) URL() string {
|
|
return t.url
|
|
}
|
|
|
|
// TestConnectDisconnect will try to connect and disconnect with a websocket client
|
|
func TestConnectDisconnect(t *testing.T) {
|
|
// Start a HTTP server with a NewWsHandlerContext
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
addr := getFreeAddr(t)
|
|
|
|
mux := http.NewServeMux()
|
|
// NOTE: we don't pass a mtiwss requester as it is not used in this test
|
|
// no request are made when not sending from the client so this is safe
|
|
mux.HandleFunc("/", NewWsHandlerContext(ctx, nil))
|
|
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: mux,
|
|
}
|
|
|
|
defer srv.Close()
|
|
go srv.ListenAndServe()
|
|
|
|
// Connect and disconnect with a websocket client
|
|
clnt, err := wsclnt.NewClient("ws://" + addr)
|
|
require.Nil(t, err)
|
|
clnt.Close()
|
|
}
|
|
|
|
// TestPublishDeviceUpdate tests the public function for a single device update so all websocket clients
|
|
// receive a filtered update in DI-Net RPC realtime:data format
|
|
func TestPublishDeviceUpdate(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// newMockServer
|
|
srv := newWsconnMockServer(NewWsHandlerContext(ctx, nil))
|
|
defer srv.Close()
|
|
|
|
// Connect to the mockserver with a websocket client
|
|
clnt, err := wsclnt.NewClient(srv.URL())
|
|
require.Nil(t, err)
|
|
defer clnt.Close()
|
|
|
|
// Get the connect client from the list
|
|
clients.mu.RLock()
|
|
li := clients.list.Front()
|
|
clients.mu.RUnlock()
|
|
require.NotNil(t, li)
|
|
require.NotNil(t, li.Value)
|
|
c := li.Value.(*client)
|
|
|
|
// Set device field subscriptions
|
|
clients.mu.Lock()
|
|
c.deviceFields = map[string][]string{
|
|
"001fc23154ff65065182565553470187": {"device:info", "sensor:3:data"},
|
|
}
|
|
clients.mu.Unlock()
|
|
|
|
// Publish mocked redis event
|
|
ev := redis.Event{Chan: "device", Msg: "001fc23154ff65065182565553470187"}
|
|
values := map[string]string{
|
|
"device:info": "{}",
|
|
"sensor:3:data": "{}",
|
|
"sensor:5:data": "{}", // Add a filtered field with value for testing filterFields
|
|
}
|
|
|
|
Publish(ev, values)
|
|
|
|
// Read the RPC realtime:data message from the websocket
|
|
msg, result, err := clnt.ReadRPCMsg()
|
|
require.Nil(t, err)
|
|
require.NotNil(t, msg)
|
|
|
|
assert.Nil(t, msg.Error)
|
|
assert.NotEqual(t, 0, msg.Time)
|
|
assert.Equal(t, msg.Type, rpc.MsgTypePublish)
|
|
assert.Equal(t, msg.ClassMethod, rpc.ClassMethodRealtimeData)
|
|
assert.Equal(t, msg.DeviceUID, "001fc23154ff65065182565553470187")
|
|
assert.JSONEq(t, result, `{"device:info":{},"sensor:3:data":{}}`)
|
|
}
|
|
|
|
// TestPublishProjectUpdate tests the public function for project updates so all websocket clients
|
|
// receive a filtered update in DI-Net RPC realtime:data format
|
|
func TestPublishProjectUpdate(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// newMockServer
|
|
srv := newWsconnMockServer(NewWsHandlerContext(ctx, nil))
|
|
defer srv.Close()
|
|
|
|
// Connect to the mockserver with a websocket client
|
|
clnt, err := wsclnt.NewClient(srv.URL())
|
|
require.Nil(t, err)
|
|
defer clnt.Close()
|
|
|
|
// Get the connect client from the list
|
|
clients.mu.RLock()
|
|
li := clients.list.Front()
|
|
clients.mu.RUnlock()
|
|
require.NotNil(t, li)
|
|
require.NotNil(t, li.Value)
|
|
c := li.Value.(*client)
|
|
|
|
// Set device field subscriptions
|
|
clients.mu.Lock()
|
|
c.projectFields = map[uint64][]string{
|
|
10: {"last_update", "project:counter", "project:status"},
|
|
}
|
|
clients.mu.Unlock()
|
|
|
|
// Publish mocked redis event
|
|
ev := redis.Event{Chan: "project", Msg: "10"}
|
|
values := map[string]string{
|
|
"last_update": "1234",
|
|
"project:counter": "{}",
|
|
"project:status": "{}",
|
|
"project:info": "{}", // Add a filtered field with value for testing filterFields
|
|
}
|
|
Publish(ev, values)
|
|
|
|
// Read the RPC realtime:data message from the websocket
|
|
msg, result, err := clnt.ReadRPCMsg()
|
|
require.Nil(t, err)
|
|
require.NotNil(t, msg)
|
|
|
|
assert.Nil(t, msg.Error)
|
|
assert.NotEqual(t, 0, msg.Time)
|
|
assert.Equal(t, msg.Type, rpc.MsgTypePublish)
|
|
assert.Equal(t, msg.ClassMethod, rpc.ClassMethodRealtimeData)
|
|
assert.Equal(t, uint(10), msg.ProjectID)
|
|
assert.JSONEq(t, result, `{"last_update":1234,"project:counter":{},"project:status":{}}`)
|
|
}
|