Skip to content

Commit

Permalink
Merge pull request #36 from invenia/MB/format
Browse files Browse the repository at this point in the history
BlueStyle Formatting
  • Loading branch information
mattBrzezinski authored Aug 24, 2021
2 parents 497417a + 8e4a170 commit 1819fca
Show file tree
Hide file tree
Showing 10 changed files with 605 additions and 591 deletions.
16 changes: 5 additions & 11 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ using Documenter, CloudWatchLogs

makedocs(;
modules=[CloudWatchLogs],
format=Documenter.HTML(prettyurls=(get(ENV, "CI", nothing) == "true")),
format=Documenter.HTML(; prettyurls=(get(ENV, "CI", nothing) == "true")),
pages=[
"Home" => "index.md",
"API" => "pages/api.md",
Expand All @@ -11,15 +11,9 @@ makedocs(;
repo="https://github.com/invenia/CloudWatchLogs.jl/blob/{commit}{path}#L{line}",
sitename="CloudWatchLogs.jl",
authors="Invenia Technical Computing Corporation",
assets=[
"assets/invenia.css",
"assets/logo.png",
],
strict = true,
checkdocs = :exports,
assets=["assets/invenia.css", "assets/logo.png"],
strict=true,
checkdocs=:exports,
)

deploydocs(;
repo="github.com/invenia/CloudWatchLogs.jl",
target="build",
)
deploydocs(; repo="github.com/invenia/CloudWatchLogs.jl", target="build")
7 changes: 4 additions & 3 deletions src/CloudWatchLogs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ const MAX_BATCH_LENGTH = 10000

# 5 requests per second per log stream. This limit cannot be changed.
const PUTLOGEVENTS_RATE_LIMIT = 0.2
const PUTLOGEVENTS_DELAYS =
ExponentialBackOff(n=10, first_delay=PUTLOGEVENTS_RATE_LIMIT, factor=1.1)
const PUTLOGEVENTS_DELAYS = ExponentialBackOff(;
n=10, first_delay=PUTLOGEVENTS_RATE_LIMIT, factor=1.1
)

const GENERIC_AWS_DELAYS = ExponentialBackOff(n=10, first_delay=0.2, factor=2, jitter=0.2)
const GENERIC_AWS_DELAYS = ExponentialBackOff(; n=10, first_delay=0.2, factor=2, jitter=0.2)

__init__() = Memento.register(LOGGER)

Expand Down
9 changes: 6 additions & 3 deletions src/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ struct LogEvent
idx = MAX_EVENT_SIZE - 29
# Truncated messages include a "..."
message = string(message[1:idx], "...")
warn(LOGGER, "CloudWatch Log Event message cannot be more than $MAX_EVENT_SIZE bytes")
warn(
LOGGER,
"CloudWatch Log Event message cannot be more than $MAX_EVENT_SIZE bytes",
)
end

if timestamp < 0
throw(ArgumentError("Log Event timestamp must be non-negative"))
end

new(message, timestamp)
return new(message, timestamp)
end
end

function LogEvent(message::AbstractString, dt::Union{DateTime, ZonedDateTime})
function LogEvent(message::AbstractString, dt::Union{DateTime,ZonedDateTime})
return LogEvent(message, unix_timestamp_ms(dt))
end

Expand Down
2 changes: 1 addition & 1 deletion src/exceptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ abstract type CloudWatchLogsException <: Exception end
struct StreamNotFoundException <: CloudWatchLogsException
stream::String
group::String
msg::Union{String, Nothing}
msg::Union{String,Nothing}
end

function Base.showerror(io::IO, exception::StreamNotFoundException)
Expand Down
19 changes: 10 additions & 9 deletions src/handler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ function CloudWatchLogHandler(
log_group_name::AbstractString,
log_stream_name::AbstractString,
formatter::F=DefaultFormatter(),
) where F<:Formatter
) where {F<:Formatter}
ch = Channel{LogEvent}(Inf)
handler = CloudWatchLogHandler(
CloudWatchLogStream(config, log_group_name, log_stream_name),
ch,
formatter,
CloudWatchLogStream(config, log_group_name, log_stream_name), ch, formatter
)

tsk = @async process_logs!(handler)
Expand All @@ -62,10 +60,13 @@ function process_available_logs!(handler::CloudWatchLogHandler)
end

if isempty(events)
warn(LOGGER, string(
"Channel was ready but no events were found. ",
"Is there another task pulling logs from this handler?",
))
warn(
LOGGER,
string(
"Channel was ready but no events were found. ",
"Is there another task pulling logs from this handler?",
),
)
end

try
Expand Down Expand Up @@ -113,5 +114,5 @@ function Memento.emit(handler::CloudWatchLogHandler, record::Record)
dt = isdefined(record, :date) ? record.date : Dates.now(tz"UTC")
message = format(handler.fmt, record)
event = LogEvent(message, dt)
put!(handler.channel, event)
return put!(handler.channel, event)
end
104 changes: 52 additions & 52 deletions src/stream.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
const RETRYABLE_CODES = (
"IncompleteSignature",
"ThrottlingException",
"RequestExpired",
)
const RETRYABLE_CODES = ("IncompleteSignature", "ThrottlingException", "RequestExpired")

function aws_retry_cond(s, e)
if e isa AWSException && (500 <= e.cause.status <= 504 || e.code in RETRYABLE_CODES)
Expand All @@ -16,13 +12,13 @@ function aws_retry_cond(s, e)
return (s, false)
end

aws_retry(f) = retry(f, delays=GENERIC_AWS_DELAYS, check=aws_retry_cond)()
aws_retry(f) = retry(f; delays=GENERIC_AWS_DELAYS, check=aws_retry_cond)()

struct CloudWatchLogStream
config::AWSConfig
log_group_name::String
log_stream_name::String
token::Ref{Union{String, Nothing}}
token::Ref{Union{String,Nothing}}
end

"""
Expand All @@ -34,15 +30,10 @@ This constructor will automatically fetch the latest [sequence token](https://do
for the stream.
"""
function CloudWatchLogStream(
config::AWSConfig,
log_group_name::AbstractString,
log_stream_name::AbstractString,
config::AWSConfig, log_group_name::AbstractString, log_stream_name::AbstractString
)
stream = CloudWatchLogStream(
config,
log_group_name,
log_stream_name,
Ref{Union{String, Nothing}}(),
config, log_group_name, log_stream_name, Ref{Union{String,Nothing}}()
)
update_sequence_token!(stream)
return stream
Expand All @@ -61,16 +52,18 @@ function create_group(
config::AWSConfig,
# this probably won't collide, most callers should add identifying information though
log_group_name::AbstractString="julia-$(uuid4())";
tags::AbstractDict{<:AbstractString, <:AbstractString}=Dict{String, String}(),
tags::AbstractDict{<:AbstractString,<:AbstractString}=Dict{String,String}(),
)
if isempty(tags)
aws_retry() do
CloudWatch_Logs.create_log_group(log_group_name; aws_config=config)
end
else
tags = Dict{String, String}(tags)
tags = Dict{String,String}(tags)
aws_retry() do
CloudWatch_Logs.create_log_group(log_group_name, Dict("tags"=>tags); aws_config=config)
CloudWatch_Logs.create_log_group(
log_group_name, Dict("tags" => tags); aws_config=config
)
end
end
return String(log_group_name)
Expand All @@ -81,10 +74,7 @@ end
Delete a CloudWatch Log Group.
"""
function delete_group(
config::AWSConfig,
log_group_name::AbstractString,
)
function delete_group(config::AWSConfig, log_group_name::AbstractString)
aws_retry() do
CloudWatch_Logs.delete_log_group(log_group_name; aws_config=config)
end
Expand All @@ -107,7 +97,9 @@ function create_stream(
log_stream_name::AbstractString="julia-$(uuid4())",
)
aws_retry() do
CloudWatch_Logs.create_log_stream(log_group_name, log_stream_name; aws_config=config)
CloudWatch_Logs.create_log_stream(
log_group_name, log_stream_name; aws_config=config
)
end
return String(log_stream_name)
end
Expand All @@ -118,12 +110,12 @@ end
Delete a CloudWatch Log Stream from a given Log Group.
"""
function delete_stream(
config::AWSConfig,
log_group_name::AbstractString,
log_stream_name::AbstractString,
config::AWSConfig, log_group_name::AbstractString, log_stream_name::AbstractString
)
aws_retry() do
CloudWatch_Logs.delete_log_stream(log_group_name, log_stream_name; aws_config=config)
CloudWatch_Logs.delete_log_stream(
log_group_name, log_stream_name; aws_config=config
)
end
return nothing
end
Expand All @@ -139,7 +131,11 @@ function new_sequence_token(stream::CloudWatchLogStream)
return new_sequence_token(stream.config, stream.log_group_name, stream.log_stream_name)
end

describe_log_streams(config::AWSConfig, log_group_name::AbstractString, params::AbstractDict) = CloudWatch_Logs.describe_log_streams(log_group_name, params; aws_config=config)
function describe_log_streams(
config::AWSConfig, log_group_name::AbstractString, params::AbstractDict
)
return CloudWatch_Logs.describe_log_streams(log_group_name, params; aws_config=config)
end

"""
new_sequence_token(stream::CloudWatchLogStream) -> Union{String, Nothing}
Expand All @@ -151,10 +147,8 @@ Returns `nothing` if the stream does not have a sequence token yet (e.g., if no
been logged).
"""
function new_sequence_token(
config::AWSConfig,
log_group::AbstractString,
log_stream::AbstractString,
)::Union{String, Nothing}
config::AWSConfig, log_group::AbstractString, log_stream::AbstractString
)::Union{String,Nothing}
response = aws_retry() do
@mock describe_log_streams(
config,
Expand Down Expand Up @@ -196,19 +190,18 @@ Alternatively, set the token for the stream to `token`.
Returns the token.
"""
function update_sequence_token!(
stream::CloudWatchLogStream,
token::Union{String, Nothing}=new_sequence_token(stream),
stream::CloudWatchLogStream, token::Union{String,Nothing}=new_sequence_token(stream)
)
stream.token[] = token
return stream.token[] = token
end

function _put_log_events(stream::CloudWatchLogStream, events::AbstractVector{LogEvent})
CloudWatch_Logs.put_log_events(
return CloudWatch_Logs.put_log_events(
events,
stream.log_group_name,
stream.log_stream_name,
Dict("sequenceToken" => sequence_token(stream));
aws_config=stream.config
aws_config=stream.config,
)
end

Expand All @@ -232,33 +225,40 @@ Returns the number of events successfully submitted.
"""
function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEvent})
if length(events) > MAX_BATCH_LENGTH
error(LOGGER, LogSubmissionException(
"Log batch length exceeded 10000 events; submit fewer log events at once"
))
error(
LOGGER,
LogSubmissionException(
"Log batch length exceeded 10000 events; submit fewer log events at once"
),
)
end

batch_size = sum(aws_size, events)

if batch_size > MAX_BATCH_SIZE
error(LOGGER, LogSubmissionException(
"Log batch size exceeded 1 MiB; submit fewer log events at once"
))
error(
LOGGER,
LogSubmissionException(
"Log batch size exceeded 1 MiB; submit fewer log events at once"
),
)
end

if !issorted(events; by=timestamp)
debug(LOGGER,
"Log submission will be faster if log events are sorted by timestamp"
)
debug(LOGGER, "Log submission will be faster if log events are sorted by timestamp")

# a stable sort to avoid putting related messages out of order
sort!(events; alg=MergeSort, by=timestamp)
end

min_timestamp, max_timestamp = extrema(timestamp(e) for e in events)
if max_timestamp - min_timestamp > 24 * 3600 * 1000 # 24 hours in milliseconds
error(LOGGER, LogSubmissionException(
"Log events cannot span more than 24 hours; submit log events separately"
))
error(
LOGGER,
LogSubmissionException(
"Log events cannot span more than 24 hours; submit log events separately"
),
)
end

function retry_cond(s, e)
Expand Down Expand Up @@ -286,7 +286,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven
return (s, false)
end

f = retry(delays=PUTLOGEVENTS_DELAYS, check=retry_cond) do
f = retry(; delays=PUTLOGEVENTS_DELAYS, check=retry_cond) do
@mock CloudWatchLogs._put_log_events(stream, events)
end

Expand All @@ -310,7 +310,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven
string(
"Cannot log the following events, ",
"as they are older than the log retention policy allows: ",
events[1:idx-1],
events[1:(idx - 1)],
)
end
end
Expand All @@ -323,7 +323,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven
string(
"Cannot log the following events, ",
"as they are more than 14 days old: ",
events[1:idx-1],
events[1:(idx - 1)],
)
end
end
Expand All @@ -336,7 +336,7 @@ function submit_logs(stream::CloudWatchLogStream, events::AbstractVector{LogEven
string(
"Cannot log the following events, ",
"as they are newer than 2 hours in the future: ",
events[idx+1:end],
events[(idx + 1):end],
)
end
end
Expand Down
Loading

2 comments on commit 1819fca

@mattBrzezinski
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/43482

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v2.0.0 -m "<description of version>" 1819fca42e4e6c4160566f20eb320ae7a9500a27
git push origin v2.0.0

Please sign in to comment.