Skip to content

Commit

Permalink
Optionally queue outgoing data
Browse files Browse the repository at this point in the history
Support queueing outgoing stanzas and stream management elements for up
to a configurable number of milliseconds (with a configurable queue size
limit).  This allows for batching up multiple XML elements into a single
TCP packet in order to reduce the TCP/IP overhead.

The feature is supported by ejabberd_c2s, ejabberd_s2s_out, and
ejabberd_service.  It can be enabled by configuring the max. number of
milliseconds to queue an element (default: 0), and optionally the max.
number of elements to queue (default: 10).  This can be done by using
the following new ejabberd_c2s/ejabberd_service listener options:

- max_send_queue_size
- max_send_queue_delay

For ejabberd_c2s, the following global options can be specified instead:

- c2s_max_send_queue_size
- c2s_max_send_queue_delay

For ejabberd_s2s_out, the following global options can be specified:

- s2s_max_send_queue_size
- s2s_max_send_queue_delay
  • Loading branch information
weiss committed Jun 10, 2022
1 parent a89b1f3 commit 2da98d5
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 5 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}},
{if_var_true, stun,
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}},
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}}
]}.

Expand Down
18 changes: 17 additions & 1 deletion src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,19 @@ init([State, Opts]) ->
TLSVerify = proplists:get_bool(tls_verify, Opts),
Zlib = proplists:get_bool(zlib, Opts),
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of
undefined ->
proplists:get_value(max_send_queue_size, Opts, 10);
C2SMaxQSize ->
C2SMaxQSize
end,
MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of
undefined ->
?LOG_WARNING("DELAY: ~s OPTS: ~p", [undefined, Opts]),
proplists:get_value(max_send_queue_delay, Opts, 0);
C2SMaxQDelay ->
C2SMaxQDelay
end,
State1 = State#{tls_options => TLSOpts2,
tls_required => TLSRequired,
tls_enabled => TLSEnabled,
Expand All @@ -567,7 +580,8 @@ init([State, Opts]) ->
access => Access,
shaper => Shaper},
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]).
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]).

handle_call(get_presence, From, #{jid := JID} = State) ->
Pres = case maps:get(pres_last, State, error) of
Expand Down Expand Up @@ -1022,4 +1036,6 @@ listen_options() ->
{tls_verify, false},
{zlib, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000}].
4 changes: 4 additions & 0 deletions src/ejabberd_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ listen_opt_type(tls) ->
econf:bool();
listen_opt_type(max_stanza_size) ->
econf:pos_int(infinity);
listen_opt_type(max_send_queue_size) ->
econf:non_neg_int();
listen_opt_type(max_send_queue_delay) ->
econf:non_neg_int();
listen_opt_type(max_fsm_queue) ->
econf:pos_int();
listen_opt_type(send_timeout) ->
Expand Down
26 changes: 26 additions & 0 deletions src/ejabberd_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
-export([c2s_cafile/0, c2s_cafile/1]).
-export([c2s_ciphers/0, c2s_ciphers/1]).
-export([c2s_dhfile/0, c2s_dhfile/1]).
-export([c2s_max_send_queue_delay/0]).
-export([c2s_max_send_queue_size/0]).
-export([c2s_protocol_options/0, c2s_protocol_options/1]).
-export([c2s_tls_compression/0, c2s_tls_compression/1]).
-export([ca_file/0]).
Expand Down Expand Up @@ -124,6 +126,8 @@
-export([s2s_dns_retries/0, s2s_dns_retries/1]).
-export([s2s_dns_timeout/0, s2s_dns_timeout/1]).
-export([s2s_max_retry_delay/0]).
-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]).
-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]).
-export([s2s_protocol_options/0, s2s_protocol_options/1]).
-export([s2s_queue_type/0, s2s_queue_type/1]).
-export([s2s_timeout/0, s2s_timeout/1]).
Expand Down Expand Up @@ -275,6 +279,14 @@ c2s_dhfile() ->
c2s_dhfile(Host) ->
ejabberd_config:get_option({c2s_dhfile, Host}).

-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_delay() ->
ejabberd_config:get_option({c2s_max_send_queue_delay, global}).

-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_size() ->
ejabberd_config:get_option({c2s_max_send_queue_size, global}).

-spec c2s_protocol_options() -> 'undefined' | binary().
c2s_protocol_options() ->
c2s_protocol_options(global).
Expand Down Expand Up @@ -851,6 +863,20 @@ s2s_dns_timeout(Host) ->
s2s_max_retry_delay() ->
ejabberd_config:get_option({s2s_max_retry_delay, global}).

-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay() ->
s2s_max_send_queue_delay(global).
-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_delay, Host}).

-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size() ->
s2s_max_send_queue_size(global).
-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_size, Host}).

-spec s2s_protocol_options() -> 'undefined' | binary().
s2s_protocol_options() ->
s2s_protocol_options(global).
Expand Down
16 changes: 16 additions & 0 deletions src/ejabberd_options.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ opt_type(c2s_ciphers) ->
end;
opt_type(c2s_dhfile) ->
econf:file();
opt_type(c2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(c2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(c2s_protocol_options) ->
econf:and_then(
econf:list(econf:binary(), [unique]),
Expand Down Expand Up @@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) ->
econf:timeout(second, infinity);
opt_type(s2s_max_retry_delay) ->
econf:timeout(second);
opt_type(s2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(s2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(s2s_protocol_options) ->
opt_type(c2s_protocol_options);
opt_type(s2s_queue_type) ->
Expand Down Expand Up @@ -527,6 +535,8 @@ options() ->
{c2s_cafile, undefined},
{c2s_ciphers, undefined},
{c2s_dhfile, undefined},
{c2s_max_send_queue_delay, undefined},
{c2s_max_send_queue_size, undefined},
{c2s_protocol_options, undefined},
{c2s_tls_compression, undefined},
{ca_file, iolist_to_binary(pkix:get_cafile())},
Expand Down Expand Up @@ -635,6 +645,8 @@ options() ->
{s2s_dns_retries, 2},
{s2s_dns_timeout, timer:seconds(10)},
{s2s_max_retry_delay, timer:seconds(300)},
{s2s_max_send_queue_delay, 0},
{s2s_max_send_queue_size, 10},
{s2s_protocol_options, undefined},
{s2s_queue_type,
fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end},
Expand Down Expand Up @@ -705,6 +717,8 @@ globals() ->
auth_cache_life_time,
auth_cache_missed,
auth_cache_size,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
ca_file,
captcha_cmd,
captcha_host,
Expand Down Expand Up @@ -752,6 +766,8 @@ globals() ->
router_use_cache,
rpc_timeout,
s2s_max_retry_delay,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
shaper,
sm_cache_life_time,
sm_cache_missed,
Expand Down
50 changes: 50 additions & 0 deletions src/ejabberd_options_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{c2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 'c2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{c2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 'c2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{c2s_protocol_options,
#{value => "[Option, ...]",
desc =>
Expand Down Expand Up @@ -1118,6 +1143,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{s2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 's2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{s2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 's2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{s2s_protocol_options,
#{value => "[Option, ...]",
desc =>
Expand Down
5 changes: 4 additions & 1 deletion src/ejabberd_s2s_out.erl
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,19 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
false -> unlimited
end,
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = ejabberd_option:s2s_max_send_queue_size(),
MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(),
State1 = State#{on_route => queue,
queue => p1_queue:new(QueueType, QueueLimit),
xmlns => ?NS_SERVER,
lang => ejabberd_option:language(),
server_host => ServerHost,
shaper => none},
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay),
?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts",
[LServer, RServer]),
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]).
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]).

handle_call(Request, From, #{server_host := ServerHost} = State) ->
ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]).
Expand Down
9 changes: 7 additions & 2 deletions src/ejabberd_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ init([State, Opts]) ->
true -> TLSOpts1
end,
GlobalRoutes = proplists:get_value(global_routes, Opts, true),
MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10),
MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0),
Timeout = ejabberd_option:negotiation_timeout(),
State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)),
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
State3 = State2#{access => Access,
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
State4 = State3#{access => Access,
xmlns => ?NS_COMPONENT,
lang => ejabberd_option:language(),
server => ejabberd_config:get_myname(),
Expand All @@ -129,7 +132,7 @@ init([State, Opts]) ->
tls_options => TLSOpts,
global_routes => GlobalRoutes,
check_from => CheckFrom},
ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]).
ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]).

handle_stream_start(_StreamStart,
#{remote_server := RemoteServer,
Expand Down Expand Up @@ -302,6 +305,8 @@ listen_options() ->
{tls, false},
{tls_compression, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000},
{password, undefined},
{hosts, []},
Expand Down

0 comments on commit 2da98d5

Please sign in to comment.