Skip to content

Commit

Permalink
add support for amqp payload compress/decompress
Browse files Browse the repository at this point in the history
  • Loading branch information
raghuramg committed Dec 18, 2024
1 parent 6d3acdb commit b5e6db9
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 2 deletions.
1 change: 1 addition & 0 deletions event_source.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'oj', '~> 3.11'
spec.add_dependency 'ox', '~> 2.14'
spec.add_dependency 'typhoeus', '~> 1.4.0'
spec.add_dependency 'zlib', '~> 3.2.1'

# TODO: Change to development dependency
spec.add_development_dependency 'database_cleaner'
Expand Down
1 change: 1 addition & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
require 'event_source/event'
require 'event_source/subscriber'
require 'event_source/operations/codec64'
require 'event_source/operations/payload_codec'
require 'event_source/operations/create_message'
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
Expand Down
1 change: 1 addition & 0 deletions lib/event_source/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ class Error < StandardError
ServerConfigurationNotFound = Class.new(Error)
ServerConfigurationInvalid = Class.new(Error)
MessageBuildError = Class.new(Error)
PayloadDecompressionError = Class.new(Error)
end
end
2 changes: 1 addition & 1 deletion lib/event_source/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def initialize(options = {})
send(:headers=, options[:headers] || {})

@publisher_path = klass_var_for(:publisher_path) || nil
build_message(options) if headers.delete(:build_message)
build_message(options) if headers.delete(:build_message) || headers[:compress]

if @publisher_path.eql?(nil)
raise EventSource::Error::PublisherKeyMissing,
Expand Down
14 changes: 13 additions & 1 deletion lib/event_source/operations/build_message_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "dry/monads"
require "dry/monads/do"
require "securerandom"
require 'zlib'

module EventSource
module Operations
Expand All @@ -13,7 +14,7 @@ class BuildMessageOptions
def call(params)
headers = yield build_headers(params)
payload = yield build_payload(params)
headers = yield append_account_details(headers)
headers = yield append_account_details(headers) if params[:headers].delete(:include_session)

Success(headers: headers, payload: payload)
end
Expand All @@ -33,6 +34,16 @@ def build_headers(params)
def build_payload(params)
payload = params[:payload]&.symbolize_keys || {}

if params[:headers][:compress]
output = PayloadCodec.new(payload: payload).compress

if output.success?
payload = output.value!
else
return Failure(output.failure)
end
end

Success(payload)
end

Expand All @@ -56,3 +67,4 @@ def append_account_details(headers)
end
end
end

71 changes: 71 additions & 0 deletions lib/event_source/operations/payload_codec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

require 'dry/monads'
require 'dry/monads/do'
require 'dry-initializer'
# require 'zlib'
# require 'base64'
module EventSource
module Operations
# A class for handling payload compression and decompression using Dry-rb.
class PayloadCodec
include Dry::Monads[:result] # For Success/Failure monads
include Dry::Monads::Do # For do notation
extend Dry::Initializer

option :payload, reader: :private

# Compresses the payload into a Base64-encoded compressed string
#
# @return [Dry::Monads::Success<String>] if compression is successful
# @return [Dry::Monads::Failure<String>] if an error occurs
def compress
json_payload = yield validate_payload_for_compression
compressed_data = Zlib.deflate(json_payload)
encoded_data = Base64.encode64(compressed_data)

Success(encoded_data)
rescue StandardError => e
Failure("Compression failed: #{e.message}")
end

# Decompresses a Base64-encoded compressed payload back to its original form
#
# @return [Dry::Monads::Success<String>] if decompression is successful
# @return [Dry::Monads::Failure<String>] if an error occurs
def decompress
decoded_data = yield validate_payload_for_decompression
decompressed_data = Zlib.inflate(decoded_data)

Success(decompressed_data)
rescue StandardError => e
Failure("Decompression failed: #{e.message}")
end

private

# Validates the payload before compression
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload_for_compression
return Failure('Payload must be a Hash or String') unless payload.is_a?(Hash) || payload.is_a?(String)

Success(payload.is_a?(Hash) ? payload.to_json : payload)
end

# Validates the payload before decompression
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload_for_decompression
return Failure('Payload must be a Base64-encoded String') unless payload.is_a?(String)

decoded_data = Base64.decode64(payload)
Success(decoded_data)
rescue StandardError
Failure('Invalid Base64 string for decompression')
end
end
end
end
14 changes: 14 additions & 0 deletions lib/event_source/protocols/amqp/bunny_consumer_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ def initialize(
@executable = executable
end

def decompress_payload_if_required
if @metadata[:headers]["compress"]
output = EventSource::Operations::PayloadCodec.new(payload: payload).decompress

if output.success?
@payload = output.value!
else
logger.error "Failed to decompress message \n due to: #{output.failure}"
raise EventSource::Error::PayloadDecompressionError, output.failure
end
end
end

def run
decompress_payload_if_required
subscriber.instance_exec(
@delivery_info,
@metadata,
Expand Down

0 comments on commit b5e6db9

Please sign in to comment.