Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

normalise hosts to URI::Generics, which reliably preserve defaults #112

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ fields => {

* Value type is <<array,array>>
* Default value is `["localhost:9200"]`
* Format for each entry is one of:
- a valid RFC3986 URI with scheme, hostname, and optional port
- an ipv4 address, optionally followed by a colon and port number
- a hostname, optionally followed by a colon and port number
- a square-bracketed ipv6 address, optionally followed by a colon and port number
- a bare ipv6 address

List of elasticsearch hosts to use for querying.

Expand Down Expand Up @@ -281,7 +287,9 @@ Comma-delimited list of `<field>:<direction>` pairs that define the sort order
* Value type is <<boolean,boolean>>
* Default value is `false`

SSL
Force SSL/TLS secured communication to Elasticsearch cluster.
Leaving this unspecified will use whatever scheme is specified in the URLs listed in <<plugins-{type}s-{plugin}-hosts>>, where mixed schemes are supported.
If SSL is set to `true`, the plugin will refuse to start if any of the hosts specifies an `http://` scheme.

[id="plugins-{type}s-{plugin}-tag_on_failure"]
===== `tag_on_failure`
Expand Down
87 changes: 85 additions & 2 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
require "logstash/json"
java_import "java.util.concurrent.ConcurrentHashMap"

require 'resolv'


class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
config_name "elasticsearch"
Expand Down Expand Up @@ -71,6 +73,8 @@ def register
@query_dsl = file.read
end

@normalised_hosts = normalise_hosts(@hosts, @ssl)

test_connection!
end # def register

Expand Down Expand Up @@ -140,8 +144,7 @@ def filter(event)
private
def client_options
{
:ssl => @ssl,
:hosts => @hosts,
:hosts => @normalised_hosts,
:ca_file => @ca_file,
:logger => @logger
}
Expand Down Expand Up @@ -191,4 +194,84 @@ def extract_total_from_hits(hits)
def test_connection!
get_client.client.ping
end

private

PATTERN_START_WITH_URI_SCHEME =
%r{\A[[:alpha:]][[:alnum:]\.\+\-]*://}i

PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT =
%r{\A([^:\[\]]+|\[[^\]]+\])(?::([0-9]+))?\Z}

##
# Map the provided array-of-strings to an array of `URI::Generic`
# instances, which the Elasticsearch client can use to establish
# connections.
#
# @param hosts [Array<String>]: (@see `#normalise_host`)
# @param force_ssl [Boolean]: (@see `#normalise_host`)
#
# @return [Array<URI::Generic>]
def normalise_hosts(hosts, force_ssl)
hosts.map { |input| normalise_host(input, force_ssl) }
end

##
# Convert the provided string to a `URI::Generic` instance, which the
# Elasticsearch client can use to establish connections.
#
# @param input [String]: a url, in one of the following formats:
# - a qualified URL with schema, hostname, and
# optional port
# - a bare hostname or ip, optionally followed by a
# colon and port number
# - a square-bracketed ipv6 literal, optionally
# followed by a colon and port number
# - a bare ipv6-address
# @param force_ssl [Boolean]: true to force SSL; will cause failure if one
# or more hosts explicitly supplies non-SSL
# scheme (e.g., `http`).
#
# @return [URI::Generic]
def normalise_host(input, force_ssl)
if force_ssl && input.start_with?('http://')
logger.error("Plugin configured to force SSL with `ssl => true`, " +
"but a host explicitly declared non-https URL `#{input}`")

raise LogStash::ConfigurationError, "Aborting due to conflicting configuration"
end

begin
if PATTERN_START_WITH_URI_SCHEME.match(input)
# Avoid `URI::parse`, which routes to specific implementations
# that inject defaults that do not make sense in this context.
URI::Generic.new(*URI.split(input))
yaauie marked this conversation as resolved.
Show resolved Hide resolved
else
if PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT.match(input)
host, port = Regexp.last_match.captures
elsif input =~ Resolv::IPv6::Regex
# per RFC3986: to be used as hostname in URIs, ipv6 literals
# MUST be wrapped in square-brackets.
host, port = "[#{input}]", nil
else
fail('unsupported format')
end
URI::Generic.new(
force_ssl ? 'https' : 'http',
nil, # userinfo,
host,
port,
nil, # registry
nil, # path
nil, # opaque
nil, # query
nil # fragment
)
end
rescue => e
logger.error("Plugin configured with invalid host value `#{input}`",
:exception => e.message, :class => e.class.name)
raise LogStash::ConfigurationError, "Aborting due to invalid configuration"
end
end
end #class LogStash::Filters::Elasticsearch
6 changes: 2 additions & 4 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ class ElasticsearchClient
attr_reader :client

def initialize(user, password, options={})
ssl = options.fetch(:ssl, false)
hosts = options[:hosts]
@logger = options[:logger]
hosts = options.fetch(:hosts)
@logger = options.fetch(:logger)

transport_options = {}
if user && password
token = ::Base64.strict_encode64("#{user}:#{password.value}")
transport_options[:headers] = { Authorization: "Basic #{token}" }
end

hosts.map! {|h| { host: h, scheme: 'https' } } if ssl
# set ca_file even if ssl isn't on, since the host can be an https url
ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file]
ssl_options ||= {}
Expand Down
196 changes: 191 additions & 5 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,199 @@

context "registration" do

let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) }
before do
allow(plugin).to receive(:test_connection!)
let(:plugin_class) { LogStash::Plugin.lookup("filter", "elasticsearch") }
let(:plugin) { plugin_class.new(config) }
let(:config) { Hash.new }

context 'with defaults' do
before do
allow(plugin).to receive(:test_connection!)
end

it "should not raise an exception" do
expect {plugin.register}.to_not raise_error
end
end

it "should not raise an exception" do
expect {plugin.register}.to_not raise_error
context 'hosts' do
let(:config) do
super().merge(
'hosts' => hosts
)
end
let(:hosts) do
fail NotImplementedError, 'spec or spec group must define `hosts`.'
end

let(:client_stub) { double(:client).as_null_object }
let(:logger_stub) { double(:logger).as_null_object }

before(:each) do
allow(plugin).to receive(:logger).and_return(logger_stub)
end

context 'with schema://hostname' do
let(:hosts) { ['http://foo.local', 'http://bar.local'] }

it 'creates client with URIs that do not include a port' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http', port: nil))
expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with `ssl => true`' do
let(:config) { super().merge('ssl' => 'true') }
context 'and one or more explicitly-http hosts' do
let(:hosts) { ['https://foo.local', 'http://bar.local'] }

it 'raises an exception' do
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
end

it 'emits a helpful log message' do
plugin.register rescue nil
expect(plugin.logger).to have_received(:error).with(match(/force SSL/))
end
end

context 'and all explicitly-https hosts' do
let(:hosts) { ['https://foo.local', 'https://bar.local'] }

it 'sets the schemas on all to https' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
options[:hosts].each do |host|
expect(host).to be_an URI
expect(host.scheme).to eq 'https'
end
end.and_return(client_stub)

plugin.register
end
end

context 'and one or more schemaless hosts' do
let(:hosts) { ['https://foo.local', 'bar.local'] }

it 'sets the schemas on all to https' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
options[:hosts].each do |host|
expect(host).to be_an URI
expect(host.scheme).to eq 'https'
end
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more ipv6 hostnames' do
let(:hosts) { ['[::1]', '[::2]:9201', 'https://[::3]:9202', '::4'] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::1]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::2]', port: 9201))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::3]', port: 9202))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: nil))
end.and_return(client_stub)

plugin.register
end
end
end

{
'with `ssl => false' => {'ssl' => 'false'},
'without `ssl` directive' => {}
}.each do |context_string, config_override|
context(context_string) do
let(:config) { super().merge(config_override) }

context 'with a mix of http and https hosts' do
let(:hosts) { ['https://foo.local', 'http://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with https-only hosts' do
let(:hosts) { ['https://foo.local', 'https://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with http-only hosts' do
let(:hosts) { ['http://foo.local', 'http://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more schemaless hosts' do
let(:hosts) { ['foo.local', 'bar.local' ] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more square-bracketed ipv6 literals' do
let(:hosts) { ['[::1]', '[::2]:9201', 'http://[::3]','https://[::4]:9202', '::5'] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::1]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::2]', port: 9201))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::3]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: 9202))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::5]', port: nil))
end.and_return(client_stub)

plugin.register
end
end
end
end
end
end

Expand Down