214 lines
6.1 KiB
Go
214 lines
6.1 KiB
Go
package redistest
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
redigo "github.com/gomodule/redigo/redis"
|
|
"github.com/mna/redisc"
|
|
"github.com/mna/redisc/redistest"
|
|
"github.com/stretchr/objx"
|
|
"github.com/stretchr/testify/require"
|
|
"src.dualinventive.com/go/dinet/rts"
|
|
"src.dualinventive.com/go/lib/dilog"
|
|
)
|
|
|
|
// Redis contains a working redis cluster with functions to test data in Redis.
|
|
type Redis struct {
|
|
Logger dilog.Logger
|
|
redisPort []string
|
|
rc *redisc.Cluster
|
|
cleanup func()
|
|
wg sync.WaitGroup
|
|
event chan bool
|
|
isClosed chan bool
|
|
}
|
|
|
|
// New creates a new Redis cluster.
|
|
func New(t *testing.T) *Redis {
|
|
// Start redis cluster
|
|
rcClose, rcPorts := redistest.StartCluster(t, nil)
|
|
r := &Redis{
|
|
Logger: dilog.NewTestLogger(t),
|
|
redisPort: []string{"127.0.0.1:" + rcPorts[0]},
|
|
cleanup: rcClose,
|
|
event: make(chan bool, 1),
|
|
isClosed: make(chan bool),
|
|
}
|
|
r.rc = &redisc.Cluster{
|
|
StartupNodes: r.redisPort,
|
|
DialOptions: []redigo.DialOption{redigo.DialConnectTimeout(10 * time.Second)},
|
|
}
|
|
r.wg.Add(1)
|
|
go r.eventListener(t)
|
|
|
|
require.Nil(t, r.rc.Refresh())
|
|
return r
|
|
}
|
|
|
|
// RedisPorts returns the testing redis ports
|
|
func (r *Redis) RedisPorts() []string {
|
|
return r.redisPort
|
|
}
|
|
|
|
// getPubSubConn gets a new connection from the cluster and subscribes to ChanDevice and ChanProject
|
|
func (r *Redis) getPubSubConn() (*redigo.PubSubConn, error) {
|
|
conn := r.rc.Get()
|
|
|
|
psc := redigo.PubSubConn{Conn: conn}
|
|
if err := psc.Subscribe(rts.ChanDevice, rts.ChanProject); err != nil {
|
|
return nil, err
|
|
}
|
|
return &psc, nil
|
|
}
|
|
|
|
// eventListener listens for change events. This is necessary for the WaitForChange function
|
|
func (r *Redis) eventListener(t *testing.T) {
|
|
defer r.wg.Done()
|
|
|
|
psc, err := r.getPubSubConn()
|
|
require.Nil(t, err)
|
|
|
|
for {
|
|
switch psc.Receive().(type) {
|
|
case redigo.Message:
|
|
select {
|
|
case r.event <- true:
|
|
default:
|
|
}
|
|
case error:
|
|
// Check if servePublishEvents loop must be closed due to Close call
|
|
select {
|
|
case <-r.isClosed:
|
|
return
|
|
default:
|
|
}
|
|
// Reconnect a new PUBSUB connection on error
|
|
require.Nil(t, psc.Close())
|
|
time.Sleep(time.Second)
|
|
psc, err = r.getPubSubConn()
|
|
require.Nil(t, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ClearEvent resets the event flag. WaitForChange will block till a new event is raised (or on a timeout).
|
|
func (r *Redis) ClearEvent() {
|
|
select {
|
|
case <-r.event:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// WaitForChange blocks till a change is made in Redis (and return true) or when the timeout is reached (return false).
|
|
func (r *Redis) WaitForChange(timeout time.Duration) bool {
|
|
select {
|
|
case <-r.event:
|
|
return true
|
|
case <-time.After(timeout):
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (r *Redis) field(t *testing.T, key, field string) interface{} {
|
|
rply, err := r.do("HGET", key, field)
|
|
require.Nil(t, err)
|
|
return rply
|
|
}
|
|
|
|
// FieldUint64 gets the value for key and field and validates if it decodes into a uint64
|
|
func (r *Redis) FieldUint64(t *testing.T, key, field string) uint64 {
|
|
u64, err := redigo.Uint64(r.do("HGET", key, field))
|
|
require.Nil(t, err)
|
|
return u64
|
|
}
|
|
|
|
// FieldExist checks if the field exist in redis via HGET
|
|
func (r *Redis) FieldExist(t *testing.T, key, field string) interface{} {
|
|
rply := r.field(t, key, field)
|
|
require.NotNil(t, rply, fmt.Sprintf("Expected field %s - %s found", key, field))
|
|
return rply
|
|
}
|
|
|
|
// FieldNotExist checks if the field doesn't exist in redis via HGET
|
|
func (r *Redis) FieldNotExist(t *testing.T, key, field string) interface{} {
|
|
rply := r.field(t, key, field)
|
|
require.Nil(t, rply, fmt.Sprintf("Expected field %s - %s not found", key, field))
|
|
return rply
|
|
}
|
|
|
|
// fieldJSON retrieves the json field value. If the field doesn't exist the test failes
|
|
func (r *Redis) fieldJSON(t *testing.T, key, field, jsonkey string) interface{} {
|
|
value, err := redigo.String(r.FieldExist(t, key, field), nil)
|
|
require.Nil(t, err)
|
|
|
|
m := objx.MustFromJSON(value)
|
|
return m.Get(jsonkey).Data()
|
|
}
|
|
|
|
// FieldJSONExist checks if the jsonkey exists in the field in Redis
|
|
func (r *Redis) FieldJSONExist(t *testing.T, key, field, jsonkey string) interface{} {
|
|
value := r.fieldJSON(t, key, field, jsonkey)
|
|
require.NotNil(t, value, fmt.Sprintf("Expected field %s - %s - %s found", key, field, jsonkey))
|
|
return value
|
|
}
|
|
|
|
// FieldJSONNotExist checks if the jsonkey doesn't exists in the field in Redis. If the field doesn't exist the test
|
|
// failes
|
|
func (r *Redis) FieldJSONNotExist(t *testing.T, key, field, jsonkey string) {
|
|
value := r.fieldJSON(t, key, field, jsonkey)
|
|
require.Nil(t, value, fmt.Sprintf("Expected field %s - %s - %s not found", key, field, jsonkey))
|
|
}
|
|
|
|
// FieldJSONKeyValue checks if the value is the same as the value in the jsonkey in the field in Redis.
|
|
func (r *Redis) FieldJSONKeyValue(t *testing.T, key, field, jsonkey string, jsonvalue interface{}) {
|
|
value := r.FieldJSONExist(t, key, field, jsonkey)
|
|
require.Equal(t, jsonvalue, value)
|
|
}
|
|
|
|
// GPSRecordExists returns true when the DeviceUID contains a GEOHash
|
|
func (r *Redis) GPSRecordExists(t *testing.T, deviceUID string) bool {
|
|
rply, err := redigo.Positions(r.do("GEOPOS", rts.SensorGeohashKey, deviceUID))
|
|
require.Nil(t, err)
|
|
|
|
// Length is 0 when rts.SensorGeohashKey doesn't exists in Redis (there are none GPS coordinates)
|
|
// reply[0] is nil when the deviceUID is not in Redis (but there are other deviceUID's in Redis)
|
|
return len(rply) > 0 && rply[0] != nil
|
|
}
|
|
|
|
// do execute a command on the redis cluster
|
|
func (r *Redis) do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
|
c, err := r.getConn()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
closeerr := c.Close()
|
|
// we are closing the connection so it is too late for errors.
|
|
_ = closeerr
|
|
}()
|
|
|
|
return c.Do(commandName, args...)
|
|
}
|
|
|
|
func (r *Redis) getConn() (redigo.Conn, error) {
|
|
conn := r.rc.Get()
|
|
|
|
// create the retry connection - only Do, Close and Err are
|
|
// supported on that connection. It will make up to 3 attempts
|
|
// to get a valid response, and will wait 100ms before a retry
|
|
// in case of a TRYAGAIN redis error.
|
|
return redisc.RetryConn(conn, 3, time.Second)
|
|
}
|
|
|
|
// Close the connection to redis
|
|
func (r *Redis) Close() error {
|
|
close(r.isClosed)
|
|
err := r.rc.Close()
|
|
r.cleanup()
|
|
r.wg.Wait()
|
|
return err
|
|
}
|