//
// Broker peering simulation (part 3).
// Prototypes the full flow of status and tasks.
//

/*
One of the differences between peering2 and peering3 is that peering2 always
uses Poll() and then a helper function socketInPolled() to check whether a
specific socket returned a result, while peering3 uses PollAll() and checks
the event state of the socket at a specific index in the returned list.
*/
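// A minimal sketch of the PollAll pattern described above (assuming a
// *zmq.Poller whose sockets were added in a known, fixed order; "poller"
// and "timeout" are placeholder names, not variables from this file):
//
//	polled, err := poller.PollAll(timeout)
//	if err == nil && polled[0].Events&zmq.POLLIN != 0 {
//		// index 0 is whichever socket was Add()ed first
//	}
//
// PollAll returns one zmq.Polled entry per registered socket, so the slice
// index identifies the socket, whereas Poll() returns only the sockets that
// actually have pending events.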
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"
)

const (
	NBR_CLIENTS  = 10
	NBR_WORKERS  = 5
	WORKER_READY = "**READY**" // Signals worker is ready
)

var (
	// Our own name; in practice this would be configured per node
	self string
)

// This is the client task. It issues a burst of requests and then
// sleeps for a few seconds. This simulates sporadic activity; when
// a number of clients are active at once, the local workers should
// be overloaded. The client uses a REQ socket for requests and also
// pushes statistics to the monitor socket:
func client_task(i int) {
	client, _ := zmq.NewSocket(zmq.REQ)
	defer client.Close()
	client.Connect("ipc://" + self + "-localfe.ipc")
	monitor, _ := zmq.NewSocket(zmq.PUSH)
	defer monitor.Close()
	monitor.Connect("ipc://" + self + "-monitor.ipc")

	poller := zmq.NewPoller()
	poller.Add(client, zmq.POLLIN)
	for {
		time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
		for burst := rand.Intn(15); burst > 0; burst-- {
			task_id := fmt.Sprintf("%04X-%s-%d", rand.Intn(0x10000), self, i)

			// Send request with random hex ID
			client.Send(task_id, 0)

			// Wait max ten seconds for a reply, then complain
			sockets, err := poller.Poll(10 * time.Second)
			if err != nil {
				break // Interrupted
			}

			if len(sockets) == 1 {
				reply, err := client.Recv(0)
				if err != nil {
					break // Interrupted
				}
				// Worker is supposed to answer us with our task id
				id := strings.Fields(reply)[0]
				if id != task_id {
					panic("id != task_id")
				}
				monitor.Send(reply, 0)
			} else {
				monitor.Send("E: CLIENT EXIT - lost task "+task_id, 0)
				return
			}
		}
	}
}
|
|
|
|
// This is the worker task, which uses a REQ socket to plug into the
|
|
// load-balancer. It's the same stub worker task you've seen in other
|
|
// examples:
|
|
|
|
func worker_task(i int) {
|
|
worker, _ := zmq.NewSocket(zmq.REQ)
|
|
defer worker.Close()
|
|
worker.Connect("ipc://" + self + "-localbe.ipc")
|
|
|
|
// Tell broker we're ready for work
|
|
worker.SendMessage(WORKER_READY)
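
	// On the broker's ROUTER backend this arrives as three frames,
	// [worker identity][empty delimiter]["**READY**"]: the REQ socket
	// prepends the empty delimiter and ROUTER adds the identity. (This
	// note only describes the standard REQ/ROUTER envelope; the main
	// loop's unwrap() strips those two leading frames.)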

	// Process messages as they arrive
	for {
		msg, err := worker.RecvMessage(0)
		if err != nil {
			break // Interrupted
		}

		// Workers are busy for zero or one second
		time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
		n := len(msg) - 1
		worker.SendMessage(msg[:n], fmt.Sprintf("%s %s-%d", msg[n], self, i))
	}
}

// The main task begins by setting up all its sockets. The local frontend
// talks to clients, and our local backend talks to workers. The cloud
// frontend talks to peer brokers as if they were clients, and the cloud
// backend talks to peer brokers as if they were workers. The state
// backend publishes regular state messages, and the state frontend
// subscribes to all state backends to collect these messages. Finally,
// we use a PULL monitor socket to collect printable messages from tasks:
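//
// A typical way to try this out (the broker names below are examples only)
// is to start one process per broker, each naming the others as its peers:
//
//	./peering3 DC1 DC2 DC3 &
//	./peering3 DC2 DC1 DC3 &
//	./peering3 DC3 DC1 DC2 &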
func main() {
	// First argument is this broker's name
	// Other arguments are our peers' names
	//
	if len(os.Args) < 2 {
		fmt.Println("syntax: peering3 me {you}...")
		os.Exit(1)
	}
	self = os.Args[1]
	fmt.Printf("I: preparing broker at %s...\n", self)
	rand.Seed(time.Now().UnixNano())

	// Prepare local frontend and backend
	localfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localfe.Close()
	localfe.Bind("ipc://" + self + "-localfe.ipc")

	localbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer localbe.Close()
	localbe.Bind("ipc://" + self + "-localbe.ipc")

	// Bind cloud frontend to endpoint
	cloudfe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudfe.Close()
	cloudfe.SetIdentity(self)
	cloudfe.Bind("ipc://" + self + "-cloud.ipc")

	// Connect cloud backend to all peers
	cloudbe, _ := zmq.NewSocket(zmq.ROUTER)
	defer cloudbe.Close()
	cloudbe.SetIdentity(self)
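
	// Both cloud sockets carry our broker name as their socket identity;
	// the intent (as in the peering pattern generally) is that a peer's
	// ROUTER socket can then address messages to us by name, just as we
	// address peers by name when routing requests below.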
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to cloud frontend at '%s'\n", peer)
		cloudbe.Connect("ipc://" + peer + "-cloud.ipc")
	}

	// Bind state backend to endpoint
	statebe, _ := zmq.NewSocket(zmq.PUB)
	defer statebe.Close()
	statebe.Bind("ipc://" + self + "-state.ipc")

	// Connect state frontend to all peers
	statefe, _ := zmq.NewSocket(zmq.SUB)
	defer statefe.Close()
	statefe.SetSubscribe("")
	for _, peer := range os.Args[2:] {
		fmt.Printf("I: connecting to state backend at '%s'\n", peer)
		statefe.Connect("ipc://" + peer + "-state.ipc")
	}

	// Prepare monitor socket
	monitor, _ := zmq.NewSocket(zmq.PULL)
	defer monitor.Close()
	monitor.Bind("ipc://" + self + "-monitor.ipc")

	// After binding and connecting all our sockets, we start our child
	// tasks - workers and clients:

	for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
		go worker_task(worker_nbr)
	}

	// Start local clients
	for client_nbr := 0; client_nbr < NBR_CLIENTS; client_nbr++ {
		go client_task(client_nbr)
	}

	// Queue of available workers
	local_capacity := 0
	cloud_capacity := 0
	workers := make([]string, 0)

	primary := zmq.NewPoller()
	primary.Add(localbe, zmq.POLLIN)
	primary.Add(cloudbe, zmq.POLLIN)
	primary.Add(statefe, zmq.POLLIN)
	primary.Add(monitor, zmq.POLLIN)

	secondary1 := zmq.NewPoller()
	secondary1.Add(localfe, zmq.POLLIN)
	secondary2 := zmq.NewPoller()
	secondary2.Add(localfe, zmq.POLLIN)
	secondary2.Add(cloudfe, zmq.POLLIN)

	msg := make([]string, 0)
	for {

		// If we have no workers ready, wait indefinitely
		timeout := time.Second
		if local_capacity == 0 {
			timeout = -1
		}
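
		// A negative timeout (as set above) asks the zmq4 poller to block
		// until at least one registered socket has an event, mirroring a
		// timeout of -1 passed to zmq_poll in the C version of this example.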
		sockets, err := primary.PollAll(timeout)
		if err != nil {
			break // Interrupted
		}

		// Track if capacity changes during this iteration
		previous := local_capacity

		// Handle reply from local worker
		msg = msg[0:0]

		if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localbe
			msg, err = localbe.RecvMessage(0)
			if err != nil {
				break // Interrupted
			}
			var identity string
			identity, msg = unwrap(msg)
			workers = append(workers, identity)
			local_capacity++

			// If it's READY, don't route the message any further
			if msg[0] == WORKER_READY {
				msg = msg[0:0]
			}
		} else if sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudbe
			// Or handle reply from peer broker
			msg, err = cloudbe.RecvMessage(0)
			if err != nil {
				break // Interrupted
			}
			// We don't use peer broker identity for anything
			_, msg = unwrap(msg)
		}

		if len(msg) > 0 {

			// Route reply to cloud if it's addressed to a broker
			to_broker := false
			for _, peer := range os.Args[2:] {
				if peer == msg[0] {
					to_broker = true
					break
				}
			}
			if to_broker {
				cloudfe.SendMessage(msg)
			} else {
				localfe.SendMessage(msg)
			}
		}

		// If we have input messages on our statefe or monitor sockets, we
		// can process these immediately:

		if sockets[2].Events&zmq.POLLIN != 0 { // 2 == statefe
			var status string
			m, _ := statefe.RecvMessage(0)
			_, m = unwrap(m) // peer name, unused here
			status, _ = unwrap(m)
			cloud_capacity, _ = strconv.Atoi(status)
		}
		if sockets[3].Events&zmq.POLLIN != 0 { // 3 == monitor
			status, _ := monitor.Recv(0)
			fmt.Println(status)
		}

		// Now route as many client requests as we can handle. If we have
		// local capacity, we poll both localfe and cloudfe. If we have cloud
		// capacity only, we poll just localfe. We route any request locally
		// if we can, else we route to the cloud.

		for local_capacity+cloud_capacity > 0 {
			var sockets []zmq.Polled
			var err error
			if local_capacity > 0 {
				sockets, err = secondary2.PollAll(0)
			} else {
				sockets, err = secondary1.PollAll(0)
			}
			if err != nil {
				panic(err)
			}

			if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localfe
				msg, _ = localfe.RecvMessage(0)
			} else if len(sockets) > 1 && sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudfe
				msg, _ = cloudfe.RecvMessage(0)
			} else {
				break // No work, go back to primary
			}

			if local_capacity > 0 {
				localbe.SendMessage(workers[0], "", msg)
				workers = workers[1:]
				local_capacity--
			} else {
				// Route to random broker peer
				random_peer := rand.Intn(len(os.Args)-2) + 2
				cloudbe.SendMessage(os.Args[random_peer], "", msg)
			}
		}

		// We broadcast capacity messages to other peers; to reduce chatter
		// we do this only if our capacity changed.

		if local_capacity != previous {
			// We stick our own identity onto the envelope
			// Broadcast new capacity
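			//
			// The state message sent below is a three-frame envelope:
			//   frame 0: our name (self)
			//   frame 1: empty delimiter
			//   frame 2: local capacity, sent as text
			// which is exactly what the statefe handler above unwraps and
			// parses with strconv.Atoi.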
			statebe.SendMessage(self, "", local_capacity)
		}
	}
}

// unwrap pops the frame off the front of msg and returns it as 'head'.
// If the next frame is empty, it pops that empty delimiter frame as well.
// The remaining frames of the message are returned as 'tail'.
func unwrap(msg []string) (head string, tail []string) {
	head = msg[0]
	if len(msg) > 1 && msg[1] == "" {
		tail = msg[2:]
	} else {
		tail = msg[1:]
	}
	return
}
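
// For example (illustrative only): unwrap([]string{"A", "", "hello"}) returns
// head "A" and tail []string{"hello"}, while unwrap([]string{"A", "hello"})
// returns head "A" and tail []string{"hello"} without skipping a delimiter.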