src.dualinventive.com/go/redis-proxy/vendor/github.com/pebbe/zmq4/examples/clonesrv2.go

120 lines
3.0 KiB
Go

//
// Clone server Model Two
//
// In the original C example, the client misses updates between snapshot
// and further updates. Sometimes, it even misses the END message of
// the snapshot, so it waits for it forever.
// This Go implementation has some modifications to improve this, but it
// is still not fully reliable.
package main
import (
zmq "github.com/pebbe/zmq4"
"github.com/pebbe/zmq4/examples/kvsimple"
"fmt"
"math/rand"
"time"
)
func main() {
// Prepare our context and sockets
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:5557")
sequence := int64(0)
rand.Seed(time.Now().UnixNano())
// Start state manager and wait for synchronization signal
updates, _ := zmq.NewSocket(zmq.PAIR)
updates.Bind("inproc://pipe")
go state_manager()
updates.RecvMessage(0) // "READY"
for {
// Distribute as key-value message
sequence++
kvmsg := kvsimple.NewKvmsg(sequence)
kvmsg.SetKey(fmt.Sprint(rand.Intn(10000)))
kvmsg.SetBody(fmt.Sprint(rand.Intn(1000000)))
if kvmsg.Send(publisher) != nil {
break
}
if kvmsg.Send(updates) != nil {
break
}
}
fmt.Printf("Interrupted\n%d messages out\n", sequence)
}
// The state manager task maintains the state and handles requests from
// clients for snapshots:
func state_manager() {
kvmap := make(map[string]*kvsimple.Kvmsg)
pipe, _ := zmq.NewSocket(zmq.PAIR)
pipe.Connect("inproc://pipe")
pipe.SendMessage("READY")
snapshot, _ := zmq.NewSocket(zmq.ROUTER)
snapshot.Bind("tcp://*:5556")
poller := zmq.NewPoller()
poller.Add(pipe, zmq.POLLIN)
poller.Add(snapshot, zmq.POLLIN)
sequence := int64(0) // Current snapshot version number
LOOP:
for {
polled, err := poller.Poll(-1)
if err != nil {
break // Context has been shut down
}
for _, item := range polled {
switch socket := item.Socket; socket {
case pipe:
// Apply state update from main thread
kvmsg, err := kvsimple.RecvKvmsg(pipe)
if err != nil {
break LOOP // Interrupted
}
sequence, _ = kvmsg.GetSequence()
kvmsg.Store(kvmap)
case snapshot:
// Execute state snapshot request
msg, err := snapshot.RecvMessage(0)
if err != nil {
break LOOP // Interrupted
}
identity := msg[0]
// Request is in second frame of message
request := msg[1]
if request != "ICANHAZ?" {
fmt.Println("E: bad request, aborting")
break LOOP
}
// Send state snapshot to client
// For each entry in kvmap, send kvmsg to client
for _, kvmsg := range kvmap {
snapshot.Send(identity, zmq.SNDMORE)
kvmsg.Send(snapshot)
}
// Give client some time to deal with it.
// This reduces the risk that the client won't see
// the END message, but it doesn't eliminate the risk.
time.Sleep(100 * time.Millisecond)
// Now send END message with sequence number
fmt.Printf("Sending state shapshot=%d\n", sequence)
snapshot.Send(identity, zmq.SNDMORE)
kvmsg := kvsimple.NewKvmsg(sequence)
kvmsg.SetKey("KTHXBAI")
kvmsg.SetBody("")
kvmsg.Send(snapshot)
}
}
}
}