src.dualinventive.com/go/redis-proxy/vendor/github.com/pebbe/zmq4/examples/asyncsrv.go

139 lines
3.3 KiB
Go

//
// Asynchronous client-to-server (DEALER to ROUTER).
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.
package main
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// ---------------------------------------------------------------------
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
func client_task() {
var mu sync.Mutex
client, _ := zmq.NewSocket(zmq.DEALER)
defer client.Close()
// Set random identity to make tracing easier
set_id(client)
client.Connect("tcp://localhost:5570")
go func() {
for request_nbr := 1; true; request_nbr++ {
time.Sleep(time.Second)
mu.Lock()
client.SendMessage(fmt.Sprintf("request #%d", request_nbr))
mu.Unlock()
}
}()
for {
time.Sleep(10 * time.Millisecond)
mu.Lock()
msg, err := client.RecvMessage(zmq.DONTWAIT)
if err == nil {
id, _ := client.GetIdentity()
fmt.Println(msg[0], id)
}
mu.Unlock()
}
}
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
func server_task() {
// Frontend socket talks to clients over TCP
frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5570")
// Backend socket talks to workers over inproc
backend, _ := zmq.NewSocket(zmq.DEALER)
defer backend.Close()
backend.Bind("inproc://backend")
// Launch pool of worker threads, precise number is not critical
for i := 0; i < 5; i++ {
go server_worker()
}
// Connect backend to frontend via a proxy
err := zmq.Proxy(frontend, backend, nil)
log.Fatalln("Proxy interrupted:", err)
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
func server_worker() {
worker, _ := zmq.NewSocket(zmq.DEALER)
defer worker.Close()
worker.Connect("inproc://backend")
for {
// The DEALER socket gives us the reply envelope and message
msg, _ := worker.RecvMessage(0)
identity, content := pop(msg)
// Send 0..4 replies back
replies := rand.Intn(5)
for reply := 0; reply < replies; reply++ {
// Sleep for some fraction of a second
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
worker.SendMessage(identity, content)
}
}
}
// The main thread simply starts several clients, and a server, and then
// waits for the server to finish.
func main() {
rand.Seed(time.Now().UnixNano())
go client_task()
go client_task()
go client_task()
go server_task()
// Run for 5 seconds then quit
time.Sleep(5 * time.Second)
}
func set_id(soc *zmq.Socket) {
identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
soc.SetIdentity(identity)
}
func pop(msg []string) (head, tail []string) {
if msg[1] == "" {
head = msg[:2]
tail = msg[2:]
} else {
head = msg[:1]
tail = msg[1:]
}
return
}