diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..d2e7774 --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,31 @@ +# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Java CI with Maven + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + cache: maven + - name: Build with Maven + run: mvn -B package --file pom.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b7f94a2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ +dependency-reduced-pom.xml + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +.idea/ +output + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f8fe6f1 --- /dev/null +++ b/README.md @@ -0,0 +1,813 @@ +# Task Scheduler + +The goal of this project is to provide a hands-on guide on building a scalable, distributed, fault-tolerant, task +scheduler platform using Zookeeper in Java. + +## Maven Dependency + +Use the following maven dependency + +``` + + com.snehasishroy + TaskScheduler + 1.0 + +``` + +## Starting Zookeeper Server + +This service utilizes TTL Nodes which requires Zookeeper Server >= 3.5.4. + +It also requires the `extendedTypesEnabled` to be set while starting the ZK Server. 
+ +``` +vim {ZK}/bin/zkEnv.sh + +export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Dzookeeper.extendedTypesEnabled=true" +``` + +Sample `zoo.cfg` + +``` +tickTime = 200 +dataDir = /data/zookeeper +clientPort = 2181 +initLimit = 5 +syncLimit = 2 +``` + +Starting the ZK Server + +``` +sudo ./zkServer.sh start-foreground +``` + +## Tools for Zookeeper Visualization + +https://github.com/elkozmon/zoonavigator + + img.png + +## Running application + +``` +Main class: com.snehasishroy.taskscheduler.App +Arguments: server local.yml +``` + +If you want to run multiple instances of the application, just change both the ports present in `local.yml` and run the +application. + +img.png + +## Architecture + +img.png + +## Low level System Design + +https://snehasishroy.com/build-a-distributed-task-scheduler-using-zookeeper + +### What do we want? + +We want to create a Task Execution Service that can execute tasks in a fault-tolerant manner. The service should +dynamically discover workers and assign tasks to them. If a worker dies while executing a task, the service should be +able to find that a worker has died (*or stopped responding*) and reassign the task to a new worker, providing at least +once guarantee for job execution. All these should be done in a highly scalable and distributed manner. + +This will be a *hands-on guide* on implementing a distributed job execution service - so get your coffee mug ready. + +
+> 💡 The code used in this article can be found here: https://github.com/snehasishroy/TaskScheduler
+ +### Architecture + +* Clients submit job details to Zookeeper and listen to the status updates again via Zookeeper. + +* Multiple worker instances utilizes Zookeeper to perform leader election. + +* The leader instances watches job path to listen for upcoming job and assigns the jobs to the available workers. + +* The worker instances watches their assignment mapping path. When a new job is found, it gets executed and the + completion status is updated. + +* The client instances gets notified upon task completion by Zookeeper. + +### Zookeeper + +Zookeeper is a robust service that aims to **deliver coordination among distributed systems.** It's widely used in +open-source projects like Kafka and HBase as the central coordination service. + +We will use CuratorFramework in our project as it provides high-level API's for interacting with Zookeeper. + +> This blog won't deep dive into the internals of Zookeeper. Readers are expected to know the basics of Zookeeper before +> proceeding to the implementation part. + +### Implementation Details + +Let's look at the `ClientResource` - which provides a facade for task submission. + +```java + +@Slf4j +@Path("/v1/client") +public class Client { + private final ClientService clientService; + + @Inject + public Client(CuratorFramework curator) { + this.clientService = new ClientService(curator); + } + + @POST + public String createSumTask(@QueryParam("first") int a, @QueryParam("second") int b) { + Runnable jobDetail = + (Runnable & Serializable) + (() -> System.out.println("Sum of " + a + " and " + b + " is " + (a + b))); + return clientService.registerJob(jobDetail); + } +} +``` + +The above code, allows clients to submit a sample runnable task that computes the sum of two numbers and prints it - +this provides an easy way for input via Swagger. But the design is extensible - the client can submit any instance of +the `Runnable` as a Job. + +> Instead of providing a Runnable, we could have designed our service to work with +> Dockerfile - leading to a generic task execution system! but we wanted to focus only on Zookeeper in this +> article. + +Now let's look at the `ClientService` + +```java +public class ClientService { + private final CuratorFramework curator; + + public ClientService(CuratorFramework curator) { + this.curator = curator; + } + + public String registerJob(Runnable jobDetail) { + String jobId = UUID.randomUUID().toString(); + syncCreate(ZKUtils.getJobsPath() + "/" + jobId, jobDetail); + return jobId; + } + + private void syncCreate(String path, Runnable runnable) { + // create the ZNode along with the Runnable instance as data + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(runnable); + curator + .create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .forPath(path, byteArrayOutputStream.toByteArray()); + } catch (Exception e) { + log.error("Unable to create {}", path, e); + throw new RuntimeException(e); + } + } +} +``` + +Once a job is registered, a unique ID is assigned to it and a **Persistent** node is registered on the Zookeeper with +the randomly generated job ID in the path e.g. `/jobs/{job-id}`. Do notice that the `runnable` is serialized to a byte +array and stored in the ZNode directly. + +Notice that we are creating the ZNode *synchronously* i.e. the function `syncCreate` will block until the ZNode is not +created. 
In the later section, you will notice that we have used asynchronous operations to improve throughput. + +Why are we creating paths? So that we can set up *watches* on it. Watches allow us to be notified of any changes under +the watched path. Zookeeper will invoke the `JobsListener` whenever a new node is *created or destroyed* under +the `/jobs` path. + +> What would happen if the client is disconnected from the Zookeeper when a new job is registered? In such cases, the +> watch won't be triggered and the client won't be notified. The Curator will automatically attempt to recreate the +> watches upon reconnection. + +```java +public class JobsListener implements CuratorCacheListener { + private final CuratorFramework curator; + private final CuratorCache workersCache; + private final ExecutorService executorService; + private final WorkerPickerStrategy workerPickerStrategy; + + public JobsListener( + CuratorFramework curator, + CuratorCache workersCache, + WorkerPickerStrategy workerPickerStrategy) { + this.curator = curator; + this.workersCache = workersCache; + executorService = Executors.newSingleThreadExecutor(); + this.workerPickerStrategy = workerPickerStrategy; + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED && data.getPath().length() > ZKUtils.JOBS_ROOT.length()) { + String jobID = ZKUtils.extractNode(data.getPath()); + log.info("found new job {}, passing it to executor service", jobID); + // an executor service is used in order to avoid blocking the watcher thread as the job + // execution can be time consuming + // and we don't want to skip handling new jobs during that time + executorService.submit( + new JobAssigner(jobID, data.getData(), curator, workersCache, workerPickerStrategy)); + } + } +} +``` + +When a new job is found, we hand over the Job ID to a different thread because we don't want to block the watcher +thread. + +> All ZooKeeper watchers are serialized and processed by a single thread. Thus, no other watchers can be processed while +> your watcher is running. Hence it's vital not to block the watcher +> +thread. https://cwiki.apache.org/confluence/display/CURATOR/TN1 + +We are setting up the watcher using `CuratorCache` - which will be explained later on. + +--- + +### JobAssigner + +Once a job has been created, we need to execute it by finding an eligible worker based on a strategy. We can either +choose a worker randomly or in a round-robin manner. Once a worker is chosen, we need to create an assignment between a +JobID and a Worker ID - we do so by creating a Persistent ZNode on the path `/assignments/{worker-id}/{job-id}` . Once +the assignment is created, we *delete* the `/jobs/{job-id}` path. + +> Deletion of job details of the assigned job eases the recoverability. If a leader dies and a new leader is elected, it +> does not have to look at all the jobs present under /jobs/ and figure out which one is left unassigned. +> Any +> jobs present under/jobs/ are guaranteed to be unassigned - assuming that the assignment and +> deletion have happened atomically. 
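+The `JobAssigner` shown next still performs these two writes separately and leaves the atomic variant as a TODO. As a rough sketch (not part of this repository), that TODO could be implemented with Curator's transaction API, which maps to ZooKeeper's multi-op; the helper name `atomicallyAssign` and its signature are illustrative:
+
+```java
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.zookeeper.CreateMode;
+
+// Hypothetical helper (illustrative only): create the assignment and delete the /jobs entry
+// in a single multi-op commit, so either both writes happen or neither does.
+private void atomicallyAssign(
+    CuratorFramework curator, String workerName, String jobID, byte[] jobData) throws Exception {
+  CuratorOp createAssignment =
+      curator
+          .transactionOp()
+          .create()
+          .withMode(CreateMode.PERSISTENT)
+          .forPath(ZKUtils.ASSIGNMENT_ROOT + "/" + workerName + "/" + jobID, jobData);
+  CuratorOp deleteJob =
+      curator.transactionOp().delete().forPath(ZKUtils.getJobsPath() + "/" + jobID);
+  // forOperations commits the whole batch atomically; it returns a List<CuratorTransactionResult>
+  // that can be inspected for the outcome of each individual operation.
+  curator.transaction().forOperations(createAssignment, deleteJob);
+}
+```
+
+With both writes committed together, a newly elected leader can keep treating everything under `/jobs` as unassigned.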
+ +```java +public class JobAssigner implements Runnable { + + private final CuratorFramework curator; + private final String jobID; + private final CuratorCache workersCache; + private final WorkerPickerStrategy workerPickerStrategy; + private final byte[] jobData; + private String workerName; + + public JobAssigner( + String jobID, + byte[] jobData, + CuratorFramework curator, + CuratorCache workersCache, + WorkerPickerStrategy workerPickerStrategy) { + this.jobID = jobID; + this.curator = curator; + this.workersCache = workersCache; + this.workerPickerStrategy = workerPickerStrategy; + this.jobData = jobData; + } + + @Override + public void run() { + // from the list of workers, pick a worker based on the provided strategy and assign the + // incoming job to that worker + List workers = + workersCache.stream() + .filter(childData -> (childData.getPath().length() > ZKUtils.WORKERS_ROOT.length())) + .toList(); + ChildData chosenWorker = workerPickerStrategy.evaluate(workers); + workerName = ZKUtils.extractNode(chosenWorker.getPath()); + log.info( + "Found total workers {}, Chosen worker index {}, worker name {}", + workers.size(), + chosenWorker, + workerName); + asyncCreateAssignment(); + } + + private void asyncCreateAssignment() { + try { + curator + .create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info( + "Assignment created successfully for JobID {} with WorkerID {}", + jobID, + workerName); + log.info( + "Performing async deletion of {}", ZKUtils.getJobsPath() + "/" + jobID); + asyncDelete(ZKUtils.getJobsPath() + "/" + jobID); + } + case CONNECTIONLOSS -> { + log.error( + "Lost connection to ZK while creating {}, retrying", event.getPath()); + asyncCreateAssignment(); + } + case NODEEXISTS -> { + log.warn("Assignment already exists for path {}", event.getPath()); + } + case NONODE -> { + log.error("Trying to create an assignment for a worker which does not exist {}", event); + } + default -> log.error("Unhandled event {} ", event); + } + } + }) + .forPath(ZKUtils.ASSIGNMENT_ROOT + "/" + workerName + "/" + jobID, jobData); + // Storing the job data along with the assignment, so that the respective worker need not + // perform an additional call to get the job details. + // This also simplifies the design - because we can delete the /jobs/{jobID} path once the + // assignment is completed - indicating that if an entry is present under /jobs, it's + // assignment is not yet done. + // This makes the recovery/reconciliation process much easier. Once a leader is elected, it + // has to only perform liveliness check for the existing assignments. 
+ // TODO: Use MultiOp to perform assignment and deletion atomically + } catch (Exception e) { + log.error("Error while creating assignment for {} with {}", jobID, workerName, e); + throw new RuntimeException(e); + } + } + + private void asyncDelete(String path) { + // delete the provided ZNode + try { + curator + .delete() + .idempotent() + .guaranteed() + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Path deleted successfully {}", event.getPath()); + } + case CONNECTIONLOSS -> { + log.info( + "Lost connection to ZK while deleting {}, retrying", event.getPath()); + asyncDelete(event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(path); + } catch (Exception e) { + log.error("Unable to delete {} due to ", path, e); + throw new RuntimeException(e); + } + } +} +``` + +> We are using asynchronous operations to create a ZNode to increase throughput. Being asynchronous, we don't know +> whether our operation actually succeeded or not, hence we have to deal with failure scenarios i.e. +> ConnectionLoss and whether the Node already exists. + +### WorkerPickerStrategy + +We are using `Strategy` pattern to dynamically change the way we can choose a worker at runtime. The important thing to +notice is that we have used *compare and swap* as a way to perform optimistic locking for `RoundRobinWorker` . + +```java +public interface WorkerPickerStrategy { + ChildData evaluate(List workers); +} + +// choose workers based on random strategy +public class RandomWorker implements WorkerPickerStrategy { + @Override + public ChildData evaluate(List workers) { + int chosenWorker = (int) (Math.random() * workers.size()); + return workers.get(chosenWorker); + } +} + +// choose workers based on round robin strategy +public class RoundRobinWorker implements WorkerPickerStrategy { + AtomicInteger index = + new AtomicInteger(0); // atomic because this will be accessed from multiple threads + + @Override + public ChildData evaluate(List workers) { + int chosenIndex; + while (true) { // repeat this until compare and set operation is succeeded + chosenIndex = index.get(); + int nextIndex = (chosenIndex + 1) < workers.size() ? (chosenIndex + 1) : 0; + // in case of concurrent updates, this can fail, hence we have to retry until success + if (index.compareAndSet(chosenIndex, nextIndex)) { + break; + } + } + return workers.get(chosenIndex); + } +} +``` + +> Optimistic locking is a very powerful construct and can be found in various places e.g. ElasticSearch natively +> provides compare and swap operations while updating documents. Zookeeper also maintains a version number with each +> ZNode - which can be used to perform a conditional +> +update. https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.htmlhttps://zookeeper.apache.org/doc/current/zookeeperProgrammers.html + +--- + +### WorkerService + +Since the `JobHandler` creates an assignment using ZNode of form `/assignments/{worker-id}/{job-id}` , if a worker has +to listen to upcoming assignments, a watch needs to be set on the `/assignments/{worker-id}` path. + +Once the worker service is notified of the new assignment, it fetches the job details, deserializes it to an instance of +Runnable, and passes it to an `ExecutorService` which performs the execution. 
+ +Once the runnable has been executed, we chain the future by updating the status of the job id. The status of a job ID is +reflected by *asynchronously* creating an entry in `/status/{job-id}` . Once the entry is created, we perform the last +operation in this orchestra - deletion of the assignment mapping. + + +> We have deliberately chosen the deletion of the assignment mapping as the last operation. In case, a worker dies +> during task execution, the leader can perform failure recovery and assign all the tasks that the dead worker was +> assigned, to a new worker instance. + +```java +public class AssignmentListener implements CuratorCacheListener { + private final CuratorFramework curator; + private final ExecutorService executorService; + + public AssignmentListener(CuratorFramework curator) { + this.curator = curator; + this.executorService = Executors.newFixedThreadPool(10); + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED) { + if (data.getPath().indexOf('/', 1) == data.getPath().lastIndexOf('/')) { + // This filters out the root path /assignment/{worker-id} which does not contains any job id + return; + } + String jobId = data.getPath().substring(data.getPath().lastIndexOf('/') + 1); + log.info("Assignment found for job id {}", jobId); + + try { + byte[] bytes = data.getData(); + ObjectInputStream objectInputStream = + new ObjectInputStream(new ByteArrayInputStream(bytes)); + Runnable jobDetail = (Runnable) objectInputStream.readObject(); + log.info("Deserialized the JobId {} to {}", jobId, jobDetail); + CompletableFuture future = CompletableFuture.runAsync(jobDetail, executorService); + // Actual execution of the job will be performed in a separate thread to avoid blocking of + // watcher thread + log.info("Job submitted for execution"); + // once the job has been executed, we need to ensure the assignment is deleted and the + // status of job has been updated. Currently there is no guarantee that post the execution, + // this cleanup happens. 
+ // TODO: Implement a daemon service which performs cleanup + future.thenAcceptAsync(__ -> asyncCreate(jobId, data.getPath()), executorService); + } catch (Exception e) { + log.error("Unable to fetch data for job id {}", jobId, e); + } + } + } + + private void asyncCreate(String jobId, String assignmentPath) { + log.info("JobID {} has been executed, moving on to update its status", jobId); + // create the ZNode, no need to set any data with this znode + try { + curator + .create() + .withTtl(ZKUtils.STATUS_TTL_MILLIS) + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT_WITH_TTL) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Status updated successfully {}", event.getPath()); + log.info("Performing deletion of assignment path {}", assignmentPath); + asyncDelete(assignmentPath); + } + case CONNECTIONLOSS -> { + log.error( + "Lost connection to ZK while creating {}, retrying", event.getPath()); + asyncCreate(jobId, assignmentPath); + } + case NODEEXISTS -> { + log.warn("Node already exists for path {}", event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(ZKUtils.getStatusPath(jobId), "Completed".getBytes()); + } catch (Exception e) { + log.error("Unable to create {} due to ", ZKUtils.getStatusPath(jobId), e); + throw new RuntimeException(e); + } + } + + private void asyncDelete(String path) { + // delete the provided ZNode + try { + curator + .delete() + .idempotent() + .guaranteed() + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Path deleted successfully {}", event.getPath()); + } + case CONNECTIONLOSS -> { + log.info( + "Lost connection to ZK while deleting {}, retrying", event.getPath()); + asyncDelete(event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(path); + } catch (Exception e) { + log.error("Unable to delete {} due to ", path, e); + throw new RuntimeException(e); + } + } +} +``` + +### WorkersListener + +When a worker is lost due to network partition, or application shutdown, the leader instance is notified using a watcher +event. The leader then looks at all the tasks that were assigned to the lost worker by iterating over the assignment +mappings `/assignment/{worker-id}/{job-id}` . + +All the tasks are then recreated by re-creating an entry in the `/jobs/{job-id}` . This recreation triggers the entire +workflow from the start. 
+ +```java +public class WorkersListener implements CuratorCacheListener { + + private final CuratorCache assignmentCache; + private final CuratorFramework curator; + + public WorkersListener(CuratorCache assignmentCache, CuratorFramework curator) { + this.assignmentCache = assignmentCache; + this.curator = curator; + } + + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED) { + log.info("New worker found {} ", data.getPath()); + } else if (type == Type.NODE_DELETED) { + // notice we have to check oldData because data will be null + log.info("Lost worker {}", oldData.getPath()); + String lostWorkerID = oldData.getPath().substring(oldData.getPath().lastIndexOf('/') + 1); + // map of job ids -> job data, which was assigned to the lost worker + Map assignableJobIds = new HashMap<>(); + assignmentCache.stream() + .forEach( + childData -> { + String path = childData.getPath(); + int begin = path.indexOf('/') + 1; + int end = path.indexOf('/', begin); + String pathWorkerID = path.substring(begin, end); + if (pathWorkerID.equals(lostWorkerID)) { + String jobID = path.substring(end + 1); + log.info("Found {} assigned to lost worker {}", jobID, lostWorkerID); + assignableJobIds.put(jobID, childData.getData()); + } + }); + // Assuming atomic creation of assignment path and deletion of tasks path (using MultiOp), we + // can safely assume that no entry exists under /jobs for the assigned tasks. + // So we can simulate job creation by recreating an entry in the /jobs entry. + assignableJobIds.forEach( + (jobId, jobData) -> asyncCreateJob(ZKUtils.getJobsPath() + "/" + jobId, jobData)); + } + } + + private void asyncCreateJob(String path, byte[] data) { + try { + curator + .create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Job repaired successfully for {}", path); + } + case CONNECTIONLOSS -> { + log.error( + "Lost connection to ZK while repairing job {}, retrying", + event.getPath()); + asyncCreateJob(event.getPath(), (byte[]) event.getContext()); + } + case NODEEXISTS -> { + log.warn("Job already exists for path {}", event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }, + data) + .forPath(path, data); + } catch (Exception e) { + log.error("Error while repairing job {}", path, e); + throw new RuntimeException(e); + } + } +} +``` + +### WorkerService - The humble plumber + +Throughout the article, you might have noticed that we talked about a leader instance performing some work but never +explained it. So let's talk about what a leader instance is and how an instance becomes a leader. + +When a worker instance comes up - it enqueues itself for a chance to become a leader. We perform leader elections using +the Curator framework ensuring that only a single instance can become a leader amongst the members. + +The leader is entrusted to perform critical actions like watching the `/jobs/` path and the `/workers/` path. *The +remaining instances do not set up watches on these paths because we want to ensure a task is assigned to only one worker +instance*. If multiple instances were trying to perform the assignment, it would be difficult to coordinate among them +without taking a lock. This is where the Zookeeper comes in and acts as the trusty coordination service. 
+ +```java +public class WorkerService implements LeaderSelectorListener, Closeable { + public WorkerService(CuratorFramework curator, String path) { + this.curator = curator; + leaderSelector = new LeaderSelector(curator, path, this); + // the selection for this instance doesn't start until the leader selector is started + // leader selection is done in the background so this call to leaderSelector.start() returns + // immediately + leaderSelector.start(); + // this is important as it automatically handles failure scenarios i.e. starts leadership after + // the reconnected state + // https://www.mail-archive.com/user@curator.apache.org/msg00903.html + leaderSelector.autoRequeue(); + setup(); + workerPickerStrategy = new RoundRobinWorker(); + } + + private void setup() { + registerWorker(); + asyncCreate(ZKUtils.getJobsPath(), CreateMode.PERSISTENT, null); + asyncCreate(ZKUtils.getAssignmentPath(name), CreateMode.PERSISTENT, null); + asyncCreate(ZKUtils.STATUS_ROOT, CreateMode.PERSISTENT, null); + } + + private void registerWorker() { + if (registrationRequired.get()) { + log.info("Attempting worker registration"); + name = UUID.randomUUID().toString(); + log.info("Generated a new random name to the worker {}", name); + asyncCreate(ZKUtils.getWorkerPath(name), CreateMode.EPHEMERAL, registrationRequired); + asyncCreate(ZKUtils.getAssignmentPath(name), CreateMode.PERSISTENT, null); + watchAssignmentPath(); + // irrespective of whether this node is a leader or not, we need to watch the assignment path + } + } + + // only the leader worker will watch for incoming jobs and changes to available workers + private void watchJobsAndWorkersPath() { + // in case leadership is reacquired, repeat the setup of the watches + workersCache = CuratorCache.build(curator, ZKUtils.WORKERS_ROOT); + workersCache.start(); + log.info("Watching workers root path {}", ZKUtils.WORKERS_ROOT); + workersListener = new WorkersListener(assignmentCache, curator); + workersCache.listenable().addListener(workersListener); + + jobsCache = CuratorCache.build(curator, ZKUtils.JOBS_ROOT); + log.info("Watching jobs root path {}", ZKUtils.getJobsPath()); + jobsCache.start(); + jobsListener = new JobsListener(curator, workersCache, workerPickerStrategy); + jobsCache.listenable().addListener(jobsListener); + } + + private void watchAssignmentPath() { + // No need to check for null here because once a session is reconnected after a loss + // we need to start the assignment listener on the new worker id + assignmentCache = CuratorCache.build(curator, ZKUtils.getAssignmentPath(name)); + log.info("Watching {}", ZKUtils.getAssignmentPath(name)); + assignmentCache.start(); + assignmentListener = new AssignmentListener(curator); + assignmentCache.listenable().addListener(assignmentListener); + } + + @Override + public void takeLeadership(CuratorFramework client) { + // we are now the leader. 
This method should not return until we want to relinquish leadership, + // which will only happen, if someone has signalled us to stop + log.info("{} is now the leader", name); + // only the leader should watch the jobs and workers path + watchJobsAndWorkersPath(); + lock.lock(); + try { + // sleep until signalled to stop + while (!shouldStop.get()) { + condition.await(); + } + if (shouldStop.get()) { + log.warn("{} is signalled to stop!", name); + leaderSelector.close(); + } + } catch (InterruptedException e) { // this is propagated from cancel leadership election + log.error("Thread is interrupted, need to exit the leadership", e); + } finally { + // finally is called before the method return + lock.unlock(); + } + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.RECONNECTED) { + log.error("Reconnected to ZK, Received {}", newState); + // no need to start the leadership again as it is auto requeued but worker re-registration is + // still required which will create a ZNode in /workers and /assignments path + registerWorker(); + } else if (newState == ConnectionState.LOST) { + log.error("Connection suspended/lost to ZK, giving up leadership {}", newState); + registrationRequired.set(true); + // This is required as the assignment cache listens on the {worker id} which is ephemeral + // In case of a lost session, it's guaranteed that the {worker id} would have expired + // Once the session is reconnected, we need to set up the assignment listener again on a new + // worker id + // TODO: Figure out a way to simulate the disconnection from zookeeper only by one instance + log.info("Removing the watcher set on the assignment listener"); + assignmentCache.listenable().removeListener(assignmentListener); + assignmentCache.close(); + if (workersCache != null) { + log.info("Removing the watcher set on the workers listener"); + workersCache.listenable().removeListener(workersListener); + workersCache.close(); + } + if (jobsCache != null) { + log.info("Removing the watcher set on the jobs listener"); + jobsCache.listenable().removeListener(jobsListener); + jobsCache.close(); + } + // throwing this specific exception would cause the current thread to interrupt and would + // cause and InterruptedException + throw new CancelLeadershipException(); + } else if (newState == ConnectionState.SUSPENDED) { + // https://stackoverflow.com/questions/41042798/how-to-handle-apache-curator-distributed-lock-loss-of-connection + log.error("Connection has been suspended to ZK {}", newState); + // TODO: After increasing the time out, verify whether no other instance gets the lock before + // the connection is marked as LOST + } + } +} + +``` + +> It's critical for the LeaderSelector instances to pay attention to any connection state changes. If an +> instance becomes the leader, it should respond to notification of being SUSPENDED or LOST Zookeeper session. If the +> SUSPENDED state is reported, the instance must assume it might no longer be the leader until it receives a RECONNECTED +> state. If the LOST state is reported, the instance is no longer the leader and its takeLeadership method +> should exit. + +When we detect that our instance has lost its connection from Zookeeper, we remove any watches that have been set up and +throw a `CancelLeadershipException`. And then we wait until we are reconnected to the Zookeeper. + +Once reconnected, we generate a new name for the worker and set up appropriate watches. 
Since `autoRequeue()` was +enabled during the leader election, the instance will enqueue itself for a chance of becoming a leader. + +--- + +### Conclusion + + + +If you have read so far, I appreciate your patience. Hope you learnt something new today. Thank you for reading. + +Please feel free to ask any questions you might have in the comments. + +--- + +### Appendix + +* [ZooKeeper watches are single-threaded.](https://cwiki.apache.org/confluence/display/CURATOR/TN1) + +* [Link to the Code Repository](https://github.com/snehasishroy/TaskScheduler) \ No newline at end of file diff --git a/docs/architecture.png b/docs/architecture.png new file mode 100644 index 0000000..0e940ff Binary files /dev/null and b/docs/architecture.png differ diff --git a/docs/run.png b/docs/run.png new file mode 100644 index 0000000..49fb4f4 Binary files /dev/null and b/docs/run.png differ diff --git a/docs/zoonavigator.png b/docs/zoonavigator.png new file mode 100644 index 0000000..6976773 Binary files /dev/null and b/docs/zoonavigator.png differ diff --git a/local.yml b/local.yml new file mode 100644 index 0000000..81ca2fa --- /dev/null +++ b/local.yml @@ -0,0 +1,10 @@ +server: + applicationConnectors: + - type: http + port: 7889 + adminConnectors: + - type: http + port: 7890 + +swagger: + resourcePackage: com.umar.taskscheduler.resources \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..81e95e4 --- /dev/null +++ b/pom.xml @@ -0,0 +1,261 @@ + + + 4.0.0 + + com.umar + TaskScheduler + 1.0 + jar + TaskScheduler + https://github.com/umar/TaskScheduler + Reference guide on building a scalable, distributed, fault-tolerant, task scheduler platform using + Zookeeper in Java + + 2024 + + + GitHub Issues + https://github.com/umar/TaskScheduler/issues + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + scm:git:https://github.com/umar/TaskScheduler.git + scm:git:https://github.com/umar/TaskScheduler.git + HEAD + https://github.com/umar/TaskScheduler + + + + + umar + Snehasish Roy + umar39@gmail.com + + + + + UTF-8 + UTF-8 + com.umar.taskscheduler.App + 17 + 17 + 4.0.1 + UTF-8 + + + + + + io.dropwizard + dropwizard-dependencies + ${dropwizard.version} + pom + import + + + + + + org.apache.curator + curator-recipes + 5.5.0 + + + org.apache.curator + curator-x-discovery + 5.5.0 + + + org.apache.curator + curator-x-discovery-server + 5.5.0 + + + javax.ws.rs + jsr311-api + + + + + org.projectlombok + lombok + 1.18.30 + provided + + + com.smoketurner + dropwizard-swagger + 4.0.0-1 + + + io.dropwizard + dropwizard-core + ${dropwizard.version} + + + ru.vyarus + dropwizard-guicey + 7.1.0 + + + com.fasterxml.jackson.core + jackson-annotations + 2.15.2 + + + org.junit.jupiter + junit-jupiter-api + 5.8.2 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.8.2 + test + + + junit + junit + 4.13.2 + test + + + + org.slf4j + slf4j-api + 1.7.36 + + + + org.slf4j + slf4j-simple + 1.7.36 + + + + + + + + + maven-surefire-plugin + 3.1.2 + + + org.apache.maven.plugins + maven-source-plugin + 3.2.0 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + true + + --pinentry-mode + loopback + + + + + + + + org.sonatype.central + central-publishing-maven-plugin + 0.4.0 + true + + central + true + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + true + + + *:* + + META-INF/*.SF + META-INF/*.DSA + 
META-INF/*.RSA + + + + + + + package + + shade + + + + + + ${mainClass} + + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.6.3 + + all,-missing + + + + attach-javadocs + + jar + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/umar/taskscheduler/App.java b/src/main/java/com/umar/taskscheduler/App.java new file mode 100644 index 0000000..af6850e --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/App.java @@ -0,0 +1,92 @@ +package com.umar.taskscheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.umar.taskscheduler.callbacks.AssignmentListener; +import com.umar.taskscheduler.module.GuiceModule; +import com.umar.taskscheduler.resources.Job; +import com.umar.taskscheduler.resources.Worker; + +import io.dropwizard.core.Application; +import io.dropwizard.core.setup.Bootstrap; +import io.dropwizard.core.setup.Environment; +import io.federecio.dropwizard.swagger.SwaggerBundle; +import io.federecio.dropwizard.swagger.SwaggerBundleConfiguration; +import lombok.extern.slf4j.Slf4j; +import ru.vyarus.dropwizard.guice.GuiceBundle; + +/** + * Main application class for the Distributed Task Scheduler. + * Initializes the application with the necessary configurations, bundles, and modules. + * + * @author Umar Mohammad + */ +@Slf4j +public class App extends Application { + + private static final Logger log = LoggerFactory.getLogger(App.class); + + /** + * Main entry point for the application. + * + * @param args Command line arguments passed to the application. + * @throws Exception if any error occurs during the application start. + */ + public static void main(String[] args) throws Exception { + new App().run(args); + } + + /** + * Initializes the application by adding necessary bundles. + * This method is responsible for setting up Guice and Swagger bundles. + * + * @param bootstrap The Bootstrap object for the Dropwizard application. + */ + @Override + public void initialize(Bootstrap bootstrap) { + // Add Guice and Swagger bundles to the application. + bootstrap.addBundle(guiceBundle()); + bootstrap.addBundle( + new SwaggerBundle<>() { + @Override + protected SwaggerBundleConfiguration getSwaggerBundleConfiguration( + AppConfiguration appConfiguration) { + return appConfiguration.getSwaggerBundleConfiguration(); + } + }); + } + + /** + * Runs the application by registering REST resources. + * + * @param c The AppConfiguration object containing the application's configuration. + * @param e The Environment object representing the current environment of the application. + */ + @Override + public void run(AppConfiguration c, Environment e) { + log.info("Registering REST resources"); + // Register REST resources for Worker and Job services + e.jersey().register(Worker.class); + e.jersey().register(Job.class); + } + + /** + * Creates a new instance of the Guice module for dependency injection. + * + * @return A new GuiceModule instance. + */ + private GuiceModule createGuiceModule() { + return new GuiceModule(); + } + + /** + * Configures and returns the Guice bundle for the application. + * This method enables automatic configuration of Guice modules. + * + * @return A configured GuiceBundle object. 
+ */ + private GuiceBundle guiceBundle() { + return GuiceBundle.builder().enableAutoConfig().modules(new GuiceModule()).build(); + } +} diff --git a/src/main/java/com/umar/taskscheduler/AppConfiguration.java b/src/main/java/com/umar/taskscheduler/AppConfiguration.java new file mode 100644 index 0000000..33064f6 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/AppConfiguration.java @@ -0,0 +1,48 @@ +package com.umar.taskscheduler; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.dropwizard.core.Configuration; +import io.federecio.dropwizard.swagger.SwaggerBundleConfiguration; +import lombok.*; + +/** + * Configuration class for the Distributed Task Scheduler application. + * This class handles the application's custom configurations, including Swagger. + * + * The configurations are read from the application configuration file and + * mapped to this class. It extends Dropwizard's {@link Configuration} class. + * + * Jackson annotations are used to handle the JSON mapping of the configuration properties. + * + * Lombok annotations such as {@link Data}, {@link Builder}, {@link AllArgsConstructor}, + * and {@link NoArgsConstructor} are used to generate boilerplate code for getter, setter, + * constructor, and builder pattern automatically. + * + * Swagger configuration is handled through the {@link SwaggerBundleConfiguration} property. + * + * @author Umar Mohammad + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class AppConfiguration extends Configuration { + + /** + * Swagger configuration for the API documentation. + * This field is mapped from the configuration file using the {@code swagger} property. + */ + @JsonProperty("swagger") + private SwaggerBundleConfiguration swaggerBundleConfiguration; + + /** + * Getter for the Swagger configuration. + * + * @return the SwaggerBundleConfiguration object containing Swagger settings. + */ + public SwaggerBundleConfiguration getSwaggerBundleConfiguration() { + return swaggerBundleConfiguration; + } +} diff --git a/src/main/java/com/umar/taskscheduler/DistributedTaskScheduler/App.java b/src/main/java/com/umar/taskscheduler/DistributedTaskScheduler/App.java new file mode 100644 index 0000000..988e727 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/DistributedTaskScheduler/App.java @@ -0,0 +1,26 @@ +package com.umar.taskscheduler.DistributedTaskScheduler; + +/** + * Main entry point for the Distributed Task Scheduler application. + * + * This application serves as a starting point for initializing and running the task scheduler. + * Additional setup and configuration will be needed for actual distributed task scheduling functionality. + * + * Author: Umar Mohammad + */ +public class App { + + /** + * Main method that serves as the entry point for the application. + * + * @param args Command line arguments passed during application startup. + */ + public static void main(String[] args) { + // TODO: Initialize necessary components for the distributed task scheduler + System.out.println("Distributed Task Scheduler is starting..."); + + // This is where the actual initialization logic for the task scheduler should go + // For now, it's a placeholder to print a simple message. 
+ System.out.println("Hello World!"); + } +} diff --git a/src/main/java/com/umar/taskscheduler/JobDetail.java b/src/main/java/com/umar/taskscheduler/JobDetail.java new file mode 100644 index 0000000..72101c6 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/JobDetail.java @@ -0,0 +1,21 @@ +package com.umar.taskscheduler; + +import java.io.Serializable; + +/** + * JobDetail interface that represents the details of a job to be executed in the Distributed Task Scheduler. + * + * This interface extends {@link Runnable} for task execution and {@link Serializable} + * to allow job details to be transmitted or stored as serialized objects. + * + * Any class implementing this interface should define the logic for the job's execution + * in the {@link Runnable#run()} method and ensure the job details can be serialized. + * + * Implementations of this interface are expected to be used for scheduling and executing + * tasks within the distributed system. + * + * @author Umar Mohammad + */ +public interface JobDetail extends Runnable, Serializable { + // No additional methods; combines Runnable and Serializable for job execution and serialization. +} diff --git a/src/main/java/com/umar/taskscheduler/callbacks/AssignmentListener.java b/src/main/java/com/umar/taskscheduler/callbacks/AssignmentListener.java new file mode 100644 index 0000000..fda40e7 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/callbacks/AssignmentListener.java @@ -0,0 +1,171 @@ +package com.umar.taskscheduler.callbacks; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.umar.taskscheduler.util.ZKUtils; + +/** + * AssignmentListener is responsible for listening to ZooKeeper events related to job assignments. + * + * This listener detects newly created nodes representing job assignments, deserializes the job details, + * executes the job asynchronously, and updates the job's status in ZooKeeper upon completion. + * + * It leverages Curator's {@link CuratorCacheListener} to monitor node changes and uses + * an {@link ExecutorService} to execute jobs in separate threads to avoid blocking the watcher thread. + * + * Future improvements include adding a daemon service for cleanup of completed jobs. + * + * @author Umar Mohammad + */ +@Slf4j +public class AssignmentListener implements CuratorCacheListener { + + private final CuratorFramework curator; + private final ExecutorService executorService; + private static final Logger log = LoggerFactory.getLogger(AssignmentListener.class); + + /** + * Constructor for AssignmentListener. + * + * @param curator CuratorFramework instance to interact with ZooKeeper. + */ + public AssignmentListener(CuratorFramework curator) { + this.curator = curator; + this.executorService = Executors.newFixedThreadPool(10); + } + + /** + * Handles ZooKeeper events related to job assignments. + * + *
When a new assignment node is created, this method deserializes the job, + * executes it asynchronously, and updates the status of the job upon completion. + * + * @param type The type of event (e.g., NODE_CREATED). + * @param oldData Data before the change (if applicable). + * @param data The current data of the node. + */ + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED) { + if (data.getPath().indexOf('/', 1) == data.getPath().lastIndexOf('/')) { + // Skip root path /assignment/{worker-id}, which contains no job id + return; + } + String jobId = data.getPath().substring(data.getPath().lastIndexOf('/') + 1); + log.info("Assignment found for job id {}", jobId); + + try { + // Deserialize job detail from ZooKeeper data + byte[] bytes = data.getData(); + ObjectInputStream objectInputStream = + new ObjectInputStream(new ByteArrayInputStream(bytes)); + Runnable jobDetail = (Runnable) objectInputStream.readObject(); + log.info("Deserialized the JobId {} to {}", jobId, jobDetail); + + // Execute the job asynchronously + CompletableFuture future = CompletableFuture.runAsync(jobDetail, executorService); + log.info("Job submitted for execution"); + + // Upon completion, update job status and perform cleanup + future.thenAcceptAsync(__ -> asyncCreate(jobId, data.getPath()), executorService); + } catch (Exception e) { + log.error("Unable to fetch data for job id {}", jobId, e); + } + } + } + + /** + * Updates the job status in ZooKeeper after execution. + * + *
This method creates a persistent node with TTL (Time To Live) in ZooKeeper + * to mark the job as completed. It also retries in case of connection loss. + * + * @param jobId The ID of the job being executed. + * @param assignmentPath The path of the assignment node in ZooKeeper. + */ + private void asyncCreate(String jobId, String assignmentPath) { + log.info("JobID {} has been executed, moving on to update its status", jobId); + + try { + // Create status node with TTL in ZooKeeper + curator + .create() + .withTtl(ZKUtils.STATUS_TTL_MILLIS) + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT_WITH_TTL) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Status updated successfully {}", event.getPath()); + log.info("Performing deletion of assignment path {}", assignmentPath); + asyncDelete(assignmentPath); + } + case CONNECTIONLOSS -> { + log.error("Lost connection to ZK while creating {}, retrying", event.getPath()); + asyncCreate(jobId, assignmentPath); + } + case NODEEXISTS -> log.warn("Node already exists for path {}", event.getPath()); + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(ZKUtils.getStatusPath(jobId), "Completed".getBytes()); + } catch (Exception e) { + log.error("Unable to create {} due to ", ZKUtils.getStatusPath(jobId), e); + throw new RuntimeException(e); + } + } + + /** + * Deletes the assignment node after job execution. + * + *
This method deletes the ZooKeeper node for the job assignment + * and retries if there is a connection loss. + * + * @param path The path of the assignment node to delete. + */ + private void asyncDelete(String path) { + try { + // Delete assignment node in ZooKeeper + curator + .delete() + .idempotent() + .guaranteed() + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> log.info("Path deleted successfully {}", event.getPath()); + case CONNECTIONLOSS -> { + log.info("Lost connection to ZK while deleting {}, retrying", event.getPath()); + asyncDelete(event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(path); + } catch (Exception e) { + log.error("Unable to delete {} due to ", path, e); + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/callbacks/JobAssigner.java b/src/main/java/com/umar/taskscheduler/callbacks/JobAssigner.java new file mode 100644 index 0000000..68ae241 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/callbacks/JobAssigner.java @@ -0,0 +1,175 @@ +package com.umar.taskscheduler.callbacks; + +import com.umar.taskscheduler.strategy.WorkerPickerStrategy; +import com.umar.taskscheduler.util.ZKUtils; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JobAssigner is responsible for assigning jobs to available workers in the distributed task scheduler. + * + * It uses a {@link WorkerPickerStrategy} to select a worker from the list of available workers + * and assigns the job to the chosen worker. This class also handles creating the assignment in + * ZooKeeper and cleaning up job-related data once the assignment is created. + * + * The assignments and job details are stored in ZooKeeper nodes, and any ZooKeeper-related + * operations (such as node creation and deletion) are performed asynchronously. + * + * Error handling is provided for connection loss and retries in case of failure during ZooKeeper interactions. + * + * Future improvements include atomic operations for assignment and deletion using MultiOp. + * + * @author Umar Mohammad + */ +@Slf4j +public class JobAssigner implements Runnable { + + private final CuratorFramework curator; + private final String jobID; + private final CuratorCache workersCache; + private final WorkerPickerStrategy workerPickerStrategy; + private final byte[] jobData; + private String workerName; + private static final Logger log = LoggerFactory.getLogger(JobAssigner.class); + + /** + * Constructor for JobAssigner. + * + * @param jobID The unique identifier of the job to be assigned. + * @param jobData The serialized data representing the job details. + * @param curator CuratorFramework instance to interact with ZooKeeper. + * @param workersCache Cache of available workers stored in ZooKeeper. + * @param workerPickerStrategy Strategy used to select a worker from the available workers. 
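+   * <p>Illustrative use (mirrors how JobsListener in this changeset submits assignments; the executor
+   * service is assumed to be owned by the caller):
+   * <pre>{@code
+   * Runnable assigner =
+   *     new JobAssigner(jobID, data.getData(), curator, workersCache, new RoundRobinWorker());
+   * executorService.submit(assigner);
+   * }</pre>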
+ */ + public JobAssigner( + String jobID, + byte[] jobData, + CuratorFramework curator, + CuratorCache workersCache, + WorkerPickerStrategy workerPickerStrategy) { + this.jobID = jobID; + this.curator = curator; + this.workersCache = workersCache; + this.workerPickerStrategy = workerPickerStrategy; + this.jobData = jobData; + } + + /** + * Run method that is executed when the job assigner is run in a separate thread. + * + *
This method selects a worker from the available workers using the provided strategy + * and assigns the incoming job to the selected worker. After assigning, it triggers the + * asynchronous creation of the assignment node in ZooKeeper. + */ + @Override + public void run() { + // Filter and collect available workers from the ZooKeeper cache + List workers = + workersCache.stream() + .filter(childData -> (childData.getPath().length() > ZKUtils.WORKERS_ROOT.length())) + .toList(); + + // Pick a worker using the provided strategy + ChildData chosenWorker = workerPickerStrategy.evaluate(workers); + workerName = ZKUtils.extractNode(chosenWorker.getPath()); + + log.info( + "Found total workers {}, Chosen worker index {}, worker name {}", + workers.size(), + chosenWorker, + workerName); + + // Asynchronously create the assignment for the chosen worker + asyncCreateAssignment(); + } + + /** + * Asynchronously creates an assignment for the selected worker in ZooKeeper. + * + *
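+   * <p>Node written by this call, shown schematically (paths follow the ZKUtils constants in this change):
+   * <pre>{@code
+   * // /assignments/{workerName}/{jobID} -> serialized job bytes (same payload as /jobs/{jobID})
+   * }</pre>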
This method creates a new persistent node in ZooKeeper representing the assignment + * of the job to the worker. It stores the job data directly in the assignment node to + * avoid additional network calls by the worker. The assignment node is deleted after + * successful job execution. + */ + private void asyncCreateAssignment() { + try { + curator + .create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info( + "Assignment created successfully for JobID {} with WorkerID {}", + jobID, + workerName); + + log.info("Performing async deletion of {}", ZKUtils.getJobsPath() + "/" + jobID); + asyncDelete(ZKUtils.getJobsPath() + "/" + jobID); + } + case CONNECTIONLOSS -> { + log.error("Lost connection to ZK while creating {}, retrying", event.getPath()); + asyncCreateAssignment(); + } + case NODEEXISTS -> log.warn("Assignment already exists for path {}", event.getPath()); + case NONODE -> log.error("Trying to create an assignment for a worker that does not exist {}", event); + default -> log.error("Unhandled event {} ", event); + } + } + }) + .forPath(ZKUtils.ASSIGNMENT_ROOT + "/" + workerName + "/" + jobID, jobData); + // Store the job data along with the assignment to avoid additional calls for job details. + // This simplifies recovery since unassigned jobs remain under /jobs. + } catch (Exception e) { + log.error("Error while creating assignment for {} with {}", jobID, workerName, e); + throw new RuntimeException(e); + } + } + + /** + * Asynchronously deletes the ZooKeeper node representing the job after assignment. + * + *
This method deletes the job node in ZooKeeper and retries in case of connection loss. + * + * @param path The path of the job node to be deleted in ZooKeeper. + */ + private void asyncDelete(String path) { + try { + curator + .delete() + .idempotent() + .guaranteed() + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> log.info("Path deleted successfully {}", event.getPath()); + case CONNECTIONLOSS -> { + log.info("Lost connection to ZK while deleting {}, retrying", event.getPath()); + asyncDelete(event.getPath()); + } + default -> log.error("Unhandled event {}", event); + } + } + }) + .forPath(path); + } catch (Exception e) { + log.error("Unable to delete {} due to ", path, e); + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/callbacks/JobsListener.java b/src/main/java/com/umar/taskscheduler/callbacks/JobsListener.java new file mode 100644 index 0000000..f82017a --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/callbacks/JobsListener.java @@ -0,0 +1,73 @@ +package com.umar.taskscheduler.callbacks; + +import com.umar.taskscheduler.strategy.WorkerPickerStrategy; +import com.umar.taskscheduler.util.ZKUtils; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JobsListener listens for newly created job nodes in ZooKeeper and assigns them to available workers. + * + * It uses a separate thread pool (via ExecutorService) to ensure that job assignment is handled + * asynchronously and doesn't block the ZooKeeper watcher thread. The job assignment is performed + * by the JobAssigner class, which assigns jobs based on a specified worker-picking strategy. + * + * This listener ensures that new jobs are processed efficiently without missing any events. + * + * Author: Umar Mohammad + */ +@Slf4j +public class JobsListener implements CuratorCacheListener { + + private final CuratorFramework curator; + private final CuratorCache workersCache; + private final ExecutorService executorService; + private final WorkerPickerStrategy workerPickerStrategy; + private static final Logger log = LoggerFactory.getLogger(JobsListener.class); + + /** + * Constructor for JobsListener. + * + * @param curator CuratorFramework instance to interact with ZooKeeper. + * @param workersCache Cache of available workers stored in ZooKeeper. + * @param workerPickerStrategy Strategy used to select a worker from the available workers. + */ + public JobsListener( + CuratorFramework curator, + CuratorCache workersCache, + WorkerPickerStrategy workerPickerStrategy) { + this.curator = curator; + this.workersCache = workersCache; + this.executorService = Executors.newSingleThreadExecutor(); // Use a single-threaded executor for job assignment + this.workerPickerStrategy = workerPickerStrategy; + } + + /** + * Handles ZooKeeper events related to jobs. + * + * When a new job node is created in ZooKeeper, this method extracts the job ID and submits it + * to the ExecutorService for assignment, ensuring that job assignment runs asynchronously. 
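+   * <p>Example with a hypothetical path: a created node {@code /jobs/5f1c} yields job ID {@code 5f1c},
+   * while the {@code /jobs} root itself is skipped because its path is not longer than
+   * {@code ZKUtils.JOBS_ROOT}.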
+ * + * @param type The type of event (e.g., NODE_CREATED). + * @param oldData Data before the change (if applicable). + * @param data The current data of the node. + */ + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED && data.getPath().length() > ZKUtils.JOBS_ROOT.length()) { + String jobID = ZKUtils.extractNode(data.getPath()); + log.info("Found new job {}, passing it to executor service", jobID); + + // Submit the job to the executor service for assignment to a worker. + executorService.submit( + new JobAssigner(jobID, data.getData(), curator, workersCache, workerPickerStrategy)); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/callbacks/WorkersListener.java b/src/main/java/com/umar/taskscheduler/callbacks/WorkersListener.java new file mode 100644 index 0000000..748dd31 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/callbacks/WorkersListener.java @@ -0,0 +1,121 @@ +package com.umar.taskscheduler.callbacks; + +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.umar.taskscheduler.util.ZKUtils; + +/** + * WorkersListener listens for changes in worker nodes within ZooKeeper. It handles events such as the addition + * and removal of workers. In the event of a worker being lost (NODE_DELETED), the listener identifies jobs + * assigned to the lost worker and reassigns them by recreating job entries in ZooKeeper. + * + * Author: Umar Mohammad + */ +@Slf4j +public class WorkersListener implements CuratorCacheListener { + + private final CuratorCache assignmentCache; + private final CuratorFramework curator; + private static final Logger log = LoggerFactory.getLogger(WorkersListener.class); + + /** + * Constructor for WorkersListener. + * + * @param assignmentCache CuratorCache instance containing job assignments. + * @param curator CuratorFramework instance to interact with ZooKeeper. + */ + public WorkersListener(CuratorCache assignmentCache, CuratorFramework curator) { + this.assignmentCache = assignmentCache; + this.curator = curator; + } + + /** + * Handles ZooKeeper events related to workers. + * + * When a worker node is created, logs the event. When a worker node is deleted, it identifies jobs + * assigned to the lost worker and attempts to recreate them under the /jobs path for reassignment. + * + * @param type The type of event (e.g., NODE_CREATED, NODE_DELETED). + * @param oldData Data before the change (for NODE_DELETED). + * @param data The current data of the node (for NODE_CREATED). 
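+   * <p>Illustrative recovery with hypothetical IDs: when worker {@code worker-42} disappears, an
+   * assignment node {@code /assignments/worker-42/job-7} is found in the assignment cache and its
+   * payload is re-created at {@code /jobs/job-7}, where the jobs listener can assign it to a surviving
+   * worker.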
+ */ + @Override + public void event(Type type, ChildData oldData, ChildData data) { + if (type == Type.NODE_CREATED) { + log.info("New worker found {} ", data.getPath()); + } else if (type == Type.NODE_DELETED) { + // Handle the event where a worker is lost + log.info("Lost worker {}", oldData.getPath()); + String lostWorkerID = oldData.getPath().substring(oldData.getPath().lastIndexOf('/') + 1); + Map assignableJobIds = new HashMap<>(); + + // Find jobs assigned to the lost worker + assignmentCache.stream() + .forEach(childData -> { + String path = childData.getPath(); + int begin = path.indexOf('/') + 1; + int end = path.indexOf('/', begin); + String pathWorkerID = path.substring(begin, end); + if (pathWorkerID.equals(lostWorkerID)) { + String jobID = path.substring(end + 1); + log.info("Found job {} assigned to lost worker {}", jobID, lostWorkerID); + assignableJobIds.put(jobID, childData.getData()); + } + }); + + // Recreate job entries in the /jobs path for reassignment + assignableJobIds.forEach( + (jobId, jobData) -> asyncCreateJob(ZKUtils.getJobsPath() + "/" + jobId, jobData)); + } + } + + /** + * Asynchronously recreates a job in ZooKeeper under the /jobs path. + * + * This method is used to recover jobs that were assigned to a lost worker, allowing the job to be reassigned + * to another worker. It handles retries in case of connection loss. + * + * @param path The path in ZooKeeper where the job will be recreated. + * @param data The serialized job data. + */ + private void asyncCreateJob(String path, byte[] data) { + try { + curator + .create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .inBackground( + new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> log.info("Job repaired successfully for {}", event.getPath()); + case CONNECTIONLOSS -> { + log.error("Lost connection to ZK while repairing job {}, retrying", event.getPath()); + asyncCreateJob(event.getPath(), (byte[]) event.getContext()); + } + case NODEEXISTS -> log.warn("Job already exists for path {}", event.getPath()); + default -> log.error("Unhandled event {}", event); + } + } + }, + data) + .forPath(path, data); + } catch (Exception e) { + log.error("Error while repairing job {}", path, e); + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/core/ZKDao.java b/src/main/java/com/umar/taskscheduler/core/ZKDao.java new file mode 100644 index 0000000..2ffd408 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/core/ZKDao.java @@ -0,0 +1,107 @@ +package com.umar.taskscheduler.core; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZKDao is responsible for interacting with ZooKeeper to perform CRUD operations. + * It provides methods to create, read, update, and delete nodes in ZooKeeper. + * + * This class serves as a DAO (Data Access Object) for abstracting ZooKeeper operations, + * ensuring separation of concerns and making the code more maintainable. + * + * Author: Umar Mohammad + */ +public class ZKDao { + + private final CuratorFramework curator; + private static final Logger log = LoggerFactory.getLogger(ZKDao.class); + + /** + * Constructor for ZKDao. + * + * @param curator CuratorFramework instance to interact with ZooKeeper. 
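+   * <p>Minimal usage sketch (assumes a started CuratorFramework client; path and payload are made up):
+   * <pre>{@code
+   * ZKDao dao = new ZKDao(curator);
+   * dao.createNode("/demo", "payload".getBytes());
+   * byte[] data = dao.getNodeData("/demo");
+   * dao.updateNode("/demo", "updated".getBytes());
+   * dao.deleteNode("/demo");
+   * }</pre>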
+ */ + public ZKDao(CuratorFramework curator) { + this.curator = curator; + } + + /** + * Creates a persistent node in ZooKeeper. + * + * @param path The ZooKeeper path where the node will be created. + * @param data The data to store in the node. + * @throws Exception if an error occurs during node creation. + */ + public void createNode(String path, byte[] data) throws Exception { + try { + curator.create().forPath(path, data); + log.info("Node created successfully at path {}", path); + } catch (KeeperException.NodeExistsException e) { + log.warn("Node already exists at path {}", path); + } catch (Exception e) { + log.error("Error creating node at path {}", path, e); + throw e; + } + } + + /** + * Reads data from a node in ZooKeeper. + * + * @param path The ZooKeeper path of the node. + * @return The data stored in the node. + * @throws Exception if an error occurs during data retrieval. + */ + public byte[] getNodeData(String path) throws Exception { + try { + byte[] data = curator.getData().forPath(path); + log.info("Data retrieved successfully from path {}", path); + return data; + } catch (KeeperException.NoNodeException e) { + log.warn("Node not found at path {}", path); + return null; + } catch (Exception e) { + log.error("Error retrieving data from path {}", path, e); + throw e; + } + } + + /** + * Updates data in a node in ZooKeeper. + * + * @param path The ZooKeeper path of the node. + * @param data The new data to update in the node. + * @throws Exception if an error occurs during the update. + */ + public void updateNode(String path, byte[] data) throws Exception { + try { + curator.setData().forPath(path, data); + log.info("Node data updated successfully at path {}", path); + } catch (KeeperException.NoNodeException e) { + log.warn("Node not found at path {}", path); + } catch (Exception e) { + log.error("Error updating node at path {}", path, e); + throw e; + } + } + + /** + * Deletes a node in ZooKeeper. + * + * @param path The ZooKeeper path of the node to be deleted. + * @throws Exception if an error occurs during node deletion. + */ + public void deleteNode(String path) throws Exception { + try { + curator.delete().forPath(path); + log.info("Node deleted successfully at path {}", path); + } catch (KeeperException.NoNodeException e) { + log.warn("Node not found at path {}", path); + } catch (Exception e) { + log.error("Error deleting node at path {}", path, e); + throw e; + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/module/GuiceModule.java b/src/main/java/com/umar/taskscheduler/module/GuiceModule.java new file mode 100644 index 0000000..230e823 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/module/GuiceModule.java @@ -0,0 +1,40 @@ +package com.umar.taskscheduler.module; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +/** + * GuiceModule configures dependencies for the Distributed Task Scheduler application. + * + * This module provides an instance of CuratorFramework, which is used to interact with ZooKeeper. + * It uses a namespace to avoid conflicts in shared ZooKeeper clusters and implements a retry policy + * to handle connection issues. + * + * Author: Umar Mohammad + */ +public class GuiceModule extends AbstractModule { + + /** + * Provides an instance of CuratorFramework, which manages the connection to ZooKeeper. 
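+   * <p>Illustrative bootstrap (assumed caller code, not part of this module):
+   * <pre>{@code
+   * Injector injector = Guice.createInjector(new GuiceModule());
+   * CuratorFramework curator = injector.getInstance(CuratorFramework.class);
+   * }</pre>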
+ * + * The CuratorFramework instance is configured with a retry policy and a namespace to + * handle failures and ensure task scheduler operations do not conflict with other applications. + * + * @return CuratorFramework instance for interacting with ZooKeeper. + */ + @Provides + public CuratorFramework curatorFramework() { + CuratorFramework curatorFramework = + CuratorFrameworkFactory.builder() + .namespace("TaskSchedulerV0") // Ensures isolation in shared ZooKeeper clusters + .connectString("127.0.0.1:2181") // Connects to local ZooKeeper instance + .retryPolicy(new ExponentialBackoffRetry(10, 1)) // Retry policy for handling connection issues + .build(); + + curatorFramework.start(); // Start the CuratorFramework client + return curatorFramework; + } +} diff --git a/src/main/java/com/umar/taskscheduler/resources/Client.java b/src/main/java/com/umar/taskscheduler/resources/Client.java new file mode 100644 index 0000000..4627d05 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/resources/Client.java @@ -0,0 +1,56 @@ +package com.umar.taskscheduler.resources; + +import com.google.inject.Inject; +import com.umar.taskscheduler.service.ClientService; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.QueryParam; +import java.io.Serializable; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; + +/** + * Client resource for handling client-related tasks in the Distributed Task Scheduler. + * + * This resource allows clients to create tasks via HTTP POST requests. + * It provides an endpoint to register a sum task, where two integers are added, and the result is logged. + * + * Author: Umar Mohammad + */ +@Slf4j +@Path("/v1/client") +public class Client { + + private final ClientService clientService; + + /** + * Constructor for Client resource, which initializes the ClientService using dependency injection. + * + * @param curator CuratorFramework instance used by the ClientService to interact with ZooKeeper. + */ + @Inject + public Client(CuratorFramework curator) { + this.clientService = new ClientService(curator); + } + + /** + * Registers a sum task as a job. + * + * This endpoint allows a client to submit two integers as query parameters, + * and a job is created to compute their sum. The job is registered using the ClientService. + * + * @param a The first integer. + * @param b The second integer. + * @return A response containing the job registration status or ID. 
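+   * <p>Example request (host and port are assumptions; the query parameter names come from the
+   * annotations below):
+   * <pre>{@code
+   * curl -X POST "http://localhost:8080/v1/client?first=2&second=3"
+   * }</pre>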
+ */ + @POST + public String createSumTask(@QueryParam("first") int a, @QueryParam("second") int b) { + // Create a job to compute the sum of two integers + Runnable jobDetail = (Runnable & Serializable) + (() -> System.out.println("Sum of " + a + " and " + b + " is " + (a + b))); + + // Register the job using ClientService and return the job status + return clientService.registerJob(jobDetail); + } +} diff --git a/src/main/java/com/umar/taskscheduler/resources/Job.java b/src/main/java/com/umar/taskscheduler/resources/Job.java new file mode 100644 index 0000000..36999bd --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/resources/Job.java @@ -0,0 +1,64 @@ +package com.umar.taskscheduler.resources; + +import com.google.inject.Inject; +import com.umar.taskscheduler.util.ZKUtils; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job resource for handling job creation in the Distributed Task Scheduler. + * + * This resource provides an endpoint for clients to submit job data via HTTP POST requests. + * The job is then registered in ZooKeeper using CuratorFramework, and a unique job ID is generated. + * + * Author: Umar Mohammad + */ +@Path("/v1/jobs") +@Slf4j +public class Job { + + private final CuratorFramework curator; + private static final Logger log = LoggerFactory.getLogger(Job.class); + + /** + * Constructor for the Job resource. + * + * @param curator CuratorFramework instance used for interacting with ZooKeeper. + */ + @Inject + public Job(CuratorFramework curator) { + this.curator = curator; + } + + /** + * Handles job creation requests. + * + * This method accepts job data in the form of a string, generates a unique job ID, + * and stores the job in ZooKeeper under the /jobs path. + * + * @param jobData The data associated with the job being submitted. + */ + @POST + public void createJob(String jobData) { + log.info("Received job data: {}", jobData); + try { + // Create a persistent node in ZooKeeper with a unique job ID + String jobPath = ZKUtils.getJobsPath() + "/" + UUID.randomUUID(); + curator.create() + .withMode(CreateMode.PERSISTENT) + .forPath(jobPath, jobData.getBytes()); + + log.info("Job created successfully at path {}", jobPath); + } catch (Exception e) { + log.error("Unable to submit job", e); + throw new RuntimeException("Job creation failed", e); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/resources/Worker.java b/src/main/java/com/umar/taskscheduler/resources/Worker.java new file mode 100644 index 0000000..7c8462e --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/resources/Worker.java @@ -0,0 +1,74 @@ +package com.umar.taskscheduler.resources; + +import com.google.inject.Inject; +import com.umar.taskscheduler.service.WorkerService; +import com.umar.taskscheduler.util.ZKUtils; + +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import org.apache.curator.framework.CuratorFramework; + +/** + * Worker resource for managing worker nodes in the Distributed Task Scheduler. + * + * This resource provides endpoints to stop a worker and retrieve the leader of the worker cluster. + * It interacts with ZooKeeper through CuratorFramework and utilizes the WorkerService to perform + * these actions. 
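+ * <p>Example requests (host and port are assumptions; paths come from the JAX-RS annotations below):
+ * <pre>{@code
+ * curl -X DELETE http://localhost:8080/v1/workers/{worker-id}
+ * curl http://localhost:8080/v1/workers/leader
+ * }</pre>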
+ * + * Author: Umar Mohammad + */ +@Path("/v1/workers") +public class Worker { + + private final CuratorFramework curator; + private WorkerService worker; + + /** + * Constructor for Worker resource. + * + * Initializes the WorkerService, which handles worker operations such as stopping a worker + * and retrieving the leader. + * + * @param curator CuratorFramework instance for interacting with ZooKeeper. + */ + @Inject + public Worker(CuratorFramework curator) { + this.curator = curator; + initWorker(); + } + + /** + * Initializes the WorkerService with the provided CuratorFramework and leader path. + */ + private void initWorker() { + worker = new WorkerService(curator, ZKUtils.LEADER_ROOT); + } + + /** + * Stops a worker based on the provided worker ID. + * + * This method calls the WorkerService to stop the worker associated with the given ID. + * + * @param id The ID of the worker to stop. + */ + @DELETE + @Path("/{id}") + public void stopWorker(@PathParam("id") String id) { + worker.stop(); + } + + /** + * Retrieves the ID of the current leader in the worker cluster. + * + * This method queries the WorkerService to return the leader's ID, or a message if no leader is found. + * + * @return The leader's ID, or "No leader found" if none is available. + */ + @GET + @Path("/leader") + public String getLeaderId() { + return worker.getLeader().orElse("No leader found"); + } +} diff --git a/src/main/java/com/umar/taskscheduler/service/ClientService.java b/src/main/java/com/umar/taskscheduler/service/ClientService.java new file mode 100644 index 0000000..954493d --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/service/ClientService.java @@ -0,0 +1,82 @@ +package com.umar.taskscheduler.service; + +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.umar.taskscheduler.util.ZKUtils; + +/** + * ClientService is responsible for registering jobs in ZooKeeper for the Distributed Task Scheduler. + * + * This service serializes job details (Runnable) and creates a persistent ZooKeeper node to store the job. + * It ensures each job is assigned a unique job ID, which is used as the path for the job in ZooKeeper. + * + * Author: Umar Mohammad + */ +@Slf4j +public class ClientService { + + private final CuratorFramework curator; + private static final Logger log = LoggerFactory.getLogger(ClientService.class); + + /** + * Constructor for ClientService. + * + * Initializes the service with a CuratorFramework instance for interacting with ZooKeeper. + * + * @param curator CuratorFramework instance to manage ZooKeeper interactions. + */ + public ClientService(CuratorFramework curator) { + this.curator = curator; + } + + /** + * Registers a job in ZooKeeper. + * + * The job is assigned a unique job ID, and the job details are serialized and stored + * in ZooKeeper at a path corresponding to the job ID. + * + * @param jobDetail The Runnable job to be registered. + * @return The unique job ID assigned to the registered job. + */ + public String registerJob(Runnable jobDetail) { + String jobId = UUID.randomUUID().toString(); // Generate a unique job ID + syncCreate(ZKUtils.getJobsPath() + "/" + jobId, jobDetail); // Create a ZNode for the job + return jobId; + } + + /** + * Synchronously creates a ZooKeeper node to store the job data. 
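+   * <p>The public entry point is {@code registerJob(Runnable)}; an illustrative call (the lambda must
+   * also be {@code Serializable}, as the Client resource in this change demonstrates):
+   * <pre>{@code
+   * String jobId = clientService.registerJob(
+   *     (Runnable & Serializable) () -> System.out.println("hello from a job"));
+   * }</pre>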
+ * + * This method serializes the job details (Runnable) into a byte array and stores it + * in a persistent ZooKeeper node. It ensures the job is stored at the specified path. + * + * @param path The ZooKeeper path where the job will be stored. + * @param runnable The job to be serialized and stored. + */ + private void syncCreate(String path, Runnable runnable) { + try { + // Serialize the job details + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(runnable); + + // Create a persistent ZooKeeper node with the serialized job data + curator.create() + .idempotent() + .withMode(CreateMode.PERSISTENT) + .forPath(path, byteArrayOutputStream.toByteArray()); + + log.info("Job created successfully at path {}", path); + } catch (Exception e) { + log.error("Unable to create node at path {}", path, e); + throw new RuntimeException(e); // Rethrow the exception as a runtime exception + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/service/WorkerService.java b/src/main/java/com/umar/taskscheduler/service/WorkerService.java new file mode 100644 index 0000000..0168770 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/service/WorkerService.java @@ -0,0 +1,285 @@ +package com.umar.taskscheduler.service; + +import com.umar.taskscheduler.callbacks.AssignmentListener; +import com.umar.taskscheduler.callbacks.JobsListener; +import com.umar.taskscheduler.callbacks.WorkersListener; +import com.umar.taskscheduler.strategy.RoundRobinWorker; +import com.umar.taskscheduler.strategy.WorkerPickerStrategy; +import com.umar.taskscheduler.util.ZKUtils; + +import java.io.Closeable; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.leader.CancelLeadershipException; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * WorkerService manages the lifecycle of a worker node in the Distributed Task Scheduler. + * It handles the worker's registration, leader election, assignment watching, and interaction with ZooKeeper. + * The class implements {@link LeaderSelectorListener} for leader election events and {@link Closeable} + * to manage resource cleanup when a worker stops. 
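+ * <p>Lifecycle sketch, as wired in this change: construction starts leader election and registers an
+ * ephemeral {@code /workers/{name}} node; every worker watches its own {@code /assignments/{name}}
+ * subtree, and the elected leader additionally watches {@code /jobs} and {@code /workers}.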
+ * + * Author: Umar Mohammad + */ +@Slf4j +@Getter +public class WorkerService implements LeaderSelectorListener, Closeable { + + private final LeaderSelector leaderSelector; + private final AtomicBoolean shouldStop = new AtomicBoolean(false); + private final CuratorFramework curator; + private final AtomicBoolean registrationRequired = new AtomicBoolean(true); + private final WorkerPickerStrategy workerPickerStrategy; + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private volatile String name; + private CuratorCache workersCache; + private CuratorCache jobsCache; + private CuratorCache assignmentCache; + private CuratorCacheListener workersListener; + private CuratorCacheListener jobsListener; + private CuratorCacheListener assignmentListener; + private static final Logger log = LoggerFactory.getLogger(WorkerService.class); + + /** + * Constructor for WorkerService. + * + * Initializes the worker node and starts leader election through the {@link LeaderSelector}. + * The worker node is registered in ZooKeeper, and background watches are set up. + * + * @param curator CuratorFramework instance for interacting with ZooKeeper. + * @param path The path for leader election in ZooKeeper. + */ + public WorkerService(CuratorFramework curator, String path) { + this.curator = curator; + leaderSelector = new LeaderSelector(curator, path, this); + leaderSelector.start(); + leaderSelector.autoRequeue(); + setup(); + workerPickerStrategy = new RoundRobinWorker(); + } + + /** + * Initializes worker registration and sets up ZooKeeper paths for jobs, assignments, and statuses. + */ + private void setup() { + registerWorker(); + asyncCreate(ZKUtils.getJobsPath(), CreateMode.PERSISTENT, null); + asyncCreate(ZKUtils.getAssignmentPath(name), CreateMode.PERSISTENT, null); + asyncCreate(ZKUtils.STATUS_ROOT, CreateMode.PERSISTENT, null); + } + + /** + * Registers the worker by creating an ephemeral node in ZooKeeper. + * + * The worker is assigned a unique name, and the assignment path is watched for job assignments. + */ + private void registerWorker() { + if (registrationRequired.get()) { + log.info("Attempting worker registration"); + name = UUID.randomUUID().toString(); + log.info("Generated a new random name for the worker: {}", name); + asyncCreate(ZKUtils.getWorkerPath(name), CreateMode.EPHEMERAL, registrationRequired); + asyncCreate(ZKUtils.getAssignmentPath(name), CreateMode.PERSISTENT, null); + watchAssignmentPath(); + } + } + + /** + * Asynchronously creates a ZooKeeper node with the specified path and mode. + * + * @param path The path of the ZooKeeper node. + * @param mode The mode for node creation (e.g., PERSISTENT, EPHEMERAL). + * @param context Optional context used for setting additional state. 
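+   * <p>In this changeset the method is used during setup to create {@code /jobs},
+   * {@code /assignments/{name}} and {@code /status} as persistent nodes, and the worker's own
+   * {@code /workers/{name}} registration node as an ephemeral one (passing {@code registrationRequired}
+   * as the context so it can be cleared on success).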
+ */ + private void asyncCreate(String path, CreateMode mode, Object context) { + try { + curator.create() + .idempotent() + .creatingParentsIfNeeded() + .withMode(mode) + .inBackground((client, event) -> { + switch (KeeperException.Code.get(event.getResultCode())) { + case OK -> { + log.info("Path created successfully: {}", event.getPath()); + if (context != null) { + log.info("Setting the registration required field to false"); + ((AtomicBoolean) context).set(false); + } + } + case CONNECTIONLOSS -> { + log.error("Lost connection to ZooKeeper while creating {}, retrying", event.getPath()); + asyncCreate(event.getPath(), mode, context); + } + case NODEEXISTS -> log.warn("Node already exists at path: {}", event.getPath()); + default -> log.error("Unhandled event: {}", event); + } + }, context) + .forPath(path); + } catch (Exception e) { + log.error("Unable to create node at path: {}", path, e); + throw new RuntimeException(e); + } + } + + /** + * Watches the jobs and workers paths in ZooKeeper. Only the leader worker will perform this action. + */ + private void watchJobsAndWorkersPath() { + workersCache = CuratorCache.build(curator, ZKUtils.WORKERS_ROOT); + workersCache.start(); + log.info("Watching workers root path: {}", ZKUtils.WORKERS_ROOT); + workersListener = new WorkersListener(assignmentCache, curator); + workersCache.listenable().addListener(workersListener); + + jobsCache = CuratorCache.build(curator, ZKUtils.JOBS_ROOT); + log.info("Watching jobs root path: {}", ZKUtils.getJobsPath()); + jobsCache.start(); + jobsListener = new JobsListener(curator, workersCache, workerPickerStrategy); + jobsCache.listenable().addListener(jobsListener); + } + + /** + * Watches the assignment path for job assignments. + */ + private void watchAssignmentPath() { + assignmentCache = CuratorCache.build(curator, ZKUtils.getAssignmentPath(name)); + log.info("Watching assignment path: {}", ZKUtils.getAssignmentPath(name)); + assignmentCache.start(); + assignmentListener = new AssignmentListener(curator); + assignmentCache.listenable().addListener(assignmentListener); + } + + /** + * Deletes the worker node and removes the listeners for jobs and workers. + */ + private void destroy() { + log.info("Deleting worker path: {}", ZKUtils.getWorkerPath(name)); + try { + curator.delete().forPath(ZKUtils.getWorkerPath(name)); + } catch (Exception e) { + log.error("Unable to delete worker path: {}", ZKUtils.getWorkerPath(name), e); + } + log.info("Removing workers listener"); + workersCache.listenable().removeListener(workersListener); + workersCache.close(); + log.info("Removing jobs listener"); + jobsCache.listenable().removeListener(jobsListener); + jobsCache.close(); + } + + /** + * Closes the LeaderSelector and relinquishes leadership. + */ + @Override + public void close() { + leaderSelector.close(); + } + + /** + * Takes leadership and watches the job and worker paths. + * This method does not return until leadership is relinquished. + */ + @Override + public void takeLeadership(CuratorFramework client) { + log.info("{} is now the leader", name); + watchJobsAndWorkersPath(); + lock.lock(); + try { + while (!shouldStop.get()) { + condition.await(); + } + if (shouldStop.get()) { + log.warn("{} is signaled to stop!", name); + leaderSelector.close(); + } + } catch (InterruptedException e) { + log.error("Thread interrupted, exiting leadership", e); + } finally { + lock.unlock(); + } + } + + /** + * Handles connection state changes in ZooKeeper. 
+ * + * Re-registers the worker on reconnection and manages the leadership state on connection loss. + */ + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.RECONNECTED) { + log.info("Reconnected to ZooKeeper, state: {}", newState); + registerWorker(); + } else if (newState == ConnectionState.LOST) { + log.error("Connection lost to ZooKeeper, giving up leadership: {}", newState); + registrationRequired.set(true); + assignmentCache.listenable().removeListener(assignmentListener); + assignmentCache.close(); + if (workersCache != null) { + workersCache.listenable().removeListener(workersListener); + workersCache.close(); + } + if (jobsCache != null) { + jobsCache.listenable().removeListener(jobsListener); + jobsCache.close(); + } + throw new CancelLeadershipException(); + } else if (newState == ConnectionState.SUSPENDED) { + log.error("Connection suspended to ZooKeeper: {}", newState); + } + } + + /** + * Stops the worker and relinquishes leadership if held. + */ + public void stop() { + log.warn("Sending stop signal to {}", name); + destroy(); + shouldStop.compareAndSet(false, true); + if (leaderSelector.hasLeadership()) { + log.warn("Giving up leadership: {}", name); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); + } + } else { + leaderSelector.close(); + } + } + + /** + * Returns the ID of the current leader in the worker cluster. + * + * @return The leader's ID, or an empty Optional if no leader is available. + */ + public Optional getLeader() { + try { + return Optional.of(leaderSelector.getLeader().getId()); + } catch (Exception e) { + log.error("Unable to retrieve leader information", e); + return Optional.empty(); + } + } +} diff --git a/src/main/java/com/umar/taskscheduler/strategy/RandomWorker.java b/src/main/java/com/umar/taskscheduler/strategy/RandomWorker.java new file mode 100644 index 0000000..f059995 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/strategy/RandomWorker.java @@ -0,0 +1,30 @@ +package com.umar.taskscheduler.strategy; + +import java.util.List; +import org.apache.curator.framework.recipes.cache.ChildData; + +/** + * RandomWorker is a strategy for selecting a worker node at random from the available workers. + * + * It implements the {@link WorkerPickerStrategy} interface and uses Java's {@link Math#random()} + * to randomly pick a worker from the list of workers. + * + * Author: Umar Mohammad + */ +public class RandomWorker implements WorkerPickerStrategy { + + /** + * Selects a random worker from the provided list of workers. + * + * This method uses {@link Math#random()} to generate a random index and selects the corresponding + * worker from the list. The chosen worker is returned for task assignment. + * + * @param workers The list of available workers. + * @return The randomly selected {@link ChildData} representing the chosen worker. 
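+   * <p>Selection sketch: with five workers, {@code (int) (Math.random() * 5)} yields a uniform index in
+   * {@code [0, 4]}; {@code ThreadLocalRandom.current().nextInt(workers.size())} is a possible
+   * alternative, not used here.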
+ */ + @Override + public ChildData evaluate(List workers) { + int chosenWorker = (int) (Math.random() * workers.size()); + return workers.get(chosenWorker); + } +} diff --git a/src/main/java/com/umar/taskscheduler/strategy/RoundRobinWorker.java b/src/main/java/com/umar/taskscheduler/strategy/RoundRobinWorker.java new file mode 100644 index 0000000..70e7175 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/strategy/RoundRobinWorker.java @@ -0,0 +1,48 @@ +package com.umar.taskscheduler.strategy; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.framework.recipes.cache.ChildData; + +/** + * RoundRobinWorker is a strategy for selecting a worker node in a round-robin fashion. + * + * This class uses an {@link AtomicInteger} to keep track of the current index of the worker. + * The index is incremented each time a worker is selected, and it wraps around when the end of the list is reached. + * The atomic nature of the index ensures that it can safely be accessed from multiple threads. + * + * Author: Umar Mohammad + */ +public class RoundRobinWorker implements WorkerPickerStrategy { + + // AtomicInteger to keep track of the current worker index for thread-safe operations + private final AtomicInteger index = new AtomicInteger(0); + + /** + * Selects a worker in a round-robin manner from the provided list of workers. + * + * The method ensures that the worker index is updated atomically to allow for safe concurrent access. + * The index wraps around to zero when the end of the list is reached, ensuring all workers are selected evenly. + * + * @param workers The list of available workers. + * @return The selected {@link ChildData} representing the chosen worker. + */ + @Override + public ChildData evaluate(List workers) { + int chosenIndex; + + // Loop until a successful atomic update of the index is achieved + while (true) { + chosenIndex = index.get(); + int nextIndex = (chosenIndex + 1) < workers.size() ? (chosenIndex + 1) : 0; + + // Perform an atomic compare-and-set operation to update the index + if (index.compareAndSet(chosenIndex, nextIndex)) { + break; + } + } + + // Return the worker at the chosen index + return workers.get(chosenIndex); + } +} diff --git a/src/main/java/com/umar/taskscheduler/strategy/WorkerPickerStrategy.java b/src/main/java/com/umar/taskscheduler/strategy/WorkerPickerStrategy.java new file mode 100644 index 0000000..b9f93cc --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/strategy/WorkerPickerStrategy.java @@ -0,0 +1,23 @@ +package com.umar.taskscheduler.strategy; + +import java.util.List; +import org.apache.curator.framework.recipes.cache.ChildData; + +/** + * WorkerPickerStrategy defines the contract for selecting a worker from a list of available workers. + * + * Implementing classes will provide specific strategies (e.g., random, round-robin) for worker selection + * in a distributed task scheduler. The strategy determines which worker should be assigned a task. + * + * Author: Umar Mohammad + */ +public interface WorkerPickerStrategy { + + /** + * Evaluates the list of available workers and selects one based on the specific strategy. + * + * @param workers The list of available workers. + * @return The selected {@link ChildData} representing the chosen worker. 
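+   * <p>Implementations in this change are {@code RandomWorker} and {@code RoundRobinWorker}; a caller
+   * such as JobAssigner simply invokes {@code strategy.evaluate(workers)} and reads the chosen node's
+   * path.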
+ */ + ChildData evaluate(List workers); +} diff --git a/src/main/java/com/umar/taskscheduler/util/ZKUtils.java b/src/main/java/com/umar/taskscheduler/util/ZKUtils.java new file mode 100644 index 0000000..e98e671 --- /dev/null +++ b/src/main/java/com/umar/taskscheduler/util/ZKUtils.java @@ -0,0 +1,80 @@ +package com.umar.taskscheduler.util; + +import java.util.concurrent.TimeUnit; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * ZKUtils is a utility class that provides constants and helper methods for managing ZooKeeper paths + * and Time-To-Live (TTL) configurations for the Distributed Task Scheduler. + * + * This class includes predefined paths for workers, jobs, assignments, and status, + * along with methods to construct paths dynamically. + * + * Author: Umar Mohammad + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) // Private constructor to prevent instantiation +public class ZKUtils { + + // ZooKeeper root paths for workers, jobs, assignments, and status + public static final String WORKERS_ROOT = "/workers"; + public static final String JOBS_ROOT = "/jobs"; + public static final String ASSIGNMENT_ROOT = "/assignments"; + public static final String STATUS_ROOT = "/status"; + public static final String LEADER_ROOT = "/leader"; + + // Time-to-Live (TTL) for status nodes in milliseconds + public static final long STATUS_TTL_MILLIS = TimeUnit.MINUTES.toMillis(10); + + /** + * Constructs the ZooKeeper path for a specific worker. + * + * @param name The name or ID of the worker. + * @return The full ZooKeeper path for the worker node. + */ + public static String getWorkerPath(String name) { + return WORKERS_ROOT + "/" + name; + } + + /** + * Constructs the ZooKeeper path for a specific assignment. + * + * @param name The name or ID of the worker. + * @return The full ZooKeeper path for the assignment node. + */ + public static String getAssignmentPath(String name) { + return ASSIGNMENT_ROOT + "/" + name; + } + + /** + * Returns the root path for jobs in ZooKeeper. + * + * @return The root path for jobs. + */ + public static String getJobsPath() { + return JOBS_ROOT; + } + + /** + * Extracts the node name from a full ZooKeeper path. + * + * This method is useful for retrieving the worker ID or job ID from a full path. + * + * @param workerPath The full ZooKeeper path. + * @return The extracted node name (e.g., worker ID or job ID). + */ + public static String extractNode(String workerPath) { + int start = workerPath.lastIndexOf('/'); + return workerPath.substring(start + 1); + } + + /** + * Constructs the ZooKeeper path for a job's status. + * + * @param jobId The ID of the job. + * @return The full ZooKeeper path for the job's status. + */ + public static String getStatusPath(String jobId) { + return STATUS_ROOT + "/" + jobId; + } +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..cd064c3 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,9 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
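+# Illustrative output line with the pattern configured below (values are made up):
+#   25/01/15 10:42:07 INFO service.WorkerService: Attempting worker registration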
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n diff --git a/src/test/java/com/umar/taskscheduler/DistributedTaskScheduler/AppTest.java b/src/test/java/com/umar/taskscheduler/DistributedTaskScheduler/AppTest.java new file mode 100644 index 0000000..657dd49 --- /dev/null +++ b/src/test/java/com/umar/taskscheduler/DistributedTaskScheduler/AppTest.java @@ -0,0 +1,44 @@ +package com.umar.taskscheduler.DistributedTaskScheduler; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for the App class in the Distributed Task Scheduler. + * + * This class uses JUnit to provide basic test functionality for the application. + * It extends {@link TestCase} to enable JUnit-style testing and includes a test suite + * for grouping test cases. + * + * Author: Umar Mohammad + */ +public class AppTest extends TestCase { + + /** + * Constructor for AppTest. + * + * @param testName The name of the test case. + */ + public AppTest(String testName) { + super(testName); + } + + /** + * Creates a test suite for the AppTest class. + * + * @return The suite of tests being tested. + */ + public static Test suite() { + return new TestSuite(AppTest.class); + } + + /** + * A simple test case to validate the App functionality. + * + * This test checks if a basic assertion passes, ensuring the setup is working correctly. + */ + public void testApp() { + assertTrue(true); + } +} diff --git a/src/test/java/com/umar/taskscheduler/strategy/RoundRobinWorkerTest.java b/src/test/java/com/umar/taskscheduler/strategy/RoundRobinWorkerTest.java new file mode 100644 index 0000000..e2302c8 --- /dev/null +++ b/src/test/java/com/umar/taskscheduler/strategy/RoundRobinWorkerTest.java @@ -0,0 +1,90 @@ +package com.umar.taskscheduler.strategy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.zookeeper.data.Stat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for the RoundRobinWorker class. + * + * These tests validate the round-robin worker selection strategy, ensuring that workers are selected + * in sequence both serially and concurrently. + * + * Author: Umar Mohammad + */ +public class RoundRobinWorkerTest { + + private List workers; + + /** + * Setup method to initialize the list of workers before each test. + */ + @BeforeEach + public void setup() { + workers = List.of( + new ChildData("/a", new Stat(), new byte[0]), + new ChildData("/b", new Stat(), new byte[0]), + new ChildData("/c", new Stat(), new byte[0]), + new ChildData("/d", new Stat(), new byte[0]), + new ChildData("/e", new Stat(), new byte[0]) + ); + } + + /** + * Test the round-robin evaluation in a serial fashion. + * + * This test ensures that workers are selected in sequence, wrapping around once the list is exhausted. + */ + @Test + public void testEvaluateSerially() { + RoundRobinWorker roundRobinWorker = new RoundRobinWorker(); + for (int i = 0; i < 6; i++) { + ChildData result = roundRobinWorker.evaluate(workers); + Assertions.assertEquals(workers.get(i % workers.size()), result); + } + } + + /** + * Test the round-robin evaluation in a concurrent setting. + * + * This test ensures that the worker selection strategy works correctly even when accessed concurrently. 
+ * The test uses multiple threads to call the evaluate method simultaneously and validates that the correct + * workers are selected in sequence. + */ + @Test + public void testEvaluateConcurrently() throws ExecutionException, InterruptedException { + RoundRobinWorker roundRobinWorker = new RoundRobinWorker(); + ExecutorService executorService = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(); + + // Create concurrent tasks for worker selection + for (int i = 0; i < 6; i++) { + CompletableFuture future = CompletableFuture.supplyAsync(() -> roundRobinWorker.evaluate(workers), executorService); + futures.add(future); + } + + // Aggregate the results from the futures + CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); + for (CompletableFuture future : futures) { + aggregate = aggregate.thenCompose(list -> { + try { + list.add(future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(list); + }); + } + + // Validate that workers were selected correctly in sequence + List results = aggregate.join(); + for (int i = 0; i < 6; i++) { + Assertions.assertEquals(workers.get(i % workers.size()), results.get(i)); + } + } +}