Skip to content

Commit

Permalink
Don't start event source until Rails is ready. (#117)
Browse files Browse the repository at this point in the history
* Don't start event source until rails is ready.

* Fix up when HTTP subscriber bindings are resolved.

* Properly name parameter.

* Fix http subscriber routing.

* Add more documentation.
  • Loading branch information
TreyE authored Sep 23, 2024
1 parent 7fa9b28 commit 6d3acdb
Show file tree
Hide file tree
Showing 29 changed files with 331 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2']
ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--format documentation
--color
--require spec_helper
--exclude-pattern "spec/rails_app/**/*"
3 changes: 3 additions & 0 deletions .rspec_rails_specs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--format documentation
--color
--exclude-pattern "**/*"
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gemspec
group :development, :test do
gem "rails", '>= 6.1.4'
gem "rspec-rails"
gem "parallel_tests"
gem "pry", platform: :mri, require: false
gem "pry-byebug", platform: :mri, require: false
gem 'rubocop'
Expand Down
28 changes: 23 additions & 5 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'
require 'event_source/boot_registry'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand All @@ -64,14 +65,23 @@ class << self
:async_api_schemas=

def configure
@configured = true
yield(config)
end

def initialize!
load_protocols
create_connections
load_async_api_resources
load_components
def initialize!(force = false)
# Don't boot if I was never configured.
return unless @configured
boot_registry.boot!(force) do
load_protocols
create_connections
load_async_api_resources
load_components
end
end

def boot_registry
@boot_registry ||= EventSource::BootRegistry.new
end

def config
Expand All @@ -89,6 +99,14 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def register_subscriber(subscriber_klass)
boot_registry.register_subscriber(subscriber_klass)
end

def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end
end

class EventSourceLogger
Expand Down
96 changes: 96 additions & 0 deletions lib/event_source/boot_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'set'
require 'monitor'

module EventSource
# This class manages correct/loading of subscribers and publishers
# based on the current stage of the EventSource lifecycle.
#
# Depending on both the time the initialization of EventSource is invoked
# and when subscriber/publisher code is loaded, this can become complicated.
# This is largely caused by two confounding factors:
# 1. We want to delay initialization of EventSource until Rails is fully
# 'ready'
# 2. Based on the Rails environment, such as production, development, or
# test (primarily how those different environments treat lazy vs. eager
# loading of classes in a Rails application), subscriber and publisher
# code can be loaded before, after, or sometimes even DURING the
# EventSource boot process - we need to support all models
class BootRegistry
def initialize
@unbooted_publishers = Set.new
@unbooted_subscribers = Set.new
@booted_publishers = Set.new
@booted_subscribers = Set.new
# This is our re-entrant mutex. We're going to use it to make sure that
# registration and boot methods aren't allowed to simultaneously alter
# our state. You'll notice most methods on this class are wrapped in
# synchronize calls against this.
@bootex = Monitor.new
@booted = false
end

def boot!(force = false)
@bootex.synchronize do
return if @booted && !force
yield
boot_publishers!
boot_subscribers!
@booted = true
end
end

# Register a publisher for EventSource.
#
# If the EventSource hasn't been booted, save publisher for later.
# Otherwise, boot it now.
def register_publisher(publisher_klass)
@bootex.synchronize do
if @booted
publisher_klass.validate
@booted_publishers << publisher_klass
else
@unbooted_publishers << publisher_klass
end
end
end

# Register a subscriber for EventSource.
#
# If the EventSource hasn't been booted, save the subscriber for later.
# Otherwise, boot it now.
def register_subscriber(subscriber_klass)
@bootex.synchronize do
if @booted
subscriber_klass.create_subscription
@booted_subscribers << subscriber_klass
else
@unbooted_subscribers << subscriber_klass
end
end
end

# Boot the publishers.
def boot_publishers!
@bootex.synchronize do
@unbooted_publishers.each do |pk|
pk.validate
@booted_publishers << pk
end
@unbooted_publishers = Set.new
end
end

# Boot the subscribers.
def boot_subscribers!
@bootex.synchronize do
@unbooted_subscribers.each do |sk|
sk.create_subscription
@booted_subscribers << sk
end
@unbooted_subscribers = Set.new
end
end
end
end
2 changes: 1 addition & 1 deletion lib/event_source/protocols/amqp_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require_relative 'amqp/contracts/contract'

Gem
.find_files('event_source/protocols/amqp/contracts/**/*.rb')
.find_files('event_source/protocols/amqp/contracts/**/*.rb', false)
.sort
.each { |f| require(f) }

Expand Down
10 changes: 6 additions & 4 deletions lib/event_source/protocols/http/faraday_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ def actions
# @return [Queue] Queue instance
def subscribe(subscriber_klass, _options)
unique_key = [app_name, formatted_exchange_name].join(delimiter)
logger.debug "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.debug "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
executable = subscriber_klass.executable_for(unique_key)
@subject.actions.push(executable)
subscription_key = [app_name, formatted_exchange_name].join(delimiter)
subscriber_suffix = subscriber_klass.name.downcase.gsub('::', '_')
unique_key = subscription_key + "_#{subscriber_suffix}"
logger.info "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.info "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
@subject.register_action(subscriber_klass, unique_key)
end

def consumer_proxy_for(operation_bindings)
Expand Down
17 changes: 11 additions & 6 deletions lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def self.publisher_container
@publisher_container ||= Concurrent::Map.new
end

def self.initialization_registry
@initialization_registry ||= Concurrent::Array.new
end

def self.initialize_publishers
self.initialization_registry.each do |pub|
pub.validate
end
end

def self.[](exchange_ref)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
Expand All @@ -46,12 +56,7 @@ def included(base)
}
base.extend(ClassMethods)

TracePoint.trace(:end) do |t|
if base == t.self
base.validate
t.disable
end
end
EventSource.register_publisher(base)
end

# methods to register events
Expand Down
16 changes: 13 additions & 3 deletions lib/event_source/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ class Queue
# @attr_reader [Object] queue_proxy the protocol-specific class supporting this DSL
# @attr_reader [String] name
# @attr_reader [Hash] bindings
# @attr_reader [Hash] actions
attr_reader :queue_proxy, :name, :bindings, :actions
attr_reader :queue_proxy, :name, :bindings

def initialize(queue_proxy, name, bindings = {})
@queue_proxy = queue_proxy
@name = name
@bindings = bindings
@subject = ::Queue.new
@actions = []
@registered_actions = []
end

# def subscribe(subscriber_klass, &block)
Expand Down Expand Up @@ -49,5 +48,16 @@ def close
def closed?
@subject.closed?
end

# Register an action to be performed, with a resolver class and key.
def register_action(resolver, key)
@registered_actions << [resolver, key]
end

def actions
@registered_actions.map do |ra|
ra.first.executable_for(ra.last)
end
end
end
end
8 changes: 5 additions & 3 deletions lib/event_source/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

module EventSource
# :nodoc:
class Railtie < Rails::Railtie

module Railtie
Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do
EventSource.initialize!
end
end
end
end
7 changes: 1 addition & 6 deletions lib/event_source/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ def included(base)
base.extend ClassMethods
base.include InstanceMethods

TracePoint.trace(:end) do |t|
if base == t.self
base.create_subscription
t.disable
end
end
EventSource.register_subscriber(base)
end

module InstanceMethods
Expand Down
5 changes: 5 additions & 0 deletions spec/event_source/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def call
# binding.pry
end

before :each do
EventSource.initialize!(true)
end

context '.event' do

let(:organization_params) do
{
hbx_id: '553234',
Expand Down
25 changes: 25 additions & 0 deletions spec/event_source/rails_application_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require "spec_helper"
require "parallel_tests"
require "parallel_tests/rspec/runner"

RSpec.describe EventSource, "rails specs" do
it "runs the rails tests in the rails application context" do
ParallelTests.with_pid_file do
specs_run_result = ParallelTests::RSpec::Runner.run_tests(
[
"spec/rails_app/spec/railtie_spec.rb",
"spec/rails_app/spec/http_service_integration_spec.rb"
],
1,
1,
{
serialize_stdout: true,
test_options: ["-O", ".rspec_rails_specs", "--format", "documentation"]
}
)
if specs_run_result[:exit_status] != 0
fail(specs_run_result[:stdout] + "\n\n")
end
end
end
end
11 changes: 11 additions & 0 deletions spec/rails_app/app/event_source/events/determinations/eval.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module Events
module Determinations
# Eval will register event publisher for MiTC
class Eval < EventSource::Event
publisher_path 'publishers.mitc_publisher'

end
end
end
9 changes: 9 additions & 0 deletions spec/rails_app/app/event_source/publishers/mitc_publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Publishers
class MitcPublisher
# Publisher will send request payload to MiTC for determinations
include ::EventSource::Publisher[http: '/determinations/eval']
register_event '/determinations/eval'
end
end

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module Subscribers
class MitcResponseSubscriber
include ::EventSource::Subscriber[http: '/determinations/eval']
extend EventSource::Logging

subscribe(:on_determinations_eval) do |body, status, headers|
$GLOBAL_TEST_FLAG = true
end
end
end

This file was deleted.

Loading

0 comments on commit 6d3acdb

Please sign in to comment.