diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb index f147182..b5726d7 100644 --- a/lib/event_source/operations/mime_decode.rb +++ b/lib/event_source/operations/mime_decode.rb @@ -21,7 +21,7 @@ class MimeDecode # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, decoding failure) def call(mime_type, payload) - valid_payload = yield validate_payload(payload, mime_type) + valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s) decoded_data = yield decode(valid_payload, mime_type) Success(decoded_data) @@ -38,15 +38,15 @@ def call(mime_type, payload) # @return [Dry::Monads::Success] if the payload is valid # @return [Dry::Monads::Failure] if the payload is invalid def validate_payload(payload, mime_type) - unless MIME_TYPES.include?(mime_type.to_s) + unless MIME_TYPES.include?(mime_type) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - if mime_type.to_s == 'application/zlib' && !binary_payload?(payload) - return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") + if mime_type == 'application/zlib' + return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload) end - Success(payload) + Success([payload, mime_type]) end # Decodes the payload using the specified MIME type. @@ -58,7 +58,7 @@ def validate_payload(payload, mime_type) # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if decoding fails def decode(payload, mime_type) - decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib' + decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload) Success(decoded_data || payload) rescue Zlib::Error => e @@ -75,6 +75,10 @@ def binary_payload?(payload) payload.encoding == Encoding::BINARY end + + def valid_json_string?(data) + data.is_a?(String) && JSON.parse(data) rescue false + end end end end diff --git a/lib/event_source/operations/mime_encode.rb b/lib/event_source/operations/mime_encode.rb index c5b6f8e..689b7cf 100644 --- a/lib/event_source/operations/mime_encode.rb +++ b/lib/event_source/operations/mime_encode.rb @@ -18,60 +18,68 @@ class MimeEncode # For example, compresses the payload using Zlib for 'application/zlib'. # # @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json') - # @param payload [String, Hash] the payload to encode; must be a Hash or String + # @param payload [Any] the payload to encode; # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure) def call(mime_type, payload) - json_payload = yield validate_payload(payload, mime_type) - encoded_data = yield encode(json_payload, mime_type) + mime_type = yield validate(mime_type) + encoded_data = yield encode(mime_type, payload) Success(encoded_data) end private - # Validates the payload and MIME type before encoding. - # Ensures the MIME type is supported and the payload is either a Hash or a String. + # Validates theMIME type before encoding. + # Ensures the MIME type is supported # - # @param payload [String, Hash] the payload to validate # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if the payload and MIME type are valid # @return [Dry::Monads::Failure] if the MIME type is unsupported or the payload is invalid - def validate_payload(payload, mime_type) + def validate(mime_type) unless MIME_TYPES.include?(mime_type.to_s) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - unless payload.is_a?(Hash) || payload.is_a?(String) - return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.") - end - - Success(payload.is_a?(Hash) ? payload.to_json : payload) + Success(mime_type.to_s) end # Encodes the payload based on the MIME type. # For 'application/zlib', compresses the payload using Zlib. # Logs the original and encoded payload sizes for debugging. # - # @param json_payload [String] the JSON stringified payload to encode + # @param data [String] the JSON stringified payload to encode # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if encoding fails - def encode(json_payload, mime_type) - encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib' + def encode(mime_type, payload) + case mime_type + when 'application/zlib' + encoded_data = Zlib.deflate(payload.to_json) + when 'application/json' + encoded_data = payload.to_json + end + log_encoding_details(mime_type, payload, encoded_data) if encoded_data + + Success(encoded_data || payload) + rescue JSON::GeneratorError => e + Failure("Failed to encode payload to JSON: #{e.message}") + rescue Zlib::Error => e + Failure("Failed to compress payload using Zlib: #{e.message}") + rescue StandardError => e + Failure("Unexpected error during encoding: #{e.message}") + end + # Logs details of the encoding process. + def log_encoding_details(mime_type, payload, encoded_data) logger.debug "*" * 80 logger.debug "Starting payload encoding for MIME type: '#{mime_type}'" - logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB" - logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data + logger.debug "Original payload size: #{data_size_in_kb(payload)} KB" + logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" logger.debug "*" * 80 - - Success(encoded_data || json_payload) - rescue Zlib::Error => e - Failure("Failed to compress payload using Zlib: #{e.message}") end # Calculates the size of the data in kilobytes (KB). diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 4dd1b2c..a73e6d3 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -15,6 +15,8 @@ class BunnyExchangeProxy # @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange attr_reader :subject, :channel_proxy + DefaultMimeType = 'application/json'.freeze + # @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange # @param [Hash] exchange_bindings instance with configuration for this Exchange def initialize(channel_proxy, exchange_bindings) @@ -50,7 +52,6 @@ def publish(payload:, publish_bindings:, headers: {}) logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" - payload = payload.to_json unless is_binary?(payload) @subject.publish(payload, bunny_publish_bindings) logger.debug "BunnyExchange#publish published message: #{payload}" @@ -70,12 +71,6 @@ def message_id SecureRandom.uuid end - def is_binary?(payload) - return false unless payload.respond_to?(:encoding) - - payload.encoding == Encoding::BINARY - end - # Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ # bindings # diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index ddb7fb9..5aa341c 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -49,10 +49,7 @@ def call(payload, options = {}) # @return [String] The encoded payload, or the original payload if no encoding is specified. # @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails. def encode_payload(payload) - return payload unless @async_api_publish_operation.message - - message_bindings = @async_api_publish_operation.message['bindings'] - encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + encoding = determine_encoding return payload unless encoding output = EventSource::Operations::MimeEncode.new.call(encoding, payload) @@ -63,5 +60,16 @@ def encode_payload(payload) raise EventSource::Error::PayloadEncodeError, output.failure end end + + def determine_encoding + message_bindings = @async_api_publish_operation.message&.dig('bindings') + return message_bindings.first[1]['contentEncoding'] if message_bindings.present? + + amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil + end + + def amqp_protocol? + subject.is_a?(EventSource::Protocols::Amqp::BunnyPublisherProxy) + end end end