diff --git a/rebar.config b/rebar.config index 6db4fe2fc23..d0aa28d0f30 100644 --- a/rebar.config +++ b/rebar.config @@ -70,7 +70,7 @@ {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}}, {if_var_true, stun, {stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}}, - {xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}}, + {xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}}, {yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}} ]}. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 6f10389474e..8d86bc85d3d 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -555,6 +555,18 @@ init([State, Opts]) -> TLSVerify = proplists:get_bool(tls_verify, Opts), Zlib = proplists:get_bool(zlib, Opts), Timeout = ejabberd_option:negotiation_timeout(), + MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of + undefined -> + proplists:get_value(max_send_queue_size, Opts, 10); + C2SMaxQSize -> + C2SMaxQSize + end, + MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of + undefined -> + proplists:get_value(max_send_queue_delay, Opts, 0); + C2SMaxQDelay -> + C2SMaxQDelay + end, State1 = State#{tls_options => TLSOpts2, tls_required => TLSRequired, tls_enabled => TLSEnabled, @@ -567,7 +579,8 @@ init([State, Opts]) -> access => Access, shaper => Shaper}, State2 = xmpp_stream_in:set_timeout(State1, Timeout), - ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]). + State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay), + ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]). handle_call(get_presence, From, #{jid := JID} = State) -> Pres = case maps:get(pres_last, State, error) of @@ -1022,4 +1035,6 @@ listen_options() -> {tls_verify, false}, {zlib, false}, {max_stanza_size, infinity}, + {max_send_queue_size, 10}, + {max_send_queue_delay, 0}, {max_fsm_queue, 10000}]. diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 9c962be6e5c..01d0c67759f 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -680,6 +680,10 @@ listen_opt_type(tls) -> econf:bool(); listen_opt_type(max_stanza_size) -> econf:pos_int(infinity); +listen_opt_type(max_send_queue_size) -> + econf:non_neg_int(); +listen_opt_type(max_send_queue_delay) -> + econf:non_neg_int(); listen_opt_type(max_fsm_queue) -> econf:pos_int(); listen_opt_type(send_timeout) -> diff --git a/src/ejabberd_option.erl b/src/ejabberd_option.erl index 29b5b40e868..e912a7f5fe3 100644 --- a/src/ejabberd_option.erl +++ b/src/ejabberd_option.erl @@ -22,6 +22,8 @@ -export([c2s_cafile/0, c2s_cafile/1]). -export([c2s_ciphers/0, c2s_ciphers/1]). -export([c2s_dhfile/0, c2s_dhfile/1]). +-export([c2s_max_send_queue_delay/0]). +-export([c2s_max_send_queue_size/0]). -export([c2s_protocol_options/0, c2s_protocol_options/1]). -export([c2s_tls_compression/0, c2s_tls_compression/1]). -export([ca_file/0]). @@ -124,6 +126,8 @@ -export([s2s_dns_retries/0, s2s_dns_retries/1]). -export([s2s_dns_timeout/0, s2s_dns_timeout/1]). -export([s2s_max_retry_delay/0]). +-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]). +-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]). -export([s2s_protocol_options/0, s2s_protocol_options/1]). -export([s2s_queue_type/0, s2s_queue_type/1]). -export([s2s_timeout/0, s2s_timeout/1]). @@ -275,6 +279,14 @@ c2s_dhfile() -> c2s_dhfile(Host) -> ejabberd_config:get_option({c2s_dhfile, Host}). +-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer(). +c2s_max_send_queue_delay() -> + ejabberd_config:get_option({c2s_max_send_queue_delay, global}). + +-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer(). +c2s_max_send_queue_size() -> + ejabberd_config:get_option({c2s_max_send_queue_size, global}). + -spec c2s_protocol_options() -> 'undefined' | binary(). c2s_protocol_options() -> c2s_protocol_options(global). @@ -851,6 +863,20 @@ s2s_dns_timeout(Host) -> s2s_max_retry_delay() -> ejabberd_config:get_option({s2s_max_retry_delay, global}). +-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer(). +s2s_max_send_queue_delay() -> + s2s_max_send_queue_delay(global). +-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer(). +s2s_max_send_queue_delay(Host) -> + ejabberd_config:get_option({s2s_max_send_queue_delay, Host}). + +-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer(). +s2s_max_send_queue_size() -> + s2s_max_send_queue_size(global). +-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer(). +s2s_max_send_queue_size(Host) -> + ejabberd_config:get_option({s2s_max_send_queue_size, Host}). + -spec s2s_protocol_options() -> 'undefined' | binary(). s2s_protocol_options() -> s2s_protocol_options(global). diff --git a/src/ejabberd_options.erl b/src/ejabberd_options.erl index 8837ef4757a..7243c1e010e 100644 --- a/src/ejabberd_options.erl +++ b/src/ejabberd_options.erl @@ -95,6 +95,10 @@ opt_type(c2s_ciphers) -> end; opt_type(c2s_dhfile) -> econf:file(); +opt_type(c2s_max_send_queue_delay) -> + econf:non_neg_int(); +opt_type(c2s_max_send_queue_size) -> + econf:non_neg_int(); opt_type(c2s_protocol_options) -> econf:and_then( econf:list(econf:binary(), [unique]), @@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) -> econf:timeout(second, infinity); opt_type(s2s_max_retry_delay) -> econf:timeout(second); +opt_type(s2s_max_send_queue_delay) -> + econf:non_neg_int(); +opt_type(s2s_max_send_queue_size) -> + econf:non_neg_int(); opt_type(s2s_protocol_options) -> opt_type(c2s_protocol_options); opt_type(s2s_queue_type) -> @@ -527,6 +535,8 @@ options() -> {c2s_cafile, undefined}, {c2s_ciphers, undefined}, {c2s_dhfile, undefined}, + {c2s_max_send_queue_delay, undefined}, + {c2s_max_send_queue_size, undefined}, {c2s_protocol_options, undefined}, {c2s_tls_compression, undefined}, {ca_file, iolist_to_binary(pkix:get_cafile())}, @@ -635,6 +645,8 @@ options() -> {s2s_dns_retries, 2}, {s2s_dns_timeout, timer:seconds(10)}, {s2s_max_retry_delay, timer:seconds(300)}, + {s2s_max_send_queue_delay, 0}, + {s2s_max_send_queue_size, 10}, {s2s_protocol_options, undefined}, {s2s_queue_type, fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end}, @@ -705,6 +717,8 @@ globals() -> auth_cache_life_time, auth_cache_missed, auth_cache_size, + c2s_max_send_queue_delay, + c2s_max_send_queue_size, ca_file, captcha_cmd, captcha_host, @@ -752,6 +766,8 @@ globals() -> router_use_cache, rpc_timeout, s2s_max_retry_delay, + c2s_max_send_queue_delay, + c2s_max_send_queue_size, shaper, sm_cache_life_time, sm_cache_missed, diff --git a/src/ejabberd_options_doc.erl b/src/ejabberd_options_doc.erl index 9d4cea829df..ff39963d85d 100644 --- a/src/ejabberd_options_doc.erl +++ b/src/ejabberd_options_doc.erl @@ -430,6 +430,31 @@ doc() -> "dhparam -out dh.pem 2048\". If this option is not specified, " "2048-bit MODP Group with 256-bit Prime Order Subgroup will be " "used as defined in RFC5114 Section 2.3.")}}, + {c2s_max_send_queue_delay, + #{value => ?T("non_neg_integer()"), + desc => + [?T("Specifies the maximum number of milliseconds to queue an " + "outgoing stanza or stream management element. Setting this " + "option to a positive (non-zero) number allows for batching up " + "multiple XML elements into a single TCP packet in order to " + "reduce the TCP/IP overhead. The default value is '0', which " + "disables queueing."), "", + ?T("To set a specific file per listener, use the listener's " + "http://../listen-options/#max_send_queue_delay[max_send_queue_delay] " + "option. Please note that 'c2s_max_send_queue_delay' overrides " + "the listener's 'max_send_queue_delay' option."), ""]}}, + {c2s_max_send_queue_size, + #{value => ?T("non_neg_integer()"), + desc => + [?T("Specifies the maximum number of elements to add to the send " + "queue. The default value is '10'. Note that this option has " + "no effect if 'max_send_queue_delay' isn't set to a value " + "larger than '0'. Setting this option to '0' disables " + "queueing."), "", + ?T("To set a specific file per listener, use the listener's " + "http://../listen-options/#max_send_queue_size[max_send_queue_size] " + "option. Please note that 'c2s_max_send_queue_size' overrides " + "the listener's 'max_send_queue_size' option."), ""]}}, {c2s_protocol_options, #{value => "[Option, ...]", desc => @@ -1118,6 +1143,31 @@ doc() -> "dhparam -out dh.pem 2048\". If this option is not specified, " "2048-bit MODP Group with 256-bit Prime Order Subgroup will be " "used as defined in RFC5114 Section 2.3.")}}, + {s2s_max_send_queue_delay, + #{value => ?T("non_neg_integer()"), + desc => + [?T("Specifies the maximum number of milliseconds to queue an " + "outgoing stanza or stream management element. Setting this " + "option to a positive (non-zero) number allows for batching up " + "multiple XML elements into a single TCP packet in order to " + "reduce the TCP/IP overhead. The default value is '0', which " + "disables queueing."), "", + ?T("To set a specific file per listener, use the listener's " + "http://../listen-options/#max_send_queue_delay[max_send_queue_delay] " + "option. Please note that 's2s_max_send_queue_delay' overrides " + "the listener's 'max_send_queue_delay' option."), ""]}}, + {s2s_max_send_queue_size, + #{value => ?T("non_neg_integer()"), + desc => + [?T("Specifies the maximum number of elements to add to the send " + "queue. The default value is '10'. Note that this option has " + "no effect if 'max_send_queue_delay' isn't set to a value " + "larger than '0'. Setting this option to '0' disables " + "queueing."), "", + ?T("To set a specific file per listener, use the listener's " + "http://../listen-options/#max_send_queue_size[max_send_queue_size] " + "option. Please note that 's2s_max_send_queue_size' overrides " + "the listener's 'max_send_queue_size' option."), ""]}}, {s2s_protocol_options, #{value => "[Option, ...]", desc => diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index f7998240c6e..c9ab32285ab 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -279,6 +279,8 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) -> false -> unlimited end, Timeout = ejabberd_option:negotiation_timeout(), + MaxQSize = ejabberd_option:s2s_max_send_queue_size(), + MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(), State1 = State#{on_route => queue, queue => p1_queue:new(QueueType, QueueLimit), xmlns => ?NS_SERVER, @@ -286,9 +288,10 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) -> server_host => ServerHost, shaper => none}, State2 = xmpp_stream_out:set_timeout(State1, Timeout), + State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay), ?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts", [LServer, RServer]), - ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]). + ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]). handle_call(Request, From, #{server_host := ServerHost} = State) -> ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]). diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 5e386ed7d59..09cad827881 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -117,10 +117,13 @@ init([State, Opts]) -> true -> TLSOpts1 end, GlobalRoutes = proplists:get_value(global_routes, Opts, true), + MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10), + MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0), Timeout = ejabberd_option:negotiation_timeout(), State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)), State2 = xmpp_stream_in:set_timeout(State1, Timeout), - State3 = State2#{access => Access, + State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay), + State4 = State3#{access => Access, xmlns => ?NS_COMPONENT, lang => ejabberd_option:language(), server => ejabberd_config:get_myname(), @@ -129,7 +132,7 @@ init([State, Opts]) -> tls_options => TLSOpts, global_routes => GlobalRoutes, check_from => CheckFrom}, - ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]). + ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]). handle_stream_start(_StreamStart, #{remote_server := RemoteServer, @@ -302,6 +305,8 @@ listen_options() -> {tls, false}, {tls_compression, false}, {max_stanza_size, infinity}, + {max_send_queue_size, 10}, + {max_send_queue_delay, 0}, {max_fsm_queue, 10000}, {password, undefined}, {hosts, []},