Skip to content

Commit

Permalink
add default mime type encode json
Browse files Browse the repository at this point in the history
  • Loading branch information
raghuramg committed Jan 17, 2025
1 parent ab7531f commit 5ad8bba
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 38 deletions.
16 changes: 10 additions & 6 deletions lib/event_source/operations/mime_decode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MimeDecode
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, decoding failure)
def call(mime_type, payload)
valid_payload = yield validate_payload(payload, mime_type)
valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s)
decoded_data = yield decode(valid_payload, mime_type)

Success(decoded_data)
Expand All @@ -38,15 +38,15 @@ def call(mime_type, payload)
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload(payload, mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
unless MIME_TYPES.include?(mime_type)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

if mime_type.to_s == 'application/zlib' && !binary_payload?(payload)
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.")
if mime_type == 'application/zlib'
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload)
end

Success(payload)
Success([payload, mime_type])
end

# Decodes the payload using the specified MIME type.
Expand All @@ -58,7 +58,7 @@ def validate_payload(payload, mime_type)
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if decoding fails
def decode(payload, mime_type)
decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib'
decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload)

Success(decoded_data || payload)
rescue Zlib::Error => e
Expand All @@ -75,6 +75,10 @@ def binary_payload?(payload)

payload.encoding == Encoding::BINARY
end

def valid_json_string?(data)
data.is_a?(String) && JSON.parse(data) rescue false
end
end
end
end
50 changes: 29 additions & 21 deletions lib/event_source/operations/mime_encode.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,68 @@ class MimeEncode
# For example, compresses the payload using Zlib for 'application/zlib'.
#
# @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json')
# @param payload [String, Hash] the payload to encode; must be a Hash or String
# @param payload [Any] the payload to encode;
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure)
def call(mime_type, payload)
json_payload = yield validate_payload(payload, mime_type)
encoded_data = yield encode(json_payload, mime_type)
mime_type = yield validate(mime_type)
encoded_data = yield encode(mime_type, payload)

Success(encoded_data)
end

private

# Validates the payload and MIME type before encoding.
# Ensures the MIME type is supported and the payload is either a Hash or a String.
# Validates theMIME type before encoding.
# Ensures the MIME type is supported
#
# @param payload [String, Hash] the payload to validate
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if the payload and MIME type are valid
# @return [Dry::Monads::Failure<String>] if the MIME type is unsupported or the payload is invalid
def validate_payload(payload, mime_type)
def validate(mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

unless payload.is_a?(Hash) || payload.is_a?(String)
return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.")
end

Success(payload.is_a?(Hash) ? payload.to_json : payload)
Success(mime_type.to_s)
end

# Encodes the payload based on the MIME type.
# For 'application/zlib', compresses the payload using Zlib.
# Logs the original and encoded payload sizes for debugging.
#
# @param json_payload [String] the JSON stringified payload to encode
# @param data [String] the JSON stringified payload to encode
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if encoding fails
def encode(json_payload, mime_type)
encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib'
def encode(mime_type, payload)
case mime_type
when 'application/zlib'
encoded_data = Zlib.deflate(payload.to_json)
when 'application/json'
encoded_data = payload.to_json
end
log_encoding_details(mime_type, payload, encoded_data) if encoded_data

Success(encoded_data || payload)
rescue JSON::GeneratorError => e
Failure("Failed to encode payload to JSON: #{e.message}")
rescue Zlib::Error => e
Failure("Failed to compress payload using Zlib: #{e.message}")
rescue StandardError => e
Failure("Unexpected error during encoding: #{e.message}")
end

# Logs details of the encoding process.
def log_encoding_details(mime_type, payload, encoded_data)
logger.debug "*" * 80
logger.debug "Starting payload encoding for MIME type: '#{mime_type}'"
logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB"
logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data
logger.debug "Original payload size: #{data_size_in_kb(payload)} KB"
logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB"
logger.debug "*" * 80

Success(encoded_data || json_payload)
rescue Zlib::Error => e
Failure("Failed to compress payload using Zlib: #{e.message}")
end

# Calculates the size of the data in kilobytes (KB).
Expand Down
9 changes: 2 additions & 7 deletions lib/event_source/protocols/amqp/bunny_exchange_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class BunnyExchangeProxy
# @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange
attr_reader :subject, :channel_proxy

DefaultMimeType = 'application/json'.freeze

# @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange
# @param [Hash<EventSource::AsyncApi::Exchange>] exchange_bindings instance with configuration for this Exchange
def initialize(channel_proxy, exchange_bindings)
Expand Down Expand Up @@ -50,7 +52,6 @@ def publish(payload:, publish_bindings:, headers: {})

logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}"

payload = payload.to_json unless is_binary?(payload)
@subject.publish(payload, bunny_publish_bindings)

logger.debug "BunnyExchange#publish published message: #{payload}"
Expand All @@ -70,12 +71,6 @@ def message_id
SecureRandom.uuid
end

def is_binary?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end

# Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ
# bindings
#
Expand Down
16 changes: 12 additions & 4 deletions lib/event_source/publish_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ def call(payload, options = {})
# @return [String] The encoded payload, or the original payload if no encoding is specified.
# @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails.
def encode_payload(payload)
return payload unless @async_api_publish_operation.message

message_bindings = @async_api_publish_operation.message['bindings']
encoding = message_bindings.first[1]['contentEncoding'] if message_bindings
encoding = determine_encoding
return payload unless encoding

output = EventSource::Operations::MimeEncode.new.call(encoding, payload)
Expand All @@ -63,5 +60,16 @@ def encode_payload(payload)
raise EventSource::Error::PayloadEncodeError, output.failure
end
end

def determine_encoding
message_bindings = @async_api_publish_operation.message&.dig('bindings')
return message_bindings.first[1]['contentEncoding'] if message_bindings.present?

amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil
end

def amqp_protocol?
subject.is_a?(EventSource::Protocols::Amqp::BunnyPublisherProxy)
end
end
end

0 comments on commit 5ad8bba

Please sign in to comment.