-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathah_listener.c
118 lines (103 loc) · 2.86 KB
/
ah_listener.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* Description:
* History: yang@haipo.me, 2016/04/01, create
*/
# include "ah_config.h"
# include "ah_listener.h"
static nw_svr *listener_svr;
static rpc_svr *worker_svr;
static int listener_decode_pkg(nw_ses *ses, void *data, size_t max)
{
return max;
}
static void listener_on_recv_pkg(nw_ses *ses, void *data, size_t size)
{
return;
}
static void listener_on_error_msg(nw_ses *ses, const char *msg)
{
log_error("listener error, peer: %s, msg: %s", nw_sock_human_addr(&ses->peer_addr), msg);
}
static int listener_on_accept(nw_ses *ses, int sockfd, nw_addr_t *peer_addr)
{
if (worker_svr->raw_svr->clt_count == 0) {
log_error("no available worker");
return -1;
}
int worker = rand() % worker_svr->raw_svr->clt_count;
nw_ses *curr = worker_svr->raw_svr->clt_list_head;
for (int i = 0; i < worker && curr; ++i) {
curr = curr->next;
}
if (!curr) {
log_error("choice worker fail");
return -1;
}
if (nw_ses_send_fd(curr, sockfd) < 0) {
log_error("send sockfd fail: %s", strerror(errno));
return -1;
}
close(sockfd);
return 0;
}
static int init_listener_svr(void)
{
nw_svr_type type;
memset(&type, 0, sizeof(type));
type.decode_pkg = listener_decode_pkg;
type.on_accept = listener_on_accept;
type.on_recv_pkg = listener_on_recv_pkg;
type.on_error_msg = listener_on_error_msg;
nw_svr_cfg *cfg = (nw_svr_cfg *)&settings.svr;
listener_svr = nw_svr_create(cfg, &type, NULL);
if (listener_svr == NULL)
return -__LINE__;
if (nw_svr_start(listener_svr) < 0)
return -__LINE__;
return 0;
}
static void worker_on_recv_pkg(nw_ses *ses, rpc_pkg *pkg)
{
return;
}
static void worker_on_new_connection(nw_ses *ses)
{
log_info("new worker connected, current worker number: %u", worker_svr->raw_svr->clt_count);
}
static void worker_on_connection_close(nw_ses *ses)
{
log_info("worker close, current worker number: %u", worker_svr->raw_svr->clt_count - 1);
}
static int init_worker_svr(void)
{
rpc_svr_cfg cfg;
nw_svr_bind bind;
memset(&cfg, 0, sizeof(cfg));
cfg.bind_count = 1;
if (nw_sock_cfg_parse(AH_LISTENER_BIND, &bind.addr, &bind.sock_type) < 0)
return -__LINE__;
cfg.bind_arr = &bind;
cfg.max_pkg_size = 1024;
cfg.heartbeat_check = true;
rpc_svr_type type;
type.on_recv_pkg = worker_on_recv_pkg;
type.on_new_connection = worker_on_new_connection;
type.on_connection_close = worker_on_connection_close;
worker_svr = rpc_svr_create(&cfg, &type);
if (worker_svr == NULL)
return -__LINE__;
if (rpc_svr_start(worker_svr) < 0)
return -__LINE__;
return 0;
}
int init_listener(void)
{
int ret;
ret = init_listener_svr();
if (ret < 0)
return ret;
ret = init_worker_svr();
if (ret < 0)
return ret;
return 0;
}