From 1404673054b1c73e4fd6c062d85a33e3fe8e985d Mon Sep 17 00:00:00 2001 From: MahdiBM <54685446+MahdiBM@users.noreply.github.com> Date: Thu, 22 Apr 2021 20:56:36 +0430 Subject: [PATCH] fix for issue #85 and #94 --- Sources/Queues/QueuesCommand.swift | 2 +- Sources/Queues/ScheduledJob.swift | 25 ++++++++++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index b9ff152..679adc6 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -196,7 +196,7 @@ public final class QueuesCommand: Command { } // stop all scheduled jobs self.scheduledTasks.values.forEach { - $0.task.syncCancel(on: self.eventLoopGroup.next()) + $0.task.cancel() } } diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index 6c57c72..04b6e6a 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -1,7 +1,8 @@ -import class NIO.RepeatedTask +import struct NIO.Scheduled /// Describes a job that can be scheduled and repeated public protocol ScheduledJob { + /// The name unique to this `ScheduledJob` var name: String { get } /// The method called when the job is run /// - Parameter context: A `JobContext` that can be used @@ -24,10 +25,10 @@ class AnyScheduledJob { extension AnyScheduledJob { struct Task { - let task: RepeatedTask + let task: Scheduled let done: EventLoopFuture } - + func schedule(context: QueueContext) -> Task? { context.logger.trace("Beginning the scheduler process") guard let date = self.scheduler.nextDate() else { @@ -36,15 +37,21 @@ extension AnyScheduledJob { } context.logger.debug("Scheduling \(self.job.name) to run at \(date)") let promise = context.eventLoop.makePromise(of: Void.self) - let task = context.eventLoop.scheduleRepeatedTask( - initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), - delay: .seconds(0) - ) { task in - // always cancel - task.cancel() + let initialDelay: TimeAmount = .nanoseconds( + Int64(date.timeIntervalSinceNow * 1000 * 1000 * 1000)) + let task = context.eventLoop.scheduleTask(in: initialDelay) { context.logger.trace("Running the scheduled job \(self.job.name)") self.job.run(context: context).cascade(to: promise) } +// let task = context.eventLoop.scheduleRepeatedTask( +// initialDelay: .nanoseconds(Int64(date.timeIntervalSinceNow * 1000 * 1000 * 1000)), +// delay: .zero +// ) { task in +// // always cancel +// task.cancel() +// context.logger.trace("Running the scheduled job \(self.job.name)") +// self.job.run(context: context).cascade(to: promise) +// } return .init(task: task, done: promise.futureResult) } }