168 lines
3.9 KiB
Go
168 lines
3.9 KiB
Go
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"time"
|
|
|
|
redigo "github.com/gomodule/redigo/redis"
|
|
"github.com/mna/redisc"
|
|
"src.dualinventive.com/go/dinet"
|
|
"src.dualinventive.com/go/dinet/ditime"
|
|
"src.dualinventive.com/go/dinet/rpc"
|
|
)
|
|
|
|
// letterRunes is the alphabet random UIDs are drawn from: the lowercase
// hexadecimal digits. It lives at package level so it is built only once.
// nolint: gochecknoglobals
var letterRunes = []rune("abcdef0123456789")

// randomUID returns a pseudo-random device UID: the fixed prefix "1337"
// followed by 28 characters picked from letterRunes, 32 characters in total.
// The characters come from the shared math/rand source.
func randomUID() string {
	const suffixLen = 28

	suffix := make([]rune, 0, suffixLen)
	for len(suffix) < suffixLen {
		suffix = append(suffix, letterRunes[rand.Intn(len(letterRunes))])
	}
	return "1337" + string(suffix)
}
|
|
|
|
func main() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
pubSubURI := flag.String("zmq", "tcp://127.0.0.1:6091", "the pub sub ZMQ uri")
|
|
redisUrls := ArrayVar("r", []string{
|
|
"127.0.0.1:46526",
|
|
"127.0.0.1:35962",
|
|
"127.0.0.1:42060",
|
|
}, "redis cluster urls")
|
|
numbers := flag.Int("n", 200, "the number of requests")
|
|
// waitPeriod := flag.Duration("w", 200 * time.Second, "the wait period")
|
|
numSensorData := flag.Int("d", 1000, "the number of sensor data")
|
|
flag.Parse()
|
|
|
|
// Calculate the result
|
|
result := make([]*rpc.ResultValueItem, 0, *numSensorData)
|
|
for a := 0; a < *numSensorData; a++ {
|
|
result = append(result, &rpc.ResultValueItem{
|
|
UID: uint16(a),
|
|
Time: ditime.Now(),
|
|
Value: a,
|
|
})
|
|
}
|
|
|
|
fmt.Println("Connect to", *pubSubURI)
|
|
con := getZMQConn(*pubSubURI)
|
|
fmt.Println("Connect to", *redisUrls)
|
|
redis := getRedisConn(*redisUrls)
|
|
|
|
c := redis.Get()
|
|
var err error
|
|
conn, err := redisc.RetryConn(c, 3, time.Second)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer func() {
|
|
closeerr := conn.Close()
|
|
// we are closing the connection so it is too late for errors.
|
|
_ = closeerr
|
|
}()
|
|
|
|
for a := 0; a < *numbers || *numbers < 0; a++ {
|
|
uid := randomUID()
|
|
delDevice(conn, uid)
|
|
|
|
fmt.Println(a, "Send values with ", *numSensorData, " sensor data")
|
|
insert(con, uid, result)
|
|
start := time.Now()
|
|
for !check(conn, uid, int64(*numSensorData+1)) {
|
|
time.Sleep(time.Millisecond * 2)
|
|
if time.Since(start) > time.Minute {
|
|
panic("ai ai ai")
|
|
}
|
|
}
|
|
end := time.Now()
|
|
fmt.Println("all inserted... Duration: ", end.Sub(start))
|
|
|
|
ok := true
|
|
if !check(conn, uid, int64(*numSensorData+1)) {
|
|
ok = false
|
|
}
|
|
if !ok {
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// getZMQConn returns the ZMQ connection
|
|
func getZMQConn(uri string) dinet.Conn {
|
|
con, err := dinet.NewConn(dinet.TransportZmq)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
err = con.Connect(uri + "?type=pub&bind=true")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
time.Sleep(time.Second)
|
|
return con
|
|
}
|
|
|
|
// getRedisConn returns the redis cluster connection
|
|
func getRedisConn(redisHosts []string) *redisc.Cluster {
|
|
cluster := &redisc.Cluster{
|
|
StartupNodes: redisHosts,
|
|
DialOptions: []redigo.DialOption{redigo.DialConnectTimeout(10 * time.Second)},
|
|
}
|
|
err := cluster.Refresh()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return cluster
|
|
}
|
|
|
|
// flushCluster flushes the cluster
|
|
func delDevice(c redigo.Conn, uid string) {
|
|
redisKey := fmt.Sprintf("device:%s", uid)
|
|
_, err := redisDo(c, "DEL", redisKey)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// redisDo executes the command on the redis cluster
|
|
func redisDo(conn redigo.Conn, commandName string, args ...interface{}) (reply interface{}, err error) {
|
|
return conn.Do(commandName, args...)
|
|
}
|
|
|
|
// insert sends a ZMQ request to the redis proxy
|
|
func insert(pubsub dinet.Writer, deviceUID string, result []*rpc.ResultValueItem) {
|
|
/* #nosec */
|
|
err := pubsub.Send(&rpc.Msg{
|
|
DeviceUID: deviceUID,
|
|
Time: ditime.Now(),
|
|
Type: rpc.MsgTypeReply,
|
|
ID: uint32(rand.Int()),
|
|
ClassMethod: rpc.ClassMethodSensorData,
|
|
Result: rpc.NewResult(result),
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// check checks if the data is in the redis cluster
|
|
func check(redis redigo.Conn, uid string, expectedFields int64) bool {
|
|
deviceUID := fmt.Sprintf("device:%s", uid)
|
|
res, err := redigo.Int64(redisDo(redis, "HLEN", deviceUID))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if res != expectedFields {
|
|
return false
|
|
}
|
|
fmt.Printf("%s contains %d fields\n", deviceUID, res)
|
|
return true
|
|
}
|