//
// Broker peering simulation (part 2).
// Prototypes the request-reply flow
//

package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"log"
	"math/rand"
	"os"
	"time"
)

const (
	NBR_CLIENTS  = 10
	NBR_WORKERS  = 3
	WORKER_READY = "**READY**" // Signals worker is ready
)

var (
	peers = make(map[string]bool)
)

// The client task does a request-reply dialog using a standard
// synchronous REQ socket:

func client_task(name string, i int) {
	clientname := fmt.Sprintf("Client-%s-%d", name, i)

	client, _ := zmq.NewSocket(zmq.REQ)
	defer client.Close()
	client.SetIdentity(clientname)
	client.Connect("ipc://" + name + "-localfe.ipc")

	for {
		// Send request, get reply
		client.Send("HELLO from "+clientname, 0)
		reply, err := client.Recv(0)
		if err != nil {
			fmt.Println("client_task interrupted", name)
			break // Interrupted
		}
		fmt.Printf("%s: %s\n", clientname, reply)
		time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
	}
}
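
// Note: the client's REQ socket adds an empty delimiter frame to each
// request, and the broker's localfe ROUTER socket prepends the client's
// identity on receipt, so the broker sees each request roughly as:
//
//	[client identity] [""] ["HELLO from Client-..."]
//
// (Illustrative frame layout; the actual identity strings depend on the
// names set above.)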

// The worker task plugs into the load-balancer using a REQ
// socket:

func worker_task(name string, i int) {
	workername := fmt.Sprintf("Worker-%s-%d", name, i)

	worker, _ := zmq.NewSocket(zmq.REQ)
	defer worker.Close()
	worker.SetIdentity(workername)
	worker.Connect("ipc://" + name + "-localbe.ipc")

	// Tell broker we're ready for work
	worker.SendMessage(WORKER_READY)

	// Process messages as they arrive
	for {
		msg, err := worker.RecvMessage(0)
		if err != nil {
			fmt.Println("worker_task interrupted", name)
			break // Interrupted
		}

		i := len(msg) - 1
		fmt.Printf("%s: %s\n", workername, msg[i])
		worker.SendMessage(msg[:i], "OK from "+workername)
	}
}
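
// Note: the worker sends back the routing envelope it received (msg[:i],
// everything except the request body) in front of its new body, so the
// brokers can route the reply to the originating client, e.g.
//
//	[client identity] [""] ["HELLO from Client-..."]
//
// comes back as
//
//	[client identity] [""] ["OK from Worker-..."]
//
// (Illustrative; requests that arrived via a peer broker carry extra
// envelope frames, which are echoed back the same way.)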

// The main task begins by setting up its frontend and backend sockets
// and then starting its client and worker tasks:

func main() {
	// First argument is this broker's name
	// Other arguments are our peers' names
	//
	if len(os.Args) < 2 {
		fmt.Println("syntax: peering2 me {you}...")
		os.Exit(1)
	}
	for _, peer := range os.Args[2:] {
		peers[peer] = true
	}

	self := os.Args[1]
	fmt.Println("I: preparing broker at", self)
	rand.Seed(time.Now().UnixNano())

	// Bind cloud frontend to endpoint
	cloudfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudfe.Close()
	cloudfe.SetIdentity(self)
	cloudfe.Bind("ipc://" + self + "-cloud.ipc")

	// Connect cloud backend to all peers
	cloudbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudbe.Close()
	cloudbe.SetIdentity(self)
	for _, peer := range os.Args[2:] {
		fmt.Println("I: connecting to cloud frontend at", peer)
		cloudbe.Connect("ipc://" + peer + "-cloud.ipc")
	}
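
	// Note: both cloud sockets get this broker's own name as their identity.
	// On these ROUTER-to-ROUTER connections that is the name peers know us
	// by, so a broker can be addressed simply by putting its name in the
	// first frame (see cloudbe.SendMessage(random_peer, ...) and the
	// peers[msg[0]] check in the main loop below).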

	// Prepare local frontend and backend
	localfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localfe.Close()
	localfe.Bind("ipc://" + self + "-localfe.ipc")

	localbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localbe.Close()
	localbe.Bind("ipc://" + self + "-localbe.ipc")

	// Get user to tell us when we can start...
	fmt.Print("Press Enter when all brokers are started: ")
	var line string
	fmt.Scanln(&line)

	// Start local workers
	for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
		go worker_task(self, worker_nbr)
	}

	// Start local clients
	for client_nbr := 0; client_nbr < NBR_CLIENTS; client_nbr++ {
		go client_task(self, client_nbr)
	}
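
	// The clients and workers run as goroutines here purely to keep the
	// simulation in one process; they talk to the broker only through the
	// ipc endpoints above, just as separate processes would.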

	// Here we handle the request-reply flow. We're using load-balancing
	// to poll workers at all times, and clients only when there are one or
	// more workers available.

	// Least recently used queue of available workers
	workers := make([]string, 0)

	backends := zmq.NewPoller()
	backends.Add(localbe, zmq.POLLIN)
	backends.Add(cloudbe, zmq.POLLIN)
	frontends := zmq.NewPoller()
	frontends.Add(localfe, zmq.POLLIN)
	frontends.Add(cloudfe, zmq.POLLIN)

	msg := []string{}
	number_of_peers := len(os.Args) - 2
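
	// Note on the two pollers: backends is checked on every pass (blocking
	// when no workers are free), because a worker reply is what frees up
	// capacity; frontends is only polled, with a zero timeout, while at
	// least one worker is queued, so we never take on requests we cannot
	// service.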

	for {
		// First, route any waiting replies from workers
		// If we have no workers anyhow, wait indefinitely
		timeout := time.Second
		if len(workers) == 0 {
			timeout = -1
		}
		sockets, err := backends.Poll(timeout)
		if err != nil {
			log.Println(err)
			break // Interrupted
		}

		// Reset msg so a timed-out poll doesn't re-route the previous message
		msg = msg[:0]
		if socketInPolled(localbe, sockets) {
			// Handle reply from local worker
			msg, err = localbe.RecvMessage(0)
			if err != nil {
				log.Println(err)
				break // Interrupted
			}
			var identity string
			identity, msg = unwrap(msg)
			workers = append(workers, identity)

			// If it's READY, don't route the message any further
			if msg[0] == WORKER_READY {
				msg = msg[0:0]
			}
		} else if socketInPolled(cloudbe, sockets) {
			// Or handle reply from peer broker
			msg, err = cloudbe.RecvMessage(0)
			if err != nil {
				log.Println(err)
				break // Interrupted
			}

			// We don't use peer broker identity for anything
			_, msg = unwrap(msg)
		}

		if len(msg) > 0 {
			// Route reply to cloud if it's addressed to a broker
			if peers[msg[0]] {
				cloudfe.SendMessage(msg)
			} else {
				localfe.SendMessage(msg)
			}
		}
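
		// The first frame of a routed reply is either a local client's
		// identity or the name of the peer broker that forwarded the
		// request (workers echo the whole envelope back), so one lookup
		// in the peers map is enough to pick the route above.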

		// Now we route as many client requests as we have worker capacity
		// for. We may reroute requests from our local frontend, but not from
		// the cloud frontend. We reroute randomly now, just to test things
		// out. In the next version we'll do this properly by calculating
		// cloud capacity:

		for len(workers) > 0 {
			sockets, err := frontends.Poll(0)
			if err != nil {
				log.Println(err)
				break // Interrupted
			}
			var reroutable bool
			// We'll do peer brokers first, to prevent starvation
			if socketInPolled(cloudfe, sockets) {
				msg, _ = cloudfe.RecvMessage(0)
				reroutable = false
			} else if socketInPolled(localfe, sockets) {
				msg, _ = localfe.RecvMessage(0)
				reroutable = true
			} else {
				break // No work, go back to backends
			}

			// If reroutable, send to cloud 20% of the time
			// Here we'd normally use cloud status information
			//
			if reroutable && number_of_peers > 0 && rand.Intn(5) == 0 {
				// Route to random broker peer
				random_peer := os.Args[2+rand.Intn(number_of_peers)]
				cloudbe.SendMessage(random_peer, "", msg)
			} else {
				localbe.SendMessage(workers[0], "", msg)
				workers = workers[1:]
			}
		}
	}
	fmt.Println("Exit")
}
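
// To try this, run one instance per broker, each listing the others as
// peers, e.g. in three shells (the broker names are arbitrary examples):
//
//	peering2 DC1 DC2 DC3
//	peering2 DC2 DC1 DC3
//	peering2 DC3 DC1 DC2
//
// then press Enter in each once all brokers are running.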

// Pops the frame off the front of a message and returns it as 'head'.
// If the next frame is empty, that empty frame is popped too.
// Returns the remaining frames of the message as 'tail'.
func unwrap(msg []string) (head string, tail []string) {
	head = msg[0]
	if len(msg) > 1 && msg[1] == "" {
		tail = msg[2:]
	} else {
		tail = msg[1:]
	}
	return
}
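
// For example, unwrap([]string{"Worker-A-1", "", "**READY**"}) returns
// head "Worker-A-1" and tail []string{"**READY**"}; this is how the main
// loop separates a worker's identity from its payload. (Frame values are
// illustrative.)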

// Returns true if *Socket is in []Polled
func socketInPolled(s *zmq.Socket, p []zmq.Polled) bool {
	for _, pp := range p {
		if pp.Socket == s {
			return true
		}
	}
	return false
}