I'am currently developing a system with a server, which gets tasks of some clients and processes them. As i need high throughput i examined the speed for roundtrips/s. I use the Paranoid Pirate mode which zguide said.
Clients have a Dealer socket and send a task to the server, server receives the tasks with a Router socket (TCP-Connection) The server fowards the Tasks to one Worker-process (Router - Dealer via Tcp) The worker processes the tasks and sends the response back on the same way.
I get only 20000 RT/s. cpu occupancy rate 80%, client 90%, worker80%. My test machine has 4 core cpu and 8g memory. Please help me what's the wrong thing i have make. Thanks
client code:
void * client ()
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_DEALER);
// 设置随机标识,方便跟踪
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zsocket_set_identity (client, identity);
zsocket_connect (client, "tcp://localhost:5755");
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { client, 0, ZMQ_POLLOUT, 0 } };
uint64_t start = zclock_time ();
uint64_t start2 = zclock_time ();
while (1) {
int64_t more = 0;
if (zmq_poll (items, 2, ZMQ_POLL_MSEC) == -1)
{
break;
}
if (items [0].revents & ZMQ_POLLIN)
{
static int nTmp = 0;
if (++nTmp % 100000 == 0)
{
printf("recv 100000 msg use %lld\n", zclock_time () - start);
start = zclock_time ();
}
zmsg_t *msg = zmsg_recv (client);
if (!msg)
{
break;
}
//zframe_print (zmsg_last (msg), identity);
zmsg_destroy (&msg);
}
if (items [1].revents & ZMQ_POLLOUT)
{
static int nTmp2 = 0;
zstr_sendf (client, "request #%d", 1);
}
}
zctx_destroy (&ctx);
return NULL;
}
server code:
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 心跳健康度,3-5是合理的
#define HEARTBEAT_INTERVAL 1000 // 单位:毫秒
// 偏执海盗协议的消息代码
#define PPP_READY "\001" // worker已就绪
#define PPP_HEARTBEAT "\002" // worker心跳
// 使用以下结构表示worker队列中的一个有效的worker
typedef struct {
zframe_t *address; // worker的地址
char *identity; // 可打印的套接字标识
int64_t expiry; // 过期时间
} worker_t;
// 创建新的worker
static worker_t *
s_worker_new (zframe_t *address)
{
worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
self->address = address;
self->identity = zframe_strdup (address);
self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}
// 销毁worker结构,包括标识
static void
s_worker_destroy (worker_t **self_p)
{
assert (self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy (&self->address);
free (self->identity);
free (self);
*self_p = NULL;
}
}
// worker已就绪,将其移至列表末尾
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (streq (self->identity, worker->identity)) {
zlist_remove (workers, worker);
s_worker_destroy (&worker);
break;
}
worker = (worker_t *) zlist_next (workers);
}
zlist_append (workers, self);
}
// 返回下一个可用的worker地址
static zframe_t *
s_workers_next (zlist_t *workers)
{
worker_t *worker = zlist_pop (workers);
assert (worker);
zframe_t *frame = worker->address;
worker->address = NULL;
s_worker_destroy (&worker);
return frame;
}
// 寻找并销毁已过期的worker。
// 由于列表中最旧的worker排在最前,所以当找到第一个未过期的worker时就停止。
static void
s_workers_purge (zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (zclock_time () < worker->expiry)
break; // worker未过期,停止扫描
zlist_remove (workers, worker);
s_worker_destroy (&worker);
worker = (worker_t *) zlist_first (workers);
}
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // client端点
zsocket_bind (backend, "tcp://*:5556"); // worker端点
// List of available workers
zlist_t *workers = zlist_new ();
// 规律地发送心跳
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// 当存在可用worker时轮询前端端点
int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中断
// 处理后端worker请求
if (items [0].revents & ZMQ_POLLIN) {
// 使用worker地址进行LRU路由
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // 中断
// worker的任何信号均表示其仍然存活
zframe_t *address = zmsg_unwrap (msg);
worker_t *worker = s_worker_new (address);
s_worker_ready (worker, workers);
// 处理控制消息,或者将应答转发给client
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_READY, 1)
&& memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
printf ("E: invalid message from worker");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// 获取下一个client请求,交给下一个可用的worker
zmsg_t *msg = zmsg_recv (frontend);
if (!msg)
break; // 中断
zmsg_push (msg, s_workers_next (workers));
zmsg_send (&msg, backend);
}
// 发送心跳给空闲的worker
if (zclock_time () >= heartbeat_at) {
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
zframe_send (&worker->address, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, backend, 0);
worker = (worker_t *) zlist_next (workers);
}
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
s_workers_purge (workers);
}
// 程序结束后进行清理
while (zlist_size (workers)) {
worker_t *worker = (worker_t *) zlist_pop (workers);
s_worker_destroy (&worker);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
worker code:
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 合理值:3-5
#define HEARTBEAT_INTERVAL 1000 // 单位:毫秒
#define INTERVAL_INIT 1000 // 重试间隔
#define INTERVAL_MAX 32000 // 回退算法最大值
// 偏执海盗规范的常量定义
#define PPP_READY "\001" // 消息:worker已就绪
#define PPP_HEARTBEAT "\002" // 消息:worker心跳
// 返回一个连接至偏执海盗队列装置的套接字
static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
// 告知队列worker已准备就绪
printf ("I: worker已就绪\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);
return worker;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);
// 如果心跳健康度为零,则表示队列装置已死亡
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// 规律地发送心跳
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
srandom ((unsigned) time (NULL));
int cycles = 0;
while (1) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // 中断
if (items [0].revents & ZMQ_POLLIN) {
// 获取消息
// - 3段消息,信封+内容,表示一个请求
// - 1段消息,表示心跳
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // 中断
if (zmsg_size (msg) == 3) {
// 若干词循环后模拟各种问题
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: 模拟崩溃\n");
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: 模拟CPU过载\n");
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: 正常应答\n");
zmsg_send (&msg, worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // 做一些处理工作
if (zctx_interrupted)
break;
}
else
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf ("E: 非法消息\n");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else {
printf ("E: 非法消息\n");
zmsg_dump (msg);
}
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
printf ("W: 心跳失败,无法连接队列装置\n");
printf ("W: %zd 毫秒后进行重连...\n", interval);
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
worker = s_worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
// 适时发送心跳给队列
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
printf ("I: worker心跳\n");
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, worker, 0);
}
}
zctx_destroy (&ctx);
return 0;
}
Aucun commentaire:
Enregistrer un commentaire