diff --git a/README.md b/README.md index a6c0d101..62f0da3c 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,6 @@ For small projects, you can run Solid Queue on the same machine as your webserve **Note**: future changes to the schema will come in the form of regular migrations. - ### Single database configuration Running Solid Queue in a separate database is recommended, but it's also possible to use one single database for both the app and the queue. Just follow these steps: @@ -99,7 +98,6 @@ By default, Solid Queue will try to find your configuration under `config/queue. bin/jobs -c config/calendar.yml ``` - This is what this configuration looks like: ```yml @@ -236,6 +234,7 @@ There are several settings that control how Solid Queue works that you can set a - `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`. - `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, but this will happen automatically in the near future. - `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes. +- `calc_memory_usage`: a proc returns the memory consumption of the process(es) that you want to measure. It yields the Worker process PID and runs in the context of the Worker that is configured with `recycle_on_oom`. [Read more](#memory-consumption). ## Errors when enqueuing @@ -428,7 +427,112 @@ my_periodic_resque_job: schedule: "*/5 * * * *" ``` -and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. +and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any +`solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once +each time. + +## Recycle On OOM + +This feature recycles / restarts a worker whenever it exceeds the specified memory threshold. This is particularly +useful for jobs with high memory consumption or when deploying in a memory-constrained environment. + +If the result of the `calc_memory_usage` Proc is greater than the `recycle_on_oom` value configured on a specific +worker, that worker will restart. It's important that the units returned by the `calc_memory_usage` Proc match the units +of the `recycle_on_oom` value. +For instance, if the `calc_memory_usage` Proc returns a value MB (i.e., 300 Vs. 300_000_000), the `recycle_on_oom` value +should also be specified in MB. + +Using the `get_process_memory` gem, and configuring it return an integer value in MB, you can configure SolidQueue as +follows: + +```ruby +# application.rb, production.rb, or +# initializer/solid_queue.rb file +Rails.application.config.solid_queue.calc_memory_usage = ->(pid) { GetProcessMem.new(pid).mb.round(0) } +``` + +Here is an example of configuring a worker to recycle when memory usage exceeds 200MB: + +```yml +worker: + queues: "*" + threads: 3 + polling_interval: 2 + recycle_on_oom: 200 +``` + +You can also use the `calc_memory_usage` Proc to compute the memory usage across multiple processes: + +```ruby +SolidQueue.configure do |config| + config.calc_memory_usage = ->(_) do + SolidQueue::Process.pluck(:pid).sum do |pid| + GetProcessMem.new(pid).mb.round(0) + rescue StandardError + 0 # just in case the process for the pid is no longer running + end + end +end +``` + +Then, set the worker to recycle based on the aggregate maximum memory usage of all processes: + +```yml +worker: + queues: "*" + threads: 3 + polling_interval: 2 + recycle_on_oom: 512 +``` + +Be cautious when using this feature, as it can lead to restarting the worker after each job if not properly configured. +It is advisable to be especially careful using threads with workers configured with `recycle_on_oom`. +For example, two queues — `slow_low_memory` and `fast_high_memory` — could easily result in the slow_low_memory jobs +never completing due to the fast_high_memory jobs triggering the Worker tp recycle without allowing the slow_low_memory +jobs enough time to run to completion. + +### A Brief Digression +This is a good time to mention if you choose to use `recycle_on_oom` with threads, then your jobs *really, really should* +be **idempotent** -- a very fancy way of saying that a job could easily be started and stopped multiple times +(see previous paragraph) so it critical than the job be designed in a way to allow for multiple runs before it completes +without doing anything *"unseemly"* (such as email a customer with the same message with each restart). + +### Finishing recycle_on_oom +Anytime a Worker is recycled due to memory consumption, it will emit a standard SolidQueue log message labeled: "Worker +OOM". It will report the memory usage that triggered the restart and the vital statistics of the Worker process. +SolidQueue will also output it's standard messaging about the Worker starting and registering. + +Other ideas that might help with memory constrained environments include: + +```ruby +SolidQueue.on_start do + # If supported by your environment + # This setting will be inherited by all processes started by this Supervisor, including recycled Workers + GC.auto_compact = true + + Process.warmup +end +``` + +and + +```ruby +SolidQueue.on_worker_start { Process.warmup } +``` + +```yml +worker: + queues: "*" + threads: 3 + polling_interval: 2 + recycle_on_oom: 0 +``` + +will effectively restart at the end of every job. + +Finally, triggering a full GC via either after_perform, around_perform, or the end of your Job can't hurt, as it will +run prior to the memory +check by the Worker. ## Inspiration diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index d4abf45a..c6da0766 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -65,8 +65,6 @@ def perform else failed_with(result.error) end - ensure - job.unblock_next_blocked_job end def release diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e7070d26..7af2233e 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -40,6 +40,7 @@ module SolidQueue mattr_accessor :preserve_finished_jobs, default: true mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes + mattr_accessor :calc_memory_usage, default: nil delegate :on_start, :on_stop, to: Supervisor diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 3d2ec02c..f892d72e 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -61,6 +61,20 @@ def enqueue_recurring_task(event) end end + def recycle_worker(event) + process = event.payload[:process] + + attributes = { + memory_used: event.payload[:memory_used], + pid: process.pid, + hostname: process.hostname, + process_id: process.process_id, + name: process.name + } + + warn formatted_event(event, action: "#{process.kind} OOM", **attributes) + end + def start_process(event) process = event.payload[:process] diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index c1bcf195..64e5c03f 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -15,15 +15,23 @@ def initialize(size, on_idle: nil) @mutex = Mutex.new end - def post(execution) + def post(execution, worker) available_threads.decrement - future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution| + future = Concurrent::Future.new(args: [ execution, worker ], executor: executor) do |thread_execution, worker_execution| wrap_in_app_executor do thread_execution.perform ensure - available_threads.increment - mutex.synchronize { on_idle.try(:call) if idle? } + wrap_in_app_executor do + execution.job.unblock_next_blocked_job + end + + if worker_execution.oom? + worker_execution.recycle(execution) + else + available_threads.increment + mutex.synchronize { on_idle.try(:call) if idle? } + end end end diff --git a/lib/solid_queue/processes/recyclable.rb b/lib/solid_queue/processes/recyclable.rb new file mode 100644 index 00000000..9493c860 --- /dev/null +++ b/lib/solid_queue/processes/recyclable.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +require "active_support/concern" + +module SolidQueue::Processes + module Recyclable + extend ActiveSupport::Concern + + included do + attr_reader :max_memory, :calc_memory_usage + end + + def recyclable_setup(**options) + return unless configured?(options) + + set_max_memory(options[:recycle_on_oom]) + set_calc_memory_usage if max_memory + SolidQueue.logger.error { "Recycle on OOM is disabled for worker #{pid}" } unless oom_configured? + end + + def recycle(execution = nil) + return false if !oom_configured? || stopped? + + memory_used = calc_memory_usage.call(pid) + return false unless memory_exceeded?(memory_used) + + SolidQueue.instrument(:recycle_worker, process: self, memory_used: memory_used, class_name: execution&.job&.class_name) do + pool.shutdown + stop + end + + true + end + + def oom? + oom_configured? && calc_memory_usage.call(pid) > max_memory + end + + private + + def configured?(options) + options.key?(:recycle_on_oom) + end + + def oom_configured? + @oom_configured ||= max_memory.present? && calc_memory_usage.present? + end + + def memory_exceeded?(memory_used) + memory_used > max_memory + end + + def set_max_memory(max_memory) + if max_memory > 0 + @max_memory = max_memory + else + SolidQueue.logger.error { "Invalid value for recycle_on_oom: #{max_memory}." } + end + end + + def set_calc_memory_usage + if SolidQueue.calc_memory_usage.respond_to?(:call) + @calc_memory_usage = SolidQueue.calc_memory_usage + else + SolidQueue.logger.error { "SolidQueue.calc_memory_usage provider not configured." } + end + end + end +end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index fc203774..df341856 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -3,6 +3,7 @@ module SolidQueue class Worker < Processes::Poller include LifecycleHooks + include Processes::Recyclable after_boot :run_start_hooks before_shutdown :run_stop_hooks @@ -11,6 +12,7 @@ class Worker < Processes::Poller def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) + recyclable_setup(**options) @queues = Array(options[:queues]) @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) @@ -19,14 +21,15 @@ def initialize(**options) end def metadata - super.merge(queues: queues.join(","), thread_pool_size: pool.size) + super.then { _1.merge(queues: queues.join(","), thread_pool_size: pool.size) } + .then { oom_configured? ? _1.merge(recycle_on_oom: max_memory) : _1 } end private def poll claim_executions.then do |executions| executions.each do |execution| - pool.post(execution) + pool.post(execution, self) end executions.size diff --git a/test/dummy/app/jobs/recycle_job.rb b/test/dummy/app/jobs/recycle_job.rb new file mode 100644 index 00000000..f936ba00 --- /dev/null +++ b/test/dummy/app/jobs/recycle_job.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class RecycleJob < ApplicationJob + def perform(nap = nil) + sleep(nap) unless nap.nil? + end +end diff --git a/test/dummy/app/jobs/recycle_with_concurrency_job.rb b/test/dummy/app/jobs/recycle_with_concurrency_job.rb new file mode 100644 index 00000000..09db9147 --- /dev/null +++ b/test/dummy/app/jobs/recycle_with_concurrency_job.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class RecycleWithConcurrencyJob < ApplicationJob + limits_concurrency key: ->(nap = nil) { } + + def perform(nap = nil) + sleep(nap) unless nap.nil? + end +end diff --git a/test/integration/recycle_worker_test.rb b/test/integration/recycle_worker_test.rb new file mode 100644 index 00000000..a9165edb --- /dev/null +++ b/test/integration/recycle_worker_test.rb @@ -0,0 +1,198 @@ +# frozen_string_literal: true + +require "test_helper" +require "thread" + +class RecycleWorkerTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + attr_accessor :pid + + def setup + @pid = nil + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "recycle_on_oom set via worker config" do + @pid, _count = start_solid_queue(default_worker, calc_memory_usage_oom) + + SolidQueue::Process.where(kind: "Worker").each do |worker| + assert worker.metadata.has_key?("recycle_on_oom"), "Worker not configured for recycle_on_oom #{worker.id} #{worker.metadata}\n" + end + end + + test "recycle_on_oom via worker config per worker" do + workers = [ + { queues: "with", polling_interval: 0.1, processes: 3, threads: 2, recycle_on_oom: 1 }, + { queues: "without", polling_interval: 0.1, processes: 3, threads: 2 } + ] + + @pid, _count = start_solid_queue(workers, calc_memory_usage_oom) + + process_with, process_without = SolidQueue::Process.where(kind: "Worker").partition { _1.metadata["queues"] == "with" } + + assert process_with.all? { _1.metadata.has_key?("recycle_on_oom") }, "Worker unexpectedly configured without recycle_on_oom" + assert process_without.none? { _1.metadata.has_key?("recycle_on_oom") }, "Worker unexpectedly configured with recycle_on_oom" + end + + test "recycle_on_oom is an optional config parameter" do + worker_without_recycle = default_worker.tap { _1.delete(:recycle_on_oom) } + @pid, _count = start_solid_queue(worker_without_recycle, calc_memory_usage_oom) + + SolidQueue::Process.where(kind: "Worker").each do |worker| + assert_not worker.metadata.has_key?("recycle_on_oom"), "Worker configured for recycle_on_oom #{worker.id} #{worker.metadata}\n" + end + end + + test "recycle_on_oom is off globally without setting calc_memory_usage (default)" do + @pid, _count = start_solid_queue(default_worker, calc_memory_usage_off) + + SolidQueue::Process.where(kind: "Worker").each do |worker| + assert_not worker.metadata.has_key?("recycle_on_oom"), "Worker configured for recycle_on_oom #{worker.id} #{worker.metadata}\n" + end + end + + test "Workers don't recycle unless configured" do + @pid, count = start_solid_queue(default_worker, calc_memory_usage_off) # this turns recycle OFF + + _before_id, before_pid = worker_process + assert_not before_pid.nil?, "Before PID nil" + + jobs = 0.upto(5).map { RecycleJob.new } + ActiveJob.perform_all_later(jobs) + + wait_for_jobs_to_finish_for(6.seconds) + assert_no_unfinished_jobs + wait_for_registered_processes(count, timeout: 1.second) + + _before_id, after_pid = worker_process + assert_not after_pid.nil?, "After PID nil" + + assert before_pid == after_pid, "Worker unexpectedly recycled" + end + + test "Worker recycles on OOM condition" do + @pid, count = start_solid_queue(default_worker, calc_memory_usage_oom) + + before_id, before_pid = worker_process + assert_not before_pid.nil?, "Before PID nil" + + jobs = 0.upto(5).map { RecycleJob.new } + ActiveJob.perform_all_later(jobs) + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + wait_for_registered_processes(count, timeout: 1.second) + + after_id, after_pid = worker_process + + assert_not after_pid.nil?, "After PID nil" + assert before_pid != after_pid, "Worker didn't recycled" + end + + test "Worker don't recycle without OOM condition" do + @pid, count = start_solid_queue(default_worker, calc_memory_usage_not_oom) + + before_id, before_pid = worker_process + assert_not before_pid.nil?, "Before PID nil" + + jobs = 0.upto(5).map { RecycleJob.new } + ActiveJob.perform_all_later(jobs) + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + wait_for_registered_processes(count, timeout: 1.second) + + after_id, after_pid = worker_process + + assert_not after_pid.nil?, "After PID nil" + assert before_pid == after_pid, "Worker unexpectedly recycled PID" + assert before_id == after_id, "Worker unexpectedly created new Process row" + end + + test "Jobs on threads finish even when worker recycles on OOM" do + workers = { queues: %w[fast slow], polling_interval: 0.1, processes: 1, threads: 2, recycle_on_oom: 1 } + @pid, count = start_solid_queue(workers, calc_memory_usage_oom) + + before_id, before_pid = worker_process + assert_not before_pid.nil?, "Before PID nil" + + RecycleJob.set(queue: "slow").perform_later(2) + RecycleJob.set(queue: "fast").perform_later(0) + RecycleJob.set(queue: "slow").perform_later(2) + RecycleJob.set(queue: "fast").perform_later(0) + + wait_for_jobs_to_finish_for(10.seconds) + assert_no_unfinished_jobs + wait_for_registered_processes(count, timeout: 1.second) + + after_id, after_pid = worker_process + + assert_not after_pid.nil?, "After PID nil" + assert before_pid != after_pid, "Worker didn't create new PID" + assert before_id != after_id, "Worker did not created new Process row" + end + + test "Jobs that hold locks are released on OOM recycle " do + @pid, count = start_solid_queue(default_worker, calc_memory_usage_oom) + + before_id, before_pid = worker_process + assert_not before_pid.nil?, "Before PID nil" + + jobs = 0.upto(9).map { RecycleJob.new } + ActiveJob.perform_all_later(jobs) + + wait_for_jobs_to_finish_for(15.seconds) + assert_no_unfinished_jobs + wait_for_registered_processes(count, timeout: 1.second) + + after_id, after_pid = worker_process + + assert_not after_pid.nil?, "After PID nil" + assert before_pid != after_pid, "Worker didn't change PID" + end + + private + def start_solid_queue(workers, calc_memory_usage) + w = workers.is_a?(Array) ? workers : [ workers ] + + pid = fork do + SolidQueue.calc_memory_usage = calc_memory_usage + SolidQueue.shutdown_timeout = 1.second + + SolidQueue::Supervisor.start(workers: w, dispatchers: default_dispatcher, skip_recurring: true) + end + + expected_process_count = w.sum { _1.fetch(:processes, 1) } + 2 # supervisor + dispatcher + wait_for_registered_processes(expected_process_count, timeout: 0.5.second) # 3 workers working the default queue + dispatcher + supervisor + + [ pid, expected_process_count ] + end + + def worker_process + SolidQueue::Process.find_by(kind: "Worker")&.slice(:id, :pid)&.values + end + + def calc_memory_usage_oom + ->(_pid) { 2 } + end + + def calc_memory_usage_not_oom + ->(_pid) { 0 } + end + + def calc_memory_usage_off + nil + end + + def default_dispatcher + [ { polling_interval: 0.1, batch_size: 100, concurrency_maintenance_interval: 600 } ] + end + + def default_worker + { queues: "default", polling_interval: 0.1, processes: 3, threads: 1, recycle_on_oom: 1 } + end +end