// Source: src.dualinventive.com/go/redis-proxy/vendor/github.com/pebbe/zmq4/examples/peering2.go
// (265 lines, 6.5 KiB, Go)

//
// Broker peering simulation (part 2).
// Prototypes the request-reply flow
//
package main
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"log"
"math/rand"
"os"
"time"
)
// Simulation parameters and the worker handshake marker.
const (
// Number of client goroutines main starts for this broker.
NBR_CLIENTS = 10
// Number of worker goroutines main starts for this broker.
NBR_WORKERS = 3
WORKER_READY = "**READY**" // Signals worker is ready
)
var (
// peers is the set of peer broker names, filled by main from every
// command-line argument after the first. main looks up the first frame of
// a reply here to decide whether it is addressed to a peer broker.
peers = make(map[string]bool)
)
// client_task runs one simulated client. It connects a synchronous REQ
// socket to the local frontend of broker `name` and then loops forever:
// send a request, wait for the reply, print it, and pause for 500-1499 ms.
//
// name is the broker name used to build the ipc endpoint and i is the client
// index; together they form the socket identity "Client-<name>-<i>".
//
// The function returns as soon as the socket cannot be set up or a send or
// receive fails, reporting the reason on standard output.
func client_task(name string, i int) {
	clientname := fmt.Sprintf("Client-%s-%d", name, i)

	client, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		fmt.Println("client_task: cannot create socket for", clientname+":", err)
		return
	}
	defer client.Close()

	if err := client.SetIdentity(clientname); err != nil {
		fmt.Println("client_task: cannot set identity for", clientname+":", err)
		return
	}
	endpoint := "ipc://" + name + "-localfe.ipc"
	if err := client.Connect(endpoint); err != nil {
		fmt.Println("client_task: cannot connect", clientname, "to", endpoint+":", err)
		return
	}

	for {
		// Send request, get reply
		if _, err := client.Send("HELLO from "+clientname, 0); err != nil {
			fmt.Println("client_task interrupted", name)
			break // Interrupted
		}
		reply, err := client.Recv(0)
		if err != nil {
			fmt.Println("client_task interrupted", name)
			break // Interrupted
		}
		fmt.Printf("%s: %s\n", clientname, reply)

		// Random think time so the clients do not run in lock-step.
		time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
	}
}
// worker_task runs one simulated worker. It connects a REQ socket to the
// local backend of broker `name`, announces itself with WORKER_READY and
// then answers every message it receives.
//
// name is the broker name used to build the ipc endpoint and i is the worker
// index; together they form the socket identity "Worker-<name>-<i>".
//
// For each incoming message the last frame is printed as the request body
// and replaced by "OK from <workername>"; all preceding frames are sent back
// unchanged in front of it. The function returns when the socket cannot be
// set up or a send or receive fails.
func worker_task(name string, i int) {
	workername := fmt.Sprintf("Worker-%s-%d", name, i)

	worker, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		fmt.Println("worker_task: cannot create socket for", workername+":", err)
		return
	}
	defer worker.Close()

	if err := worker.SetIdentity(workername); err != nil {
		fmt.Println("worker_task: cannot set identity for", workername+":", err)
		return
	}
	endpoint := "ipc://" + name + "-localbe.ipc"
	if err := worker.Connect(endpoint); err != nil {
		fmt.Println("worker_task: cannot connect", workername, "to", endpoint+":", err)
		return
	}

	// Tell broker we're ready for work
	if _, err := worker.SendMessage(WORKER_READY); err != nil {
		fmt.Println("worker_task interrupted", name)
		return
	}

	// Process messages as they arrive
	for {
		msg, err := worker.RecvMessage(0)
		if err != nil {
			fmt.Println("worker_task interrupted", name)
			break // Interrupted
		}

		// The request body is the final frame; everything before it is
		// echoed back ahead of the reply.
		last := len(msg) - 1
		fmt.Printf("%s: %s\n", workername, msg[last])
		if _, err := worker.SendMessage(msg[:last], "OK from "+workername); err != nil {
			fmt.Println("worker_task interrupted", name)
			break // Interrupted
		}
	}
}
// main runs one broker of the peering simulation.
//
// Usage: peering2 me {you}...
//
// The first argument is this broker's name, the remaining arguments are the
// names of its peers. main sets up four ROUTER sockets (cloud frontend, cloud
// backend, local frontend, local backend), waits for the user to press Enter,
// starts NBR_WORKERS worker goroutines and NBR_CLIENTS client goroutines and
// then routes requests and replies until polling or receiving fails.
func main() {
	// First argument is this broker's name
	// Other arguments are our peers' names
	if len(os.Args) < 2 {
		fmt.Println("syntax: peering2 me {you}...")
		os.Exit(1)
	}
	self := os.Args[1]
	peerNames := os.Args[2:]
	for _, peer := range peerNames {
		peers[peer] = true
	}
	fmt.Println("I: preparing broker at", self)
	rand.Seed(time.Now().UnixNano())

	// must aborts start-up when a socket cannot be prepared. log.Fatalln
	// exits the process, so the deferred Close calls are skipped in that case.
	must := func(what string, err error) {
		if err != nil {
			log.Fatalln("E:", what+":", err)
		}
	}

	// Bind cloud frontend to endpoint
	cloudfe, err := zmq.NewSocket(zmq.ROUTER)
	must("creating cloud frontend", err)
	defer cloudfe.Close()
	must("setting cloud frontend identity", cloudfe.SetIdentity(self))
	must("binding cloud frontend", cloudfe.Bind("ipc://"+self+"-cloud.ipc"))

	// Connect cloud backend to all peers
	cloudbe, err := zmq.NewSocket(zmq.ROUTER)
	must("creating cloud backend", err)
	defer cloudbe.Close()
	must("setting cloud backend identity", cloudbe.SetIdentity(self))
	for _, peer := range peerNames {
		fmt.Println("I: connecting to cloud frontend at", peer)
		// A failed connect to one peer is reported but does not stop the
		// broker; the remaining peers and local traffic are still served.
		if err := cloudbe.Connect("ipc://" + peer + "-cloud.ipc"); err != nil {
			log.Println("E: connecting to", peer+":", err)
		}
	}

	// Prepare local frontend and backend
	localfe, err := zmq.NewSocket(zmq.ROUTER)
	must("creating local frontend", err)
	defer localfe.Close()
	must("binding local frontend", localfe.Bind("ipc://"+self+"-localfe.ipc"))

	localbe, err := zmq.NewSocket(zmq.ROUTER)
	must("creating local backend", err)
	defer localbe.Close()
	must("binding local backend", localbe.Bind("ipc://"+self+"-localbe.ipc"))

	// Get user to tell us when we can start...
	fmt.Print("Press Enter when all brokers are started: ")
	var line string
	// The result is deliberately ignored: the call only serves to wait for
	// the user to finish a line of input.
	fmt.Scanln(&line)

	// Start local workers
	for workerNbr := 0; workerNbr < NBR_WORKERS; workerNbr++ {
		go worker_task(self, workerNbr)
	}

	// Start local clients
	for clientNbr := 0; clientNbr < NBR_CLIENTS; clientNbr++ {
		go client_task(self, clientNbr)
	}

	// Here we handle the request-reply flow. We're using load-balancing
	// to poll workers at all times, and clients only when there are one or
	// more workers available.

	// Least recently used queue of available workers
	workers := make([]string, 0)

	backends := zmq.NewPoller()
	backends.Add(localbe, zmq.POLLIN)
	backends.Add(cloudbe, zmq.POLLIN)

	frontends := zmq.NewPoller()
	frontends.Add(localfe, zmq.POLLIN)
	frontends.Add(cloudfe, zmq.POLLIN)

	var msg []string

broker:
	for {
		// First, route any waiting replies from workers
		// If we have no workers anyhow, wait indefinitely
		timeout := time.Second
		if len(workers) == 0 {
			timeout = -1
		}
		sockets, err := backends.Poll(timeout)
		if err != nil {
			log.Println(err)
			break broker // Interrupted
		}

		// Start every pass without a pending reply. Without this reset, a
		// poll timeout would re-send the last client request left in msg by
		// the routing loop below as if it were a reply.
		msg = nil
		if socketInPolled(localbe, sockets) {
			// Handle reply from local worker
			msg, err = localbe.RecvMessage(0)
			if err != nil {
				log.Println(err)
				break broker // Interrupted
			}
			var identity string
			identity, msg = unwrap(msg)
			workers = append(workers, identity)

			// If it's READY, don't route the message any further
			if len(msg) > 0 && msg[0] == WORKER_READY {
				msg = nil
			}
		} else if socketInPolled(cloudbe, sockets) {
			// Or handle reply from peer broker
			msg, err = cloudbe.RecvMessage(0)
			if err != nil {
				log.Println(err)
				break broker // Interrupted
			}
			// We don't use peer broker identity for anything
			_, msg = unwrap(msg)
		}

		if len(msg) > 0 {
			// Route reply to cloud if it's addressed to a broker
			if peers[msg[0]] {
				_, err = cloudfe.SendMessage(msg)
			} else {
				_, err = localfe.SendMessage(msg)
			}
			if err != nil {
				log.Println(err)
			}
		}

		// Now we route as many client requests as we have worker capacity
		// for. We may reroute requests from our local frontend, but not from
		// the cloud frontend. We reroute randomly now, just to test things
		// out. In the next version we'll do this properly by calculating
		// cloud capacity:
		for len(workers) > 0 {
			sockets, err := frontends.Poll(0)
			if err != nil {
				log.Println(err)
				break broker // Interrupted
			}

			var reroutable bool
			// We'll do peer brokers first, to prevent starvation
			if socketInPolled(cloudfe, sockets) {
				msg, err = cloudfe.RecvMessage(0)
			} else if socketInPolled(localfe, sockets) {
				msg, err = localfe.RecvMessage(0)
				reroutable = true
			} else {
				break // No work, go back to backends
			}
			if err != nil {
				// Do not hand a failed receive to a worker.
				log.Println(err)
				break broker // Interrupted
			}

			// If reroutable, send to cloud 20% of the time
			// Here we'd normally use cloud status information
			if reroutable && len(peerNames) > 0 && rand.Intn(5) == 0 {
				// Route to random broker peer
				randomPeer := peerNames[rand.Intn(len(peerNames))]
				_, err = cloudbe.SendMessage(randomPeer, "", msg)
			} else {
				_, err = localbe.SendMessage(workers[0], "", msg)
				workers = workers[1:]
			}
			if err != nil {
				log.Println(err)
			}
		}
	}
	fmt.Println("Exit")
}
// unwrap pops the first frame off msg and returns it as head. If the frame
// that follows is empty (a delimiter), that frame is dropped as well. The
// remaining frames are returned as tail; tail shares its backing array with
// msg.
//
// An empty or nil msg yields an empty head and a nil tail instead of
// panicking.
func unwrap(msg []string) (head string, tail []string) {
	if len(msg) == 0 {
		return "", nil
	}
	head = msg[0]
	if len(msg) > 1 && msg[1] == "" {
		return head, msg[2:]
	}
	return head, msg[1:]
}
// socketInPolled reports whether the socket s appears as the Socket of any
// entry in the poll results p.
func socketInPolled(s *zmq.Socket, p []zmq.Polled) bool {
	for idx := 0; idx < len(p); idx++ {
		if p[idx].Socket == s {
			return true
		}
	}
	return false
}