diff --git a/docs/make.jl b/docs/make.jl
index c8344c9..ce3fd24 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -2,7 +2,7 @@ using Documenter, CloudWatchLogs
 
 makedocs(;
     modules=[CloudWatchLogs],
-    format=Documenter.HTML(prettyurls=(get(ENV, "CI", nothing) == "true")),
+    format=Documenter.HTML(; prettyurls=(get(ENV, "CI", nothing) == "true")),
     pages=[
         "Home" => "index.md",
         "API" => "pages/api.md",
@@ -11,15 +11,9 @@ makedocs(;
     repo="https://github.com/invenia/CloudWatchLogs.jl/blob/{commit}{path}#L{line}",
     sitename="CloudWatchLogs.jl",
     authors="Invenia Technical Computing Corporation",
-    assets=[
-        "assets/invenia.css",
-        "assets/logo.png",
-    ],
-    strict = true,
-    checkdocs = :exports,
+    assets=["assets/invenia.css", "assets/logo.png"],
+    strict=true,
+    checkdocs=:exports,
 )
 
-deploydocs(;
-    repo="github.com/invenia/CloudWatchLogs.jl",
-    target="build",
-)
+deploydocs(; repo="github.com/invenia/CloudWatchLogs.jl", target="build")
diff --git a/src/CloudWatchLogs.jl b/src/CloudWatchLogs.jl
index 75eab86..e056fe6 100644
--- a/src/CloudWatchLogs.jl
+++ b/src/CloudWatchLogs.jl
@@ -33,10 +33,11 @@ const MAX_BATCH_LENGTH = 10000
 # 5 requests per second per log stream. This limit cannot be changed.
 const PUTLOGEVENTS_RATE_LIMIT = 0.2
 
-const PUTLOGEVENTS_DELAYS =
-    ExponentialBackOff(n=10, first_delay=PUTLOGEVENTS_RATE_LIMIT, factor=1.1)
+const PUTLOGEVENTS_DELAYS = ExponentialBackOff(;
+    n=10, first_delay=PUTLOGEVENTS_RATE_LIMIT, factor=1.1
+)
 
-const GENERIC_AWS_DELAYS = ExponentialBackOff(n=10, first_delay=0.2, factor=2, jitter=0.2)
+const GENERIC_AWS_DELAYS = ExponentialBackOff(; n=10, first_delay=0.2, factor=2, jitter=0.2)
 
 __init__() = Memento.register(LOGGER)
diff --git a/src/event.jl b/src/event.jl
index da7294c..fdf3241 100644
--- a/src/event.jl
+++ b/src/event.jl
@@ -39,18 +39,21 @@ struct LogEvent
             idx = MAX_EVENT_SIZE - 29  # Truncated messages include a "..."
             message = string(message[1:idx], "...")
-            warn(LOGGER, "CloudWatch Log Event message cannot be more than $MAX_EVENT_SIZE bytes")
+            warn(
+                LOGGER,
+                "CloudWatch Log Event message cannot be more than $MAX_EVENT_SIZE bytes",
+            )
         end
 
         if timestamp < 0
             throw(ArgumentError("Log Event timestamp must be non-negative"))
         end
 
-        new(message, timestamp)
+        return new(message, timestamp)
     end
 end
 
-function LogEvent(message::AbstractString, dt::Union{DateTime, ZonedDateTime})
+function LogEvent(message::AbstractString, dt::Union{DateTime,ZonedDateTime})
     return LogEvent(message, unix_timestamp_ms(dt))
 end
diff --git a/src/exceptions.jl b/src/exceptions.jl
index 9bd255c..f53a645 100644
--- a/src/exceptions.jl
+++ b/src/exceptions.jl
@@ -3,7 +3,7 @@ abstract type CloudWatchLogsException <: Exception end
 struct StreamNotFoundException <: CloudWatchLogsException
     stream::String
     group::String
-    msg::Union{String, Nothing}
+    msg::Union{String,Nothing}
 end
 
 function Base.showerror(io::IO, exception::StreamNotFoundException)
diff --git a/src/handler.jl b/src/handler.jl
index b2f1bdb..802b2ab 100644
--- a/src/handler.jl
+++ b/src/handler.jl
@@ -31,12 +31,10 @@ function CloudWatchLogHandler(
     log_group_name::AbstractString,
     log_stream_name::AbstractString,
     formatter::F=DefaultFormatter(),
-) where F<:Formatter
+) where {F<:Formatter}
     ch = Channel{LogEvent}(Inf)
     handler = CloudWatchLogHandler(
-        CloudWatchLogStream(config, log_group_name, log_stream_name),
-        ch,
-        formatter,
+        CloudWatchLogStream(config, log_group_name, log_stream_name), ch, formatter
     )
 
     tsk = @async process_logs!(handler)
@@ -62,10 +60,13 @@ function process_available_logs!(handler::CloudWatchLogHandler)
     end
 
     if isempty(events)
-        warn(LOGGER, string(
-            "Channel was ready but no events were found. ",
-            "Is there another task pulling logs from this handler?",
-        ))
+        warn(
+            LOGGER,
+            string(
+                "Channel was ready but no events were found. ",
+                "Is there another task pulling logs from this handler?",
+            ),
+        )
     end
 
     try
@@ -113,5 +114,5 @@ function Memento.emit(handler::CloudWatchLogHandler, record::Record)
     dt = isdefined(record, :date) ? record.date : Dates.now(tz"UTC")
     message = format(handler.fmt, record)
     event = LogEvent(message, dt)
-    put!(handler.channel, event)
+    return put!(handler.channel, event)
 end
diff --git a/src/stream.jl b/src/stream.jl
index 39643d5..5b2a1a9 100644
--- a/src/stream.jl
+++ b/src/stream.jl
@@ -1,8 +1,4 @@
-const RETRYABLE_CODES = (
-    "IncompleteSignature",
-    "ThrottlingException",
-    "RequestExpired",
-)
+const RETRYABLE_CODES = ("IncompleteSignature", "ThrottlingException", "RequestExpired")
 
 function aws_retry_cond(s, e)
     if e isa AWSException && (500 <= e.cause.status <= 504 || e.code in RETRYABLE_CODES)
@@ -16,13 +12,13 @@ function aws_retry_cond(s, e)
     return (s, false)
 end
 
-aws_retry(f) = retry(f, delays=GENERIC_AWS_DELAYS, check=aws_retry_cond)()
+aws_retry(f) = retry(f; delays=GENERIC_AWS_DELAYS, check=aws_retry_cond)()
 
 struct CloudWatchLogStream
     config::AWSConfig
     log_group_name::String
     log_stream_name::String
-    token::Ref{Union{String, Nothing}}
+    token::Ref{Union{String,Nothing}}
 end
 
 """
@@ -34,15 +30,10 @@ This constructor will automatically fetch the latest [sequence token](https://do
 for the stream.
""" function CloudWatchLogStream( - config::AWSConfig, - log_group_name::AbstractString, - log_stream_name::AbstractString, + config::AWSConfig, log_group_name::AbstractString, log_stream_name::AbstractString ) stream = CloudWatchLogStream( - config, - log_group_name, - log_stream_name, - Ref{Union{String, Nothing}}(), + config, log_group_name, log_stream_name, Ref{Union{String,Nothing}}() ) update_sequence_token!(stream) return stream @@ -61,16 +52,18 @@ function create_group( config::AWSConfig, # this probably won't collide, most callers should add identifying information though log_group_name::AbstractString="julia-$(uuid4())"; - tags::AbstractDict{<:AbstractString, <:AbstractString}=Dict{String, String}(), + tags::AbstractDict{<:AbstractString,<:AbstractString}=Dict{String,String}(), ) if isempty(tags) aws_retry() do CloudWatch_Logs.create_log_group(log_group_name; aws_config=config) end else - tags = Dict{String, String}(tags) + tags = Dict{String,String}(tags) aws_retry() do - CloudWatch_Logs.create_log_group(log_group_name, Dict("tags"=>tags); aws_config=config) + CloudWatch_Logs.create_log_group( + log_group_name, Dict("tags" => tags); aws_config=config + ) end end return String(log_group_name) @@ -81,10 +74,7 @@ end Delete a CloudWatch Log Group. """ -function delete_group( - config::AWSConfig, - log_group_name::AbstractString, -) +function delete_group(config::AWSConfig, log_group_name::AbstractString) aws_retry() do CloudWatch_Logs.delete_log_group(log_group_name; aws_config=config) end @@ -107,7 +97,9 @@ function create_stream( log_stream_name::AbstractString="julia-$(uuid4())", ) aws_retry() do - CloudWatch_Logs.create_log_stream(log_group_name, log_stream_name; aws_config=config) + CloudWatch_Logs.create_log_stream( + log_group_name, log_stream_name; aws_config=config + ) end return String(log_stream_name) end @@ -118,12 +110,12 @@ end Delete a CloudWatch Log Stream from a given Log Group. """ function delete_stream( - config::AWSConfig, - log_group_name::AbstractString, - log_stream_name::AbstractString, + config::AWSConfig, log_group_name::AbstractString, log_stream_name::AbstractString ) aws_retry() do - CloudWatch_Logs.delete_log_stream(log_group_name, log_stream_name; aws_config=config) + CloudWatch_Logs.delete_log_stream( + log_group_name, log_stream_name; aws_config=config + ) end return nothing end @@ -139,7 +131,11 @@ function new_sequence_token(stream::CloudWatchLogStream) return new_sequence_token(stream.config, stream.log_group_name, stream.log_stream_name) end -describe_log_streams(config::AWSConfig, log_group_name::AbstractString, params::AbstractDict) = CloudWatch_Logs.describe_log_streams(log_group_name, params; aws_config=config) +function describe_log_streams( + config::AWSConfig, log_group_name::AbstractString, params::AbstractDict +) + return CloudWatch_Logs.describe_log_streams(log_group_name, params; aws_config=config) +end """ new_sequence_token(stream::CloudWatchLogStream) -> Union{String, Nothing} @@ -151,10 +147,8 @@ Returns `nothing` if the stream does not have a sequence token yet (e.g., if no been logged). """ function new_sequence_token( - config::AWSConfig, - log_group::AbstractString, - log_stream::AbstractString, -)::Union{String, Nothing} + config::AWSConfig, log_group::AbstractString, log_stream::AbstractString +)::Union{String,Nothing} response = aws_retry() do @mock describe_log_streams( config, @@ -196,19 +190,18 @@ Alternatively, set the token for the stream to `token`. Returns the token. 
""" function update_sequence_token!( - stream::CloudWatchLogStream, - token::Union{String, Nothing}=new_sequence_token(stream), + stream::CloudWatchLogStream, token::Union{String,Nothing}=new_sequence_token(stream) ) - stream.token[] = token + return stream.token[] = token end function _put_log_events(stream::CloudWatchLogStream, events::AbstractVector{LogEvent}) - CloudWatch_Logs.put_log_events( + return CloudWatch_Logs.put_log_events( events, stream.log_group_name, stream.log_stream_name, Dict("sequenceToken" => sequence_token(stream)); - aws_config=stream.config + aws_config=stream.config, ) end @@ -232,23 +225,27 @@ Returns the number of events successfully submitted. """ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEvent}) if length(events) > MAX_BATCH_LENGTH - error(LOGGER, LogSubmissionException( - "Log batch length exceeded 10000 events; submit fewer log events at once" - )) + error( + LOGGER, + LogSubmissionException( + "Log batch length exceeded 10000 events; submit fewer log events at once" + ), + ) end batch_size = sum(aws_size, events) if batch_size > MAX_BATCH_SIZE - error(LOGGER, LogSubmissionException( - "Log batch size exceeded 1 MiB; submit fewer log events at once" - )) + error( + LOGGER, + LogSubmissionException( + "Log batch size exceeded 1 MiB; submit fewer log events at once" + ), + ) end if !issorted(events; by=timestamp) - debug(LOGGER, - "Log submission will be faster if log events are sorted by timestamp" - ) + debug(LOGGER, "Log submission will be faster if log events are sorted by timestamp") # a stable sort to avoid putting related messages out of order sort!(events; alg=MergeSort, by=timestamp) @@ -256,9 +253,12 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven min_timestamp, max_timestamp = extrema(timestamp(e) for e in events) if max_timestamp - min_timestamp > 24 * 3600 * 1000 # 24 hours in milliseconds - error(LOGGER, LogSubmissionException( - "Log events cannot span more than 24 hours; submit log events separately" - )) + error( + LOGGER, + LogSubmissionException( + "Log events cannot span more than 24 hours; submit log events separately" + ), + ) end function retry_cond(s, e) @@ -286,7 +286,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven return (s, false) end - f = retry(delays=PUTLOGEVENTS_DELAYS, check=retry_cond) do + f = retry(; delays=PUTLOGEVENTS_DELAYS, check=retry_cond) do @mock CloudWatchLogs._put_log_events(stream, events) end @@ -310,7 +310,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven string( "Cannot log the following events, ", "as they are older than the log retention policy allows: ", - events[1:idx-1], + events[1:(idx - 1)], ) end end @@ -323,7 +323,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven string( "Cannot log the following events, ", "as they are more than 14 days old: ", - events[1:idx-1], + events[1:(idx - 1)], ) end end @@ -336,7 +336,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven string( "Cannot log the following events, ", "as they are newer than 2 hours in the future: ", - events[idx+1:end], + events[(idx + 1):end], ) end end diff --git a/test/event.jl b/test/event.jl index e73a1e7..3bd5859 100644 --- a/test/event.jl +++ b/test/event.jl @@ -1,34 +1,32 @@ @testset "LogEvent" begin + @testset "Timestamp" begin + time_in_ms = round(Int64, time() * 1000) -@testset "Timestamp" begin - time_in_ms = round(Int64, 
time() * 1000) + event = LogEvent("Foo", time_in_ms) + @test CloudWatchLogs.timestamp(event) == time_in_ms - event = LogEvent("Foo", time_in_ms) - @test CloudWatchLogs.timestamp(event) == time_in_ms + dt = DateTime(Dates.UTM(time_in_ms + Dates.UNIXEPOCH)) + event = LogEvent("Foo", dt) + @test CloudWatchLogs.timestamp(event) == time_in_ms - dt = DateTime(Dates.UTM(time_in_ms + Dates.UNIXEPOCH)) - event = LogEvent("Foo", dt) - @test CloudWatchLogs.timestamp(event) == time_in_ms + zdt = ZonedDateTime(dt, tz"UTC") + event = LogEvent("Foo", zdt) + @test CloudWatchLogs.timestamp(event) == time_in_ms - zdt = ZonedDateTime(dt, tz"UTC") - event = LogEvent("Foo", zdt) - @test CloudWatchLogs.timestamp(event) == time_in_ms - - event = LogEvent("Foo") - one_hour = Dates.value(Millisecond(Hour(1))) - @test time_in_ms <= CloudWatchLogs.timestamp(event) <= time_in_ms + one_hour -end - -@testset "Message" begin - @test CloudWatchLogs.message(LogEvent("Foo")) == "Foo" -end + event = LogEvent("Foo") + one_hour = Dates.value(Millisecond(Hour(1))) + @test time_in_ms <= CloudWatchLogs.timestamp(event) <= time_in_ms + one_hour + end -@testset "Bad construction" begin - @test_throws ArgumentError LogEvent("") - @test_throws ArgumentError LogEvent("Foo", -45) - @test_warn getlogger("CloudWatchLogs") "Log Event message cannot be more than" begin - LogEvent("A" ^ (MAX_EVENT_SIZE - 25)) + @testset "Message" begin + @test CloudWatchLogs.message(LogEvent("Foo")) == "Foo" end -end + @testset "Bad construction" begin + @test_throws ArgumentError LogEvent("") + @test_throws ArgumentError LogEvent("Foo", -45) + @test_warn getlogger("CloudWatchLogs") "Log Event message cannot be more than" begin + LogEvent("A"^(MAX_EVENT_SIZE - 25)) + end + end end diff --git a/test/mocked_aws.jl b/test/mocked_aws.jl index 15b4bef..3afb617 100644 --- a/test/mocked_aws.jl +++ b/test/mocked_aws.jl @@ -1,122 +1,141 @@ # These tests should never contact AWS @testset "Mocked AWS Interactions" begin + CFG = AWSConfig() -CFG = AWSConfig() - -function dls_patch(output) - @patch function CloudWatchLogs.describe_log_streams(config, log_group_name, params) - output + function dls_patch(output) + @patch function CloudWatchLogs.describe_log_streams(config, log_group_name, params) + return output + end end -end -put_patch = @patch function CloudWatchLogs._put_log_events(stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent}) - return Dict("nextSequenceToken" => "3") -end - -function submit_patch(log_dump) - @patch function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent}) - append!(log_dump, events) - return length(events) + put_patch = @patch function CloudWatchLogs._put_log_events( + stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent} + ) + return Dict("nextSequenceToken" => "3") end -end -function throttle_patch() - first_time = true - @patch function CloudWatchLogs._put_log_events(stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent}) - if first_time - first_time = false - response = HTTP.Messages.Response(400, "") - http_error = HTTP.ExceptionRequest.StatusError(400, "", "", response) - throw(AWSException("ThrottlingException", "", "", http_error)) + function submit_patch(log_dump) + @patch function submit_logs( + stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent} + ) + append!(log_dump, events) + return length(events) end - - return Dict() end -end -streams = [ - Dict( - "storageBytes" => 1048576, - "arn" => 
"arn:aws:logs:us-east-1:123456789012:log-group:my-log-group-1:log-stream:my-log-stream-1", - "creationTime" => 1393545600000, - "firstEventTimestamp" => 1393545600000, - "lastEventTimestamp" => 1393567800000, - "lastIngestionTime" => 1393589200000, - "logStreamName" => "my-log-stream-1", - "uploadSequenceToken" => "88602967394531410094953670125156212707622379445839968487", - ), - Dict( - "storageBytes" => 5242880, - "arn" => "arn:aws:logs:us-east-1:123456789012:log-group:my-log-group-2:log-stream:my-log-stream-2", - "creationTime" => 1396224000000, - "firstEventTimestamp" => 1396224000000, - "lastEventTimestamp" => 1396235500000, - "lastIngestionTime" => 1396225560000, - "logStreamName" => "my-log-stream-2", - "uploadSequenceToken" => "07622379445839968487886029673945314100949536701251562127", - ), -] - -@testset "Sequence Token" begin - # no streams - apply(dls_patch(Dict("logStreams" => []))) do - @test_throws LOGGER StreamNotFoundException CloudWatchLogs.new_sequence_token(CFG, "group", "stream") - end + function throttle_patch() + first_time = true + @patch function CloudWatchLogs._put_log_events( + stream::CloudWatchLogStream, events::AbstractVector{CloudWatchLogs.LogEvent} + ) + if first_time + first_time = false + response = HTTP.Messages.Response(400, "") + http_error = HTTP.ExceptionRequest.StatusError(400, "", "", response) + throw(AWSException("ThrottlingException", "", "", http_error)) + end - # prefix-only match - apply(dls_patch(Dict("logStreams" => streams))) do - @test_throws LOGGER StreamNotFoundException CloudWatchLogs.new_sequence_token(CFG, "my-log-group-1", "my-log-stream") + return Dict() + end end - # match - apply(dls_patch(Dict("logStreams" => streams[1:1]))) do - @test CloudWatchLogs.new_sequence_token(CFG, "my-log-group-1", "my-log-stream-1") == "88602967394531410094953670125156212707622379445839968487" - end -end + streams = [ + Dict( + "storageBytes" => 1048576, + "arn" => "arn:aws:logs:us-east-1:123456789012:log-group:my-log-group-1:log-stream:my-log-stream-1", + "creationTime" => 1393545600000, + "firstEventTimestamp" => 1393545600000, + "lastEventTimestamp" => 1393567800000, + "lastIngestionTime" => 1393589200000, + "logStreamName" => "my-log-stream-1", + "uploadSequenceToken" => "88602967394531410094953670125156212707622379445839968487", + ), + Dict( + "storageBytes" => 5242880, + "arn" => "arn:aws:logs:us-east-1:123456789012:log-group:my-log-group-2:log-stream:my-log-stream-2", + "creationTime" => 1396224000000, + "firstEventTimestamp" => 1396224000000, + "lastEventTimestamp" => 1396235500000, + "lastIngestionTime" => 1396225560000, + "logStreamName" => "my-log-stream-2", + "uploadSequenceToken" => "07622379445839968487886029673945314100949536701251562127", + ), + ] + + @testset "Sequence Token" begin + # no streams + apply(dls_patch(Dict("logStreams" => []))) do + @test_throws LOGGER StreamNotFoundException CloudWatchLogs.new_sequence_token( + CFG, "group", "stream" + ) + end + + # prefix-only match + apply(dls_patch(Dict("logStreams" => streams))) do + @test_throws LOGGER StreamNotFoundException CloudWatchLogs.new_sequence_token( + CFG, "my-log-group-1", "my-log-stream" + ) + end -@testset "CloudWatchLogStream" begin - apply([dls_patch(Dict("logStreams" => streams)), put_patch]) do - stream = CloudWatchLogStream(CFG, "my-log-group-1", "my-log-stream-1") - @test CloudWatchLogs.sequence_token(stream) == "88602967394531410094953670125156212707622379445839968487" - # no AWS requests are sent so it shouldn't matter that these timestamps are so small - @test 
submit_logs(stream, [CloudWatchLogs.LogEvent("Help", 10), CloudWatchLogs.LogEvent("Alert", 124)]) == 2 - # should be the sequence token returned from the API call in put_patch - @test CloudWatchLogs.sequence_token(stream) == "3" + # match + apply(dls_patch(Dict("logStreams" => streams[1:1]))) do + @test CloudWatchLogs.new_sequence_token( + CFG, "my-log-group-1", "my-log-stream-1" + ) == "88602967394531410094953670125156212707622379445839968487" + end end -end -@testset "CloudWatchLogHandler" begin - logs = CloudWatchLogs.LogEvent[] - apply([dls_patch(Dict("logStreams" => streams[1:1])), submit_patch(logs)]) do - cwlh = CloudWatchLogHandler(CFG, "my-log-group-1", "my-log-stream-1", DefaultFormatter("{msg}")) - logger = Logger("CWLHTest"; propagate=false) - push!(logger, cwlh) - for c = 'a':'e' - warn(logger, "$c") + @testset "CloudWatchLogStream" begin + apply([dls_patch(Dict("logStreams" => streams)), put_patch]) do + stream = CloudWatchLogStream(CFG, "my-log-group-1", "my-log-stream-1") + @test CloudWatchLogs.sequence_token(stream) == + "88602967394531410094953670125156212707622379445839968487" + # no AWS requests are sent so it shouldn't matter that these timestamps are so small + @test submit_logs( + stream, + [ + CloudWatchLogs.LogEvent("Help", 10), + CloudWatchLogs.LogEvent("Alert", 124), + ], + ) == 2 + # should be the sequence token returned from the API call in put_patch + @test CloudWatchLogs.sequence_token(stream) == "3" end - sleep(5 * CloudWatchLogs.PUTLOGEVENTS_RATE_LIMIT) # probably max time we might have to wait + end - messages = map(le -> le.message, logs) - timestamps = map(le -> le.timestamp, logs) + @testset "CloudWatchLogHandler" begin + logs = CloudWatchLogs.LogEvent[] + apply([dls_patch(Dict("logStreams" => streams[1:1])), submit_patch(logs)]) do + cwlh = CloudWatchLogHandler( + CFG, "my-log-group-1", "my-log-stream-1", DefaultFormatter("{msg}") + ) + logger = Logger("CWLHTest"; propagate=false) + push!(logger, cwlh) + for c in 'a':'e' + warn(logger, "$c") + end + sleep(5 * CloudWatchLogs.PUTLOGEVENTS_RATE_LIMIT) # probably max time we might have to wait + + messages = map(le -> le.message, logs) + timestamps = map(le -> le.timestamp, logs) - @test messages == map(string, 'a':'e') - @test issorted(timestamps) + @test messages == map(string, 'a':'e') + @test issorted(timestamps) + end end -end -@testset "Throttled" begin - start_time = CloudWatchLogs.unix_timestamp_ms() - apply([dls_patch(Dict("logStreams" => streams)), throttle_patch()]) do - stream = CloudWatchLogStream(CFG, "my-log-group-1", "my-log-stream-1") - event = LogEvent("log", start_time) + @testset "Throttled" begin + start_time = CloudWatchLogs.unix_timestamp_ms() + apply([dls_patch(Dict("logStreams" => streams)), throttle_patch()]) do + stream = CloudWatchLogStream(CFG, "my-log-group-1", "my-log-stream-1") + event = LogEvent("log", start_time) - setlevel!(LOGGER, "debug") do - @test_log LOGGER "debug" "ThrottlingException" begin - submit_log(stream, event) + setlevel!(LOGGER, "debug") do + @test_log LOGGER "debug" "ThrottlingException" begin + submit_log(stream, event) + end end end end end - -end diff --git a/test/online.jl b/test/online.jl index 69bc222..bad8485 100644 --- a/test/online.jl +++ b/test/online.jl @@ -1,471 +1,476 @@ @testset "Online" begin - -CI_USER_CFG = global_aws_config() -# do not set this variable in CI; it should be versioned with the code -# this is for locally overriding the stack used in testing -TEST_STACK_NAME = get(ENV, "CLOUDWATCHLOGSJL_STACK_NAME", 
"CloudWatchLogs-jl-00015") -TEST_RESOURCE_PREFIX = "pubci-$TEST_STACK_NAME-cwl-test" -TEST_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group" -FORBIDDEN_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group-forbidden" -FORBIDDEN_GROUP_LOG_STREAM = "$TEST_RESOURCE_PREFIX-group-forbidden-stream" -BAD_STREAM_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group-badstream" -FORBIDDEN_LOG_STREAM = "$TEST_RESOURCE_PREFIX-stream-forbidden" -TEST_ROLE = stack_output(CI_USER_CFG, TEST_STACK_NAME)["LogTestRoleArn"] - -# This sets the global AWSConfig to what is passed in -CFG = global_aws_config(creds=assume_role(CI_USER_CFG, TEST_ROLE, Dict("DurationSeconds"=>"7200"))) -LOG_RUN_ID = uuid1() - -new_stream = let - counter = 1 - - function new_stream(category::AbstractString) - stream_name = @sprintf "pubci-%s-%03d-%s" category counter LOG_RUN_ID - counter += 1 - return stream_name + CI_USER_CFG = global_aws_config() + # do not set this variable in CI; it should be versioned with the code + # this is for locally overriding the stack used in testing + TEST_STACK_NAME = get(ENV, "CLOUDWATCHLOGSJL_STACK_NAME", "CloudWatchLogs-jl-00015") + TEST_RESOURCE_PREFIX = "pubci-$TEST_STACK_NAME-cwl-test" + TEST_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group" + FORBIDDEN_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group-forbidden" + FORBIDDEN_GROUP_LOG_STREAM = "$TEST_RESOURCE_PREFIX-group-forbidden-stream" + BAD_STREAM_LOG_GROUP = "$TEST_RESOURCE_PREFIX-group-badstream" + FORBIDDEN_LOG_STREAM = "$TEST_RESOURCE_PREFIX-stream-forbidden" + TEST_ROLE = stack_output(CI_USER_CFG, TEST_STACK_NAME)["LogTestRoleArn"] + + # This sets the global AWSConfig to what is passed in + CFG = global_aws_config(; + creds=assume_role(CI_USER_CFG, TEST_ROLE, Dict("DurationSeconds" => "7200")) + ) + LOG_RUN_ID = uuid1() + + new_stream = let + counter = 1 + + function new_stream(category::AbstractString) + stream_name = @sprintf "pubci-%s-%03d-%s" category counter LOG_RUN_ID + counter += 1 + return stream_name + end end -end -new_group = let - counter = 1 + new_group = let + counter = 1 - function new_group(category::AbstractString) - stream_name = @sprintf "pubci-%s-%03d-%s" category counter LOG_RUN_ID - counter += 1 - return stream_name + function new_group(category::AbstractString) + stream_name = @sprintf "pubci-%s-%03d-%s" category counter LOG_RUN_ID + counter += 1 + return stream_name + end end -end -@testset "Create/delete groups and streams" begin - @testset "Named group" begin - group_name = new_group("create_group") - @test create_group(CFG, group_name; tags=Dict("Temporary"=>"true")) == group_name + @testset "Create/delete groups and streams" begin + @testset "Named group" begin + group_name = new_group("create_group") + @test create_group(CFG, group_name; tags=Dict("Temporary" => "true")) == + group_name - response = CloudWatch_Logs.describe_log_groups( - Dict("logGroupNamePrefix" => group_name, "limit" => 1) - ) - groups = response["logGroups"] + response = CloudWatch_Logs.describe_log_groups( + Dict("logGroupNamePrefix" => group_name, "limit" => 1) + ) + groups = response["logGroups"] - @test !isempty(groups) - @test groups[1]["logGroupName"] == group_name + @test !isempty(groups) + @test groups[1]["logGroupName"] == group_name - delete_group(CFG, group_name) + delete_group(CFG, group_name) - response = CloudWatch_Logs.describe_log_groups( - Dict("logGroupNamePrefix" => group_name, "limit" => 1) - ) - groups = response["logGroups"] + response = CloudWatch_Logs.describe_log_groups( + Dict("logGroupNamePrefix" => group_name, "limit" => 1) + ) + groups = response["logGroups"] 
-        @test isempty(groups) || groups[1]["logGroupName"] != group_name
-    end
+            @test isempty(groups) || groups[1]["logGroupName"] != group_name
+        end
 
-    @testset "Named group no tags" begin
-        group_name = new_group("create_group_no_tags")
-        @test create_group(CFG, group_name) == group_name
+        @testset "Named group no tags" begin
+            group_name = new_group("create_group_no_tags")
+            @test create_group(CFG, group_name) == group_name
 
-        response = CloudWatch_Logs.describe_log_groups(
-            Dict("logGroupNamePrefix" => group_name, "limit" => 1)
-        )
-        groups = response["logGroups"]
+            response = CloudWatch_Logs.describe_log_groups(
+                Dict("logGroupNamePrefix" => group_name, "limit" => 1)
+            )
+            groups = response["logGroups"]
 
-        @test !isempty(groups)
-        @test groups[1]["logGroupName"] == group_name
+            @test !isempty(groups)
+            @test groups[1]["logGroupName"] == group_name
 
-        delete_group(CFG, group_name)
+            delete_group(CFG, group_name)
 
-        response = CloudWatch_Logs.describe_log_groups(
-            Dict("logGroupNamePrefix" => group_name, "limit" => 1)
-        )
-        groups = response["logGroups"]
+            response = CloudWatch_Logs.describe_log_groups(
+                Dict("logGroupNamePrefix" => group_name, "limit" => 1)
+            )
+            groups = response["logGroups"]
 
-        @test isempty(groups) || groups[1]["logGroupName"] != group_name
-    end
+            @test isempty(groups) || groups[1]["logGroupName"] != group_name
+        end
 
-    @testset "Unnamed group" begin
-        group_name = create_group(CFG; tags=Dict("Temporary"=>"true"))
+        @testset "Unnamed group" begin
+            group_name = create_group(CFG; tags=Dict("Temporary" => "true"))
 
-        response = CloudWatch_Logs.describe_log_groups(
-            Dict("logGroupNamePrefix" => group_name, "limit" => 1)
-        )
-        groups = response["logGroups"]
+            response = CloudWatch_Logs.describe_log_groups(
+                Dict("logGroupNamePrefix" => group_name, "limit" => 1)
+            )
+            groups = response["logGroups"]
 
-        @test !isempty(groups)
-        @test groups[1]["logGroupName"] == group_name
+            @test !isempty(groups)
+            @test groups[1]["logGroupName"] == group_name
 
-        delete_group(CFG, group_name)
+            delete_group(CFG, group_name)
 
-        response = CloudWatch_Logs.describe_log_groups(
-            Dict("logGroupNamePrefix" => group_name, "limit" => 1)
-        )
-        groups = response["logGroups"]
+            response = CloudWatch_Logs.describe_log_groups(
+                Dict("logGroupNamePrefix" => group_name, "limit" => 1)
+            )
+            groups = response["logGroups"]
 
-        @test isempty(groups) || groups[1]["logGroupName"] != group_name
-    end
+            @test isempty(groups) || groups[1]["logGroupName"] != group_name
+        end
 
-    @testset "Not allowed" begin
-        @test_throws AWSException create_group(CFG, "delta∆") # invalid characters
-        @test_throws AWSException delete_group(CFG, TEST_LOG_GROUP) # explicitly forbidden
-    end
+        @testset "Not allowed" begin
+            @test_throws AWSException create_group(CFG, "delta∆") # invalid characters
+            @test_throws AWSException delete_group(CFG, TEST_LOG_GROUP) # explicitly forbidden
+        end
 
-    @testset "Named stream" begin
-        stream_name = new_stream("create_stream")
-        @test create_stream(CFG, TEST_LOG_GROUP, stream_name) == stream_name
-
-        response = CloudWatch_Logs.describe_log_streams(
-            TEST_LOG_GROUP,
-            Dict(
-                "logStreamNamePrefix" => stream_name,
-                "limit" => 1,
-                "orderBy" => "LogStreamName"),
-        )
-        streams = response["logStreams"]
-
-        @test !isempty(streams)
-        @test streams[1]["logStreamName"] == stream_name
-
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-
-        response = CloudWatch_Logs.describe_log_streams(
-            TEST_LOG_GROUP,
-            Dict(
-                "logStreamNamePrefix" => stream_name,
-                "limit" => 1,
-                "orderBy" => "LogStreamName"),
-        )
-        streams = response["logStreams"]
-
-        @test isempty(streams) || streams[1]["logStreamName"] != stream_name
-    end
+        @testset "Named stream" begin
+            stream_name = new_stream("create_stream")
+            @test create_stream(CFG, TEST_LOG_GROUP, stream_name) == stream_name
+
+            response = CloudWatch_Logs.describe_log_streams(
+                TEST_LOG_GROUP,
+                Dict(
+                    "logStreamNamePrefix" => stream_name,
+                    "limit" => 1,
+                    "orderBy" => "LogStreamName",
+                ),
+            )
+            streams = response["logStreams"]
 
-    @testset "Unnamed stream" begin
-        stream_name = create_stream(CFG, TEST_LOG_GROUP)
-
-        response = CloudWatch_Logs.describe_log_streams(
-            TEST_LOG_GROUP,
-            Dict(
-                "logStreamNamePrefix" => stream_name,
-                "limit" => 1,
-                "orderBy" => "LogStreamName"),
-        )
-        streams = response["logStreams"]
-
-        @test !isempty(streams)
-        @test streams[1]["logStreamName"] == stream_name
-
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-
-        response = CloudWatch_Logs.describe_log_streams(
-            TEST_LOG_GROUP,
-            Dict(
-                "logStreamNamePrefix" => stream_name,
-                "limit" => 1,
-                "orderBy" => "LogStreamName"),
-        )
-        streams = response["logStreams"]
-
-        @test isempty(streams) || streams[1]["logStreamName"] != stream_name
-    end
+            @test !isempty(streams)
+            @test streams[1]["logStreamName"] == stream_name
 
-    @testset "Not allowed" begin
-        @test_throws AWSException create_stream(CFG, FORBIDDEN_LOG_GROUP)
-        @test_throws AWSException delete_stream(CFG, FORBIDDEN_LOG_GROUP, FORBIDDEN_GROUP_LOG_STREAM)
-    end
-end
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
+
+            response = CloudWatch_Logs.describe_log_streams(
+                TEST_LOG_GROUP,
+                Dict(
+                    "logStreamNamePrefix" => stream_name,
+                    "limit" => 1,
+                    "orderBy" => "LogStreamName",
+                ),
+            )
+            streams = response["logStreams"]
 
-@testset "CloudWatchLogStream" begin
-    @testset "Normal log submission" begin
-        start_time = CloudWatchLogs.unix_timestamp_ms()
-        stream_name = new_stream("stream_type")
-        @test create_stream(CFG, TEST_LOG_GROUP, stream_name) == stream_name
+            @test isempty(streams) || streams[1]["logStreamName"] != stream_name
+        end
 
-        stream = CloudWatchLogStream(CFG, TEST_LOG_GROUP, stream_name)
-        @test submit_log(stream, LogEvent("Hello AWS")) == 1
-        @test submit_logs(stream, LogEvent.(["Second log", "Third log"])) == 2
+        @testset "Unnamed stream" begin
+            stream_name = create_stream(CFG, TEST_LOG_GROUP)
 
-        sleep(2) # wait until AWS has injested the logs; this may or may not be enough
-        response = CloudWatch_Logs.get_log_events(
-            TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
-        )
+            response = CloudWatch_Logs.describe_log_streams(
+                TEST_LOG_GROUP,
+                Dict(
+                    "logStreamNamePrefix" => stream_name,
+                    "limit" => 1,
+                    "orderBy" => "LogStreamName",
+                ),
+            )
+            streams = response["logStreams"]
 
-        time_range = (start_time - 10):(CloudWatchLogs.unix_timestamp_ms() + 10)
+            @test !isempty(streams)
+            @test streams[1]["logStreamName"] == stream_name
 
-        @test length(response["events"]) == 3
-        messages = map(response["events"]) do event
-            @test round(Int64, event["timestamp"]) in time_range
-            event["message"]
-        end
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
 
-        @test messages == ["Hello AWS", "Second log", "Third log"]
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-    end
+            response = CloudWatch_Logs.describe_log_streams(
+                TEST_LOG_GROUP,
+                Dict(
+                    "logStreamNamePrefix" => stream_name,
+                    "limit" => 1,
+                    "orderBy" => "LogStreamName",
+                ),
+            )
+            streams = response["logStreams"]
 
-    @testset "Not allowed" begin
-        @test_throws AWSException CloudWatchLogStream(CFG, FORBIDDEN_LOG_GROUP, FORBIDDEN_GROUP_LOG_STREAM)
+            @test isempty(streams) || streams[1]["logStreamName"] != stream_name
+        end
 
-        stream = CloudWatchLogStream(CFG, BAD_STREAM_LOG_GROUP, FORBIDDEN_LOG_STREAM, nothing)
-        @test_throws AWSException submit_log(stream, LogEvent("Foo"))
+        @testset "Not allowed" begin
+            @test_throws AWSException create_stream(CFG, FORBIDDEN_LOG_GROUP)
+            @test_throws AWSException delete_stream(
+                CFG, FORBIDDEN_LOG_GROUP, FORBIDDEN_GROUP_LOG_STREAM
+            )
+        end
     end
 
-    @testset "Too many logs" begin
-        start_time = CloudWatchLogs.unix_timestamp_ms()
+    @testset "CloudWatchLogStream" begin
+        @testset "Normal log submission" begin
+            start_time = CloudWatchLogs.unix_timestamp_ms()
+            stream_name = new_stream("stream_type")
+            @test create_stream(CFG, TEST_LOG_GROUP, stream_name) == stream_name
 
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, new_stream("too_many_logs")),
-        )
+            stream = CloudWatchLogStream(CFG, TEST_LOG_GROUP, stream_name)
+            @test submit_log(stream, LogEvent("Hello AWS")) == 1
+            @test submit_logs(stream, LogEvent.(["Second log", "Third log"])) == 2
 
-        events = map(Iterators.take(Iterators.countfrom(start_time), 10001)) do ts
-            LogEvent("A", ts)
-        end
+            sleep(2) # wait until AWS has ingested the logs; this may or may not be enough
+            response = CloudWatch_Logs.get_log_events(
+                TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
+            )
 
-        @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
-    end
+            time_range = (start_time - 10):(CloudWatchLogs.unix_timestamp_ms() + 10)
 
-    @testset "Logs too big" begin
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, new_stream("logs_too_big")),
-        )
+            @test length(response["events"]) == 3
+            messages = map(response["events"]) do event
+                @test round(Int64, event["timestamp"]) in time_range
+                event["message"]
+            end
 
-        event_size = CloudWatchLogs.MAX_EVENT_SIZE - 26
-        events = map(1:(div(CloudWatchLogs.MAX_BATCH_SIZE, event_size) + 1)) do i
-            LogEvent("A" ^ event_size)
+            @test messages == ["Hello AWS", "Second log", "Third log"]
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
         end
-        @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
-    end
 
-    @testset "Logs too spread" begin
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, new_stream("logs_too_spread")),
-        )
+        @testset "Not allowed" begin
+            @test_throws AWSException CloudWatchLogStream(
+                CFG, FORBIDDEN_LOG_GROUP, FORBIDDEN_GROUP_LOG_STREAM
+            )
 
-        last_time = Dates.now(tz"UTC")
-        first_time = last_time - Hour(25)
+            stream = CloudWatchLogStream(
+                CFG, BAD_STREAM_LOG_GROUP, FORBIDDEN_LOG_STREAM, nothing
+            )
+            @test_throws AWSException submit_log(stream, LogEvent("Foo"))
+        end
 
-        events = [LogEvent("First", first_time), LogEvent("Last", last_time)]
+        @testset "Too many logs" begin
+            start_time = CloudWatchLogs.unix_timestamp_ms()
 
-        @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
-    end
+            stream = CloudWatchLogStream(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, new_stream("too_many_logs")),
+            )
 
-    @testset "Invalid sequence token" begin
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, new_stream("invalid_token")),
-        )
-
-        @test submit_log(stream, LogEvent("Foo")) == 1
-        CloudWatchLogs.update_sequence_token!(stream, "oops_invalid")
-        setlevel!(LOGGER, "debug") do
-            @test_log LOGGER "debug" "InvalidSequenceTokenException" begin
-                submit_log(stream, LogEvent("Second time's the charm"))
+            events = map(Iterators.take(Iterators.countfrom(start_time), 10001)) do ts
+                LogEvent("A", ts)
             end
+
+            @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
         end
-    end
 
-    @testset "Unsorted logs" begin
-        stream_name = new_stream("unsorted")
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, stream_name),
-        )
-
-        current_time = Dates.now(tz"UTC")
-        events = [
-            LogEvent("First hey", current_time),
-            LogEvent("Second hey", current_time + Second(1)),
-            LogEvent("Third hey", current_time - Second(1)),
-        ]
-
-        setlevel!(LOGGER, "debug") do
-            @test_log LOGGER "debug" "sorted" begin
-                @test submit_logs(stream, events) == 3
+        @testset "Logs too big" begin
+            stream = CloudWatchLogStream(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, new_stream("logs_too_big")),
+            )
+
+            event_size = CloudWatchLogs.MAX_EVENT_SIZE - 26
+            events = map(1:(div(CloudWatchLogs.MAX_BATCH_SIZE, event_size) + 1)) do i
+                LogEvent("A"^event_size)
             end
+            @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
         end
 
-        sleep(5) # wait until AWS has injested the logs; this may or may not be enough
-        response = CloudWatch_Logs.get_log_events(
-            TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
-        )
-
-        @test length(response["events"]) == 3
-        messages = [event["message"] for event in response["events"]]
-        @test messages == ["Third hey", "First hey", "Second hey"]
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-    end
+        @testset "Logs too spread" begin
+            stream = CloudWatchLogStream(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, new_stream("logs_too_spread")),
+            )
 
-    @testset "Rejected Logs" begin
-        stream_name = new_stream("out_of_bounds")
-        stream = CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, stream_name),
-        )
+            last_time = Dates.now(tz"UTC")
+            first_time = last_time - Hour(25)
 
-        current_time = Dates.now(tz"UTC")
+            events = [LogEvent("First", first_time), LogEvent("Last", last_time)]
 
-        @test_warn LOGGER "retention policy" begin
-            @test submit_log(stream, LogEvent("Way old", current_time - Day(8))) == 0
+            @test_throws LOGGER LogSubmissionException submit_logs(stream, events)
         end
 
-        @test_warn LOGGER "days old" begin
-            @test submit_log(stream, LogEvent("Too old", current_time - Day(15))) == 0
-        end
+        @testset "Invalid sequence token" begin
+            stream = CloudWatchLogStream(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, new_stream("invalid_token")),
+            )
 
-        @test_warn LOGGER "hours in the future" begin
-            @test submit_log(stream, LogEvent("Too new", current_time + Hour(3))) == 0
+            @test submit_log(stream, LogEvent("Foo")) == 1
+            CloudWatchLogs.update_sequence_token!(stream, "oops_invalid")
+            setlevel!(LOGGER, "debug") do
+                @test_log LOGGER "debug" "InvalidSequenceTokenException" begin
+                    submit_log(stream, LogEvent("Second time's the charm"))
+                end
+            end
         end
 
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-    end
-
-    @testset "Stream not found" begin
-        @test_throws LOGGER StreamNotFoundException CloudWatchLogStream(
-            CFG,
-            TEST_LOG_GROUP,
-            "made-up-stream-that-doesnt-exist",
-        )
-    end
-end
+        @testset "Unsorted logs" begin
+            stream_name = new_stream("unsorted")
+            stream = CloudWatchLogStream(
+                CFG, TEST_LOG_GROUP, create_stream(CFG, TEST_LOG_GROUP, stream_name)
+            )
 
-@testset "CloudWatchLogHandler" begin
-    @testset "Normal logging" begin
-        start_time = CloudWatchLogs.unix_timestamp_ms()
-        stream_name = new_stream("handler_type")
-
-        handler = setlevel!(LOGGER, "debug") do
-            @test_log LOGGER "debug" "initiated" begin
-                handler = CloudWatchLogHandler(
-                    CFG,
-                    TEST_LOG_GROUP,
-                    create_stream(CFG, TEST_LOG_GROUP, stream_name),
-                    DefaultFormatter("{level} | {msg}"),
-                )
-                sleep(1)
+            current_time = Dates.now(tz"UTC")
+            events = [
+                LogEvent("First hey", current_time),
+                LogEvent("Second hey", current_time + Second(1)),
+                LogEvent("Third hey", current_time - Second(1)),
+            ]
+
+            setlevel!(LOGGER, "debug") do
+                @test_log LOGGER "debug" "sorted" begin
+                    @test submit_logs(stream, events) == 3
+                end
             end
-            handler
-        end
-        logger = Logger("CWLHLive.Normal"; propagate=false)
-        push!(logger, handler)
-
-        info(logger, "First log")
-        warn(logger, "Second log")
 
-        sleep(1) # wait for the handler to submit the logs
-        @test !isready(handler.channel)
+            sleep(5) # wait until AWS has ingested the logs; this may or may not be enough
+            response = CloudWatch_Logs.get_log_events(
+                TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
+            )
 
-        sleep(1) # wait until AWS has injested the logs; this may or may not be enough
-        response = CloudWatch_Logs.get_log_events(
-            TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
-        )
+            @test length(response["events"]) == 3
+            messages = [event["message"] for event in response["events"]]
+            @test messages == ["Third hey", "First hey", "Second hey"]
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
+        end
 
-        time_range = (start_time - 10):(CloudWatchLogs.unix_timestamp_ms() + 10)
+        @testset "Rejected Logs" begin
+            stream_name = new_stream("out_of_bounds")
+            stream = CloudWatchLogStream(
+                CFG, TEST_LOG_GROUP, create_stream(CFG, TEST_LOG_GROUP, stream_name)
+            )
 
-        @test length(response["events"]) == 2
-        messages = map(response["events"]) do event
-            @test round(Int64, event["timestamp"]) in time_range
-            event["message"]
-        end
+            current_time = Dates.now(tz"UTC")
 
-        @test messages == ["info | First log", "warn | Second log"]
+            @test_warn LOGGER "retention policy" begin
+                @test submit_log(stream, LogEvent("Way old", current_time - Day(8))) == 0
+            end
 
-        setlevel!(LOGGER, "debug") do
-            # should cause the task to terminate with a debug message
-            @test_log LOGGER "debug" "terminated normally" begin
-                close(handler.channel)
-                sleep(1)
+            @test_warn LOGGER "days old" begin
+                @test submit_log(stream, LogEvent("Too old", current_time - Day(15))) == 0
+            end
+
+            @test_warn LOGGER "hours in the future" begin
+                @test submit_log(stream, LogEvent("Too new", current_time + Hour(3))) == 0
             end
+
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
         end
 
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
+        @testset "Stream not found" begin
+            @test_throws LOGGER StreamNotFoundException CloudWatchLogStream(
+                CFG, TEST_LOG_GROUP, "made-up-stream-that-doesnt-exist"
+            )
+        end
     end
 
-    @testset "Big logs" begin
-        stream_name = new_stream("handler_big_logs")
-        handler = CloudWatchLogHandler(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, stream_name),
-            DefaultFormatter("{msg}"),
-        )
-        logger = Logger("CWLHLive.Big"; propagate=false)
-        push!(logger, handler)
-
-        log_size = CloudWatchLogs.MAX_EVENT_SIZE - 26
-
-        num_events = 26
-        for (c, i) in zip('A':'Z', 1:num_events)
-            info(logger, "$c" ^ log_size)
-        end
+    @testset "CloudWatchLogHandler" begin
+        @testset "Normal logging" begin
+            start_time = CloudWatchLogs.unix_timestamp_ms()
+            stream_name = new_stream("handler_type")
+
+            handler = setlevel!(LOGGER, "debug") do
+                @test_log LOGGER "debug" "initiated" begin
+                    handler = CloudWatchLogHandler(
+                        CFG,
+                        TEST_LOG_GROUP,
+                        create_stream(CFG, TEST_LOG_GROUP, stream_name),
+                        DefaultFormatter("{level} | {msg}"),
+                    )
+                    sleep(1)
+                end
+
+                handler
+            end
+            logger = Logger("CWLHLive.Normal"; propagate=false)
+            push!(logger, handler)
 
-        # wait for the logs to be submitted and for AWS to injest them
-        sleep(10)
-        response = CloudWatch_Logs.get_log_events(TEST_LOG_GROUP, stream_name)
-        prev_token = ""
-        num_events_injested = 0
-        while prev_token != response["nextBackwardToken"]
-            prev_token = response["nextBackwardToken"]
-            @test length(response["events"]) <= 4
-            num_events_injested += length(response["events"])
+            info(logger, "First log")
+            warn(logger, "Second log")
 
+            sleep(1) # wait for the handler to submit the logs
+            @test !isready(handler.channel)
+
+            sleep(1) # wait until AWS has ingested the logs; this may or may not be enough
             response = CloudWatch_Logs.get_log_events(
-                TEST_LOG_GROUP, stream_name, Dict("nextToken" => prev_token)
+                TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true)
             )
-        end
-        @test num_events_injested == num_events
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-    end
 
-    @testset "So many logs" begin
-        start_time = CloudWatchLogs.unix_timestamp_ms()
-        stream_name = new_stream("handler_so_many_logs")
-        handler = CloudWatchLogHandler(
-            CFG,
-            TEST_LOG_GROUP,
-            create_stream(CFG, TEST_LOG_GROUP, stream_name),
-            DefaultFormatter("{msg}"),
-        )
-        logger = Logger("CWLHLive.SoMany"; propagate=false)
-        push!(logger, handler)
-
-        # not sure if this will actually go over the batch limit but we'll try
-        max_num = CloudWatchLogs.MAX_BATCH_LENGTH * 2
-        for i = 1:max_num
-            info(logger, "$i")
+            time_range = (start_time - 10):(CloudWatchLogs.unix_timestamp_ms() + 10)
+
+            @test length(response["events"]) == 2
+            messages = map(response["events"]) do event
+                @test round(Int64, event["timestamp"]) in time_range
+                event["message"]
+            end
+
+            @test messages == ["info | First log", "warn | Second log"]
+
+            setlevel!(LOGGER, "debug") do
+                # should cause the task to terminate with a debug message
+                @test_log LOGGER "debug" "terminated normally" begin
+                    close(handler.channel)
+                    sleep(1)
+                end
+            end
+
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
         end
 
-        # wait for the logs to be submitted
-        for delay in CloudWatchLogs.PUTLOGEVENTS_DELAYS
-            isready(handler.channel) || break
-            sleep(delay)
+        @testset "Big logs" begin
+            stream_name = new_stream("handler_big_logs")
+            handler = CloudWatchLogHandler(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, stream_name),
+                DefaultFormatter("{msg}"),
+            )
+            logger = Logger("CWLHLive.Big"; propagate=false)
+            push!(logger, handler)
+
+            log_size = CloudWatchLogs.MAX_EVENT_SIZE - 26
+
+            num_events = 26
+            for (c, i) in zip('A':'Z', 1:num_events)
+                info(logger, "$c"^log_size)
+            end
+
+            # wait for the logs to be submitted and for AWS to ingest them
+            sleep(10)
+            response = CloudWatch_Logs.get_log_events(TEST_LOG_GROUP, stream_name)
+            prev_token = ""
+            num_events_injested = 0
+            while prev_token != response["nextBackwardToken"]
+                prev_token = response["nextBackwardToken"]
+                @test length(response["events"]) <= 4
+                num_events_injested += length(response["events"])
+
+                response = CloudWatch_Logs.get_log_events(
+                    TEST_LOG_GROUP, stream_name, Dict("nextToken" => prev_token)
+                )
+            end
+            @test num_events_injested == num_events
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
         end
 
-        sleep(1) # wait until AWS has injested the logs; this may or may not be enough
-        response = CloudWatch_Logs.get_log_events(
-            TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true, "limit" => 1)
-        )
+        @testset "So many logs" begin
+            start_time = CloudWatchLogs.unix_timestamp_ms()
+            stream_name = new_stream("handler_so_many_logs")
+            handler = CloudWatchLogHandler(
+                CFG,
+                TEST_LOG_GROUP,
+                create_stream(CFG, TEST_LOG_GROUP, stream_name),
+                DefaultFormatter("{msg}"),
+            )
+            logger = Logger("CWLHLive.SoMany"; propagate=false)
+            push!(logger, handler)
 
-        @test length(response["events"]) == 1
-        event = response["events"][1]
-        @test event["message"] == "1"
+            # not sure if this will actually go over the batch limit but we'll try
+            max_num = CloudWatchLogs.MAX_BATCH_LENGTH * 2
+            for i in 1:max_num
+                info(logger, "$i")
+            end
 
-        sleep(5) # wait until AWS has injested the logs; this may or may not be enough
-        response = CloudWatch_Logs.get_log_events(
-            TEST_LOG_GROUP, stream_name, Dict("startFromHead" => false, "limit" => 1)
-        )
+            # wait for the logs to be submitted
+            for delay in CloudWatchLogs.PUTLOGEVENTS_DELAYS
+                isready(handler.channel) || break
+                sleep(delay)
+            end
 
-        @test length(response["events"]) == 1
-        event = response["events"][1]
-        @test event["message"] == "$max_num"
+            sleep(1) # wait until AWS has ingested the logs; this may or may not be enough
+            response = CloudWatch_Logs.get_log_events(
+                TEST_LOG_GROUP, stream_name, Dict("startFromHead" => true, "limit" => 1)
+            )
 
-        delete_stream(CFG, TEST_LOG_GROUP, stream_name)
-    end
-end
+            @test length(response["events"]) == 1
+            event = response["events"][1]
+            @test event["message"] == "1"
 
+            sleep(5) # wait until AWS has ingested the logs; this may or may not be enough
+            response = CloudWatch_Logs.get_log_events(
+                TEST_LOG_GROUP, stream_name, Dict("startFromHead" => false, "limit" => 1)
+            )
+
+            @test length(response["events"]) == 1
+            event = response["events"][1]
+            @test event["message"] == "$max_num"
+
+            delete_stream(CFG, TEST_LOG_GROUP, stream_name)
+        end
+    end
 end
diff --git a/test/runtests.jl b/test/runtests.jl
index d84ef96..63a1182 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -23,18 +23,13 @@ const LOGGER = getlogger(CloudWatchLogs)
 @service STS
 
 function assume_role(config::AWSConfig, role_arn::AbstractString, params::AbstractDict)
-    response = STS.assume_role(
-        role_arn,
-        session_name(),
-        params;
-        aws_config=config
-    )
+    response = STS.assume_role(role_arn, session_name(), params; aws_config=config)
 
     response = response["AssumeRoleResult"]
     response_creds = response["Credentials"]
     response_user = response["AssumedRoleUser"]
 
-    AWSCredentials(
+    return AWSCredentials(
         response_creds["AccessKeyId"],
         response_creds["SecretAccessKey"],
         response_creds["SessionToken"],
@@ -53,18 +48,17 @@ function session_name()
     max_name_length = 64 - length(ts) - 1
 
     if length(name) > max_name_length
-        name = name[1:max_name_length-3] * "..."
+        name = name[1:(max_name_length - 3)] * "..."
     end
 
     return "$name-$ts"
 end
 
 function stack_output(config::AWSConfig, stack_name::AbstractString)
-    outputs = Dict{String, String}()
+    outputs = Dict{String,String}()
 
     response = CloudFormation.describe_stacks(
-        Dict("StackName" => stack_name);
-        aws_config=config
+        Dict("StackName" => stack_name); aws_config=config
     )
 
     response = response["DescribeStacksResult"]["Stacks"]["member"]["Outputs"]["member"]
@@ -81,7 +75,6 @@ function stack_output(config::AWSConfig, stack_name::AbstractString)
     return outputs
 end
 
-
 @testset "CloudWatchLogs.jl" begin
     include("event.jl")
     include("mocked_aws.jl")
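
The diff above is a pure restyling pass: the rewritten lines match JuliaFormatter's Blue style
(leading `;` before keyword arguments, `Union{A,B}` without spaces, explicit `return`,
wrapped long argument lists, `for x in xs` over `for x = xs`). A minimal sketch of running
such a pass, assuming JuliaFormatter is installed; the exact version and invocation used for
this commit are not recorded here:

    using JuliaFormatter

    # Reformat the repository in place with the Blue style.
    # Returns true when every file was already formatted.
    format("."; style=BlueStyle(), verbose=true)

The changed sources also lean on Base's `retry`/`ExponentialBackOff`, as in `aws_retry` and
`PUTLOGEVENTS_DELAYS` above. A self-contained sketch of that pattern, where `flaky` and
`should_retry` are hypothetical stand-ins for `_put_log_events` and `aws_retry_cond`:

    # Same shape as GENERIC_AWS_DELAYS: up to 10 tries, 0.2s first delay, doubling, 20% jitter.
    delays = ExponentialBackOff(; n=10, first_delay=0.2, factor=2, jitter=0.2)

    # `check` receives the retry state and the exception; returning (state, true)
    # tells `retry` to try again after the next delay.
    should_retry(state, err) = (state, err isa ErrorException)

    # A function that fails transiently about half the time.
    flaky() = rand() < 0.5 ? error("transient failure") : "ok"

    # `retry` returns a wrapped function; calling it runs `flaky` with backoff.
    result = retry(flaky; delays=delays, check=should_retry)()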