//
// ROUTER-to-REQ example.
//
|
|
|
|
package main
|
|
|
|
import (
|
|
zmq "github.com/pebbe/zmq4"
|
|
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
)
|
|
|
|
// NBR_WORKERS is the number of worker goroutines that main starts, and the
// number of "Fired!" replies it hands out before it stops serving.
const NBR_WORKERS = 10
|
|
|
|
func worker_task() {
|
|
worker, _ := zmq.NewSocket(zmq.REQ)
|
|
defer worker.Close()
|
|
set_id(worker)
|
|
worker.Connect("tcp://localhost:5671")
|
|
|
|
total := 0
|
|
for {
|
|
// Tell the broker we're ready for work
|
|
worker.Send("Hi Boss", 0)
|
|
|
|
// Get workload from broker, until finished
|
|
workload, _ := worker.Recv(0)
|
|
if workload == "Fired!" {
|
|
fmt.Printf("Completed: %d tasks\n", total)
|
|
break
|
|
}
|
|
total++
|
|
|
|
// Do some random work
|
|
time.Sleep(time.Duration(rand.Intn(500)+1) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
broker, _ := zmq.NewSocket(zmq.ROUTER)
|
|
defer broker.Close()
|
|
|
|
broker.Bind("tcp://*:5671")
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
|
|
go worker_task()
|
|
}
|
|
// Run for five seconds and then tell workers to end
|
|
start_time := time.Now()
|
|
workers_fired := 0
|
|
for {
|
|
// Next message gives us least recently used worker
|
|
identity, _ := broker.Recv(0)
|
|
broker.Send(identity, zmq.SNDMORE)
|
|
broker.Recv(0) // Envelope delimiter
|
|
broker.Recv(0) // Response from worker
|
|
broker.Send("", zmq.SNDMORE)
|
|
|
|
// Encourage workers until it's time to fire them
|
|
if time.Since(start_time) < 5*time.Second {
|
|
broker.Send("Work harder", 0)
|
|
} else {
|
|
broker.Send("Fired!", 0)
|
|
workers_fired++
|
|
if workers_fired == NBR_WORKERS {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
}
|
|
|
|
func set_id(soc *zmq.Socket) {
|
|
identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
|
|
soc.SetIdentity(identity)
|
|
}
|