src.dualinventive.com/go/redis-proxy/cmd/redis-stress-test/main.go

168 lines
3.9 KiB
Go

package main
import (
"flag"
"fmt"
"math/rand"
"os"
"time"
redigo "github.com/gomodule/redigo/redis"
"github.com/mna/redisc"
"src.dualinventive.com/go/dinet"
"src.dualinventive.com/go/dinet/ditime"
"src.dualinventive.com/go/dinet/rpc"
)
// letterRunes is the alphabet randomUID draws from: the lowercase hexadecimal
// digits. It is a package-level value so it is built only once.
// nolint: gochecknoglobals
var letterRunes = []rune("abcdef0123456789")

// randomUID returns a pseudo-random device UID: the fixed prefix "1337"
// followed by 28 characters picked from letterRunes (32 characters in total).
func randomUID() string {
	const suffixLen = 28

	suffix := make([]rune, 0, suffixLen)
	for len(suffix) < suffixLen {
		suffix = append(suffix, letterRunes[rand.Intn(len(letterRunes))])
	}
	return "1337" + string(suffix)
}
// main drives the stress test: for every request it picks a random device
// UID, deletes that device's redis key, publishes one sensor-data reply over
// ZMQ and then polls redis until the device hash reports the expected number
// of fields.
//
// Flags:
//
//	-zmq  the pub sub ZMQ uri
//	-r    redis cluster urls
//	-n    the number of requests; a negative value keeps looping
//	-d    the number of sensor data items sent per request
//
// The program panics when a connection cannot be set up or when a device is
// not complete within one minute, and exits with status 1 when the final
// verification of a device fails.
func main() {
	rand.Seed(time.Now().UnixNano())

	pubSubURI := flag.String("zmq", "tcp://127.0.0.1:6091", "the pub sub ZMQ uri")
	redisUrls := ArrayVar("r", []string{
		"127.0.0.1:46526",
		"127.0.0.1:35962",
		"127.0.0.1:42060",
	}, "redis cluster urls")
	numbers := flag.Int("n", 200, "the number of requests")
	// waitPeriod := flag.Duration("w", 200 * time.Second, "the wait period")
	numSensorData := flag.Int("d", 1000, "the number of sensor data")
	flag.Parse()

	// Build the payload once; every request sends the same items.
	// NOTE(review): the uint16 conversion wraps for -d values above 65536 —
	// confirm such sizes are never used.
	result := make([]*rpc.ResultValueItem, 0, *numSensorData)
	for a := 0; a < *numSensorData; a++ {
		result = append(result, &rpc.ResultValueItem{
			UID:   uint16(a),
			Time:  ditime.Now(),
			Value: a,
		})
	}

	// The hash is expected to hold one field more than the number of items.
	// NOTE(review): presumably the proxy stores one extra field per device —
	// confirm against the proxy.
	expectedFields := int64(*numSensorData + 1)

	fmt.Println("Connect to", *pubSubURI)
	con := getZMQConn(*pubSubURI)

	fmt.Println("Connect to", *redisUrls)
	redis := getRedisConn(*redisUrls)
	conn, err := redisc.RetryConn(redis.Get(), 3, time.Second)
	if err != nil {
		panic(err)
	}
	defer func() {
		// We are closing the connection, so it is too late to act on errors.
		_ = conn.Close()
	}()

	// A negative -n keeps the loop condition true for every iteration.
	for a := 0; a < *numbers || *numbers < 0; a++ {
		uid := randomUID()
		delDevice(conn, uid)

		fmt.Println(a, "Send values with ", *numSensorData, " sensor data")
		insert(con, uid, result)

		// Poll every 2ms until all fields are present; give up after a minute.
		start := time.Now()
		for !check(conn, uid, expectedFields) {
			time.Sleep(time.Millisecond * 2)
			if time.Since(start) > time.Minute {
				panic("ai ai ai")
			}
		}
		fmt.Println("all inserted... Duration: ", time.Since(start))

		// Verify once more after the wait loop has finished.
		if !check(conn, uid, expectedFields) {
			// os.Exit does not run deferred functions, so close explicitly.
			_ = conn.Close()
			os.Exit(1)
		}
	}
}
// getZMQConn creates a dinet connection using the ZMQ transport and connects
// it to uri with the query parameters "type=pub" and "bind=true" appended.
// It panics when the connection cannot be created or connected.
func getZMQConn(uri string) dinet.Conn {
	zmq, err := dinet.NewConn(dinet.TransportZmq)
	if err != nil {
		panic(err)
	}
	if err = zmq.Connect(uri + "?type=pub&bind=true"); err != nil {
		panic(err)
	}

	// Wait a second before handing out the connection.
	// NOTE(review): presumably this gives subscribers time to attach — confirm.
	time.Sleep(time.Second)
	return zmq
}
// getRedisConn builds a redis cluster client that starts from redisHosts and
// dials with a 10 second connect timeout. The cluster is refreshed once
// before it is returned; a refresh error causes a panic.
func getRedisConn(redisHosts []string) *redisc.Cluster {
	dialOpts := []redigo.DialOption{
		redigo.DialConnectTimeout(10 * time.Second),
	}
	rc := &redisc.Cluster{
		StartupNodes: redisHosts,
		DialOptions:  dialOpts,
	}
	if err := rc.Refresh(); err != nil {
		panic(err)
	}
	return rc
}
// delDevice issues a DEL for the key "device:<uid>" on the given connection.
// It panics when the command returns an error.
func delDevice(c redigo.Conn, uid string) {
	key := "device:" + uid
	if _, err := redisDo(c, "DEL", key); err != nil {
		panic(err)
	}
}
// redisDo runs commandName with args on conn and passes the reply and error
// from conn.Do back to the caller unchanged.
func redisDo(conn redigo.Conn, commandName string, args ...interface{}) (reply interface{}, err error) {
	reply, err = conn.Do(commandName, args...)
	return reply, err
}
// insert publishes a sensor-data reply message for deviceUID on pubsub. The
// message carries the current time, a random message ID and result as its
// payload. It panics when sending fails.
func insert(pubsub dinet.Writer, deviceUID string, result []*rpc.ResultValueItem) {
	/* #nosec */
	msg := &rpc.Msg{
		DeviceUID:   deviceUID,
		Time:        ditime.Now(),
		Type:        rpc.MsgTypeReply,
		ID:          uint32(rand.Int()),
		ClassMethod: rpc.ClassMethodSensorData,
		Result:      rpc.NewResult(result),
	}
	if err := pubsub.Send(msg); err != nil {
		panic(err)
	}
}
// check reports whether the hash stored under "device:<uid>" holds exactly
// expectedFields fields, as reported by HLEN. On a match it also prints the
// key and its field count. It panics when the HLEN command fails.
func check(redis redigo.Conn, uid string, expectedFields int64) bool {
	key := "device:" + uid
	count, err := redigo.Int64(redisDo(redis, "HLEN", key))
	if err != nil {
		panic(err)
	}
	if count == expectedFields {
		fmt.Printf("%s contains %d fields\n", key, count)
		return true
	}
	return false
}