Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scaling loader #2111

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,12 @@ services:
depends_on:
- activemq

gtas-loader-1:
container_name: loader-runner-1
gtas-loader:
image: wcogtas/gtas-loader-runner:dev
deploy:
replicas: 1
build:
context: .
dockerfile: ./gtas-parent/gtas-loader-runner/gtas-loader-runner.Dockerfile
environment:
LOADER_NAME: GTAS_LOADER_1
LOADER_COUNTRY: USA
LOADER_PERMITS: 300
INBOUND_MESSAGE_FOLDER: /usr/local/gtas-data/input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,50 @@ public class LoaderWorker extends BaseEntityAudit {
@Column(name="bucketCount")
private int bucketCount = 0;

// Nullable activity flag; LoaderWorkerRepository.loadActiveWorkers selects workers where this is true.
@Column(name="active")
private Boolean active;

// Name of the loader queue this worker is assigned to. LoaderDistributor also stores the
// sentinel value "DUPLICATE" here when it cannot give the worker a queue of its own.
@Column(name="assigned_queue")
private String assignedQueue;

// Count of free permits, defaulting to 0; reported in the distributor's health-check log.
// NOTE(review): presumably relates to the LOADER_PERMITS setting - confirm against the loader runner.
@Column(name="permits_free")
private int permitsFree = 0;

/**
* Returns the number of free permits recorded for this worker.
*
* @return the permitsFree value (0 unless it has been set)
*/
public int getPermitsFree() {
return permitsFree;
}

/**
* Records the number of free permits for this worker.
*
* @param permitsFree the permitsFree to set
*/
public void setPermitsFree(int permitsFree) {
this.permitsFree = permitsFree;
}

/**
* Returns the name of the queue assigned to this worker.
*
* @return the assignedQueue; may be null when no queue has been assigned
*/
public String getAssignedQueue() {
return assignedQueue;
}

/**
* Sets the name of the queue assigned to this worker.
*
* @param assignedQueue the assignedQueue to set; null clears the assignment
*/
public void setAssignedQueue(String assignedQueue) {
this.assignedQueue = assignedQueue;
}

/**
* Sets the active status of this worker from a boxed value.
*
* @param active the active status to set; null is stored as-is
*/
public void setActive(Boolean active) {
this.active = active;
}

@Override
public int hashCode() {
return Objects.hash(workerName);
Expand Down Expand Up @@ -49,6 +93,20 @@ public String getWorkerName() {
return workerName;
}

/**
* Returns the active status of this worker.
*
* @return active status; may be null if it has never been set
*/
public Boolean getActive() {
return active;
}

/**
* Sets the active status of this worker from a primitive flag.
*
* @param active the active status to set
*/
public void setActive(boolean active) {
this.active = active;
}

/**
* @param workerName the workerName to set
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.gtas.repository;

import java.util.List;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
Expand All @@ -10,5 +11,8 @@ public interface LoaderWorkerRepository extends CrudRepository<LoaderWorker, Lon

/**
* Looks up a loader worker whose {@code workerName} equals the given name.
*
* @param name the worker name to match
* @return the matching worker
*         (NOTE(review): presumably null when no worker matches - confirm against callers)
*/
@Query("Select lw from LoaderWorker lw where lw.workerName = :name")
public LoaderWorker findByWorkerName(@Param("name")String name);

/**
* Selects every loader worker whose {@code active} flag is true.
*
* @return the workers matched by the query
*/
@Query("Select lw from LoaderWorker lw where lw.active = true")
public List<LoaderWorker> loadActiveWorkers();

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
application.version=1.14
entitymanager.packages.to.scan=gov.gtas
site.language=en
loader.queue.names=GTAS_LOADER_1,GTAS_LOADER_2,GTAS_LOADER_3,GTAS_LOADER_4,GTAS_LOADER_5
loader.queue.names=5
##Web app settings##
ui.address=http://localhost:3000

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package gov.gtas.job.scheduler;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -40,9 +46,7 @@ public class LoaderDistributor {

private String errorstr;

private LinkedMap<String, String> loaderNames;

private List<String> allQueueNames;
private LinkedMap<String, LoaderWorker> loaderNames;

private int indexNumber = 0;

Expand All @@ -51,44 +55,54 @@ public class LoaderDistributor {
private final LoaderWorkerRepository loaderWorkerRepository;

private final ManualMover manualMover;

private final List<String> loaderQueueNameOptions;

public LoaderDistributor(JmsTemplate jmsTemplateFile, FlightLoaderRepository flightLoaderRepository,
EventIdentifierFactory eventIdentifierFactory, @Value("${message.dir.error}") String errorstr,
LoaderWorkerRepository loaderWorkerRepository, @Value("${activemq.broker.url}")String brokerUrl,
EventIdentifierFactory eventIdentifierFactory,
@Value("${message.dir.error}") String errorstr,
LoaderWorkerRepository loaderWorkerRepository,
@Value("${activemq.broker.url}")String brokerUrl,
@Value("${inbound.loader.jms.queue}")String distributorQueue,
@Value("#{'${loader.queue.names}'.split(',')}") List<String> allQueueNames) {
@Value("${loader.queue.names}") Integer numberOfQueues) {
this.jmsTemplateFile = jmsTemplateFile;
this.flightLoaderRepository = flightLoaderRepository;
this.eventIdentifierFactory = eventIdentifierFactory;
this.errorstr = errorstr;
this.loaderNames = new LinkedMap<>();
this.allQueueNames = allQueueNames;
this.loaderWorkerRepository = loaderWorkerRepository;
//We do not want spring to manage this as it creates and destroys connections to the queue.
this.manualMover = new ManualMover(brokerUrl, distributorQueue);
List<String> loaderQueueNameOptions = new ArrayList<>();
for (int i = 1; i <= numberOfQueues; i++) {
loaderQueueNameOptions.add("GTAS_LOADER_" + i);
}
this.loaderQueueNameOptions = loaderQueueNameOptions;

}

@Scheduled(fixedDelayString = "${loader.purge.check}", initialDelayString = "30000")
public void checkAllQueues() {
logger.info("Starting queue check job");
// Check all queues to retrieve stale messages
for (String lwName : this.allQueueNames) {
Iterable<LoaderWorker> loaderWorkers = loaderWorkerRepository.findAll();
Map<String, LoaderWorker> loadersInUse = new HashMap<>();
for (LoaderWorker lw : loaderWorkers) {
if (lw.getAssignedQueue() != null && StringUtils.isNotBlank(lw.getAssignedQueue())) {
loadersInUse.put(lw.getAssignedQueue(), lw);
}
}
for (String queueName : loaderQueueNameOptions) {
try {
if (!loaderNames.containsKey(lwName)) {
logger.info("Queue " + lwName + " not active - Moving any old messages from " + lwName + " to GTAS distributor.");
manualMover.purgeQueue(lwName);
if (!loadersInUse.containsKey(queueName)) {
logger.info("Queue " + queueName + " not active - Moving any old messages from " + queueName + " to GTAS distributor.");
manualMover.purgeQueue(queueName);
} else {
LoaderWorker lw = loaderWorkerRepository.findByWorkerName(lwName);
if (lw == null) {
logger.info("Stale registration/queue detected, moving messages to gtas distributor and removing queue from GTAS distributor: " + lwName);
manualMover.purgeQueue(lwName);
removeQueue(lwName);
} else {
logger.info("Active loader: " + lwName + ". No action taken.");
}
LoaderWorker lw = loadersInUse.get(queueName);
logger.info("Active loader: " + lw.getWorkerName() + " working on assigned queue " + lw.getAssignedQueue() + " . No action taken.");
}
} catch (JMSException jme) {
logger.error("Failed queue purge for queue: " + lwName, jme);
logger.error("Failed queue purge for queue: " + queueName, jme);
}
}
logger.info("Queue check job completed");
Expand All @@ -100,33 +114,84 @@ public void loaderHealthChecks() {
Date now = new Date();
Date healthCheckCutOff = DateUtils.addMinutes(now, -5);
Iterable<LoaderWorker> loaderWorkers = loaderWorkerRepository.findAll();
Map<String, LoaderWorker> loadersInUse = new HashMap<>();
Set<LoaderWorker> duplicateHashLoaders = new HashSet<>();
for (LoaderWorker lw : loaderWorkers) {
if (lw.getAssignedQueue() != null && StringUtils.isNotBlank(lw.getAssignedQueue())) {
if (loadersInUse.containsKey(lw.getAssignedQueue()) || "DUPLICATE".equals(lw.getAssignedQueue())) {
logger.info("Duplicate key detected! Reassignment of loader worker needed!");
lw.setAssignedQueue(null);
duplicateHashLoaders.add(lw);
} else {
loadersInUse.put(lw.getAssignedQueue(), lw);
}
}
}
Set<String> queuesInUse = new HashSet<>();

for (LoaderWorker lw : loaderWorkers) {
String lwName = lw.getWorkerName();
if (!this.loaderNames.containsKey(lwName)) {
// Condition 1, new loaderworker:
this.loaderNames.put(lw.getWorkerName(), lwName);
logger.info("Registered new loader worker " + lwName);
} else if (lw.getUpdatedAt().before(healthCheckCutOff)) {
// Condition 2, old loaderworker
String lwName = lw.getAssignedQueue();

//Assign and register new queues as needed.
if (StringUtils.isBlank(lwName) || "DUPLICATE".equals(lwName)) {
String assignedQueue = assignQueue(lw);
if (StringUtils.isBlank(assignedQueue) || "DUPLICATE".equals(assignedQueue)) {
logger.info("Unable to assign new queue for duplicate! Setting or keeping as DUPLICATE queue!");
lw.setAssignedQueue("DUPLICATE");
lwName = "DUPLICATE";
} else {
lw.setAssignedQueue(assignedQueue);
lwName = lw.getAssignedQueue();
logger.info("Registered new loader worker " + lw.getAssignedQueue());

}
loaderWorkerRepository.save(lw);
}

if (lwName != null && !"DUPLICATE".equals(lwName)
&& !this.loaderNames.containsKey(lwName)) {
logger.info("Caught orphan queue - registered queue " + lwName);
addQueue(lw);
}

if (lw.getUpdatedAt().before(healthCheckCutOff)) {
// Condition 1, old loaderworker needs to be deregistered.
if (this.loaderNames.containsKey(lwName)) {
removeQueue(lwName);
}
loaderWorkerRepository.delete(lw);
logger.info("Removed old loader worker " + lwName);
} else {
// Condition 3, loader in use.
} else if (this.loaderNames.containsKey(lwName)){
// Condition 2, loader in use.
queuesInUse.add(lwName);
logger.info("Queue in use: " + lwName + " processing "
+ lw.getBucketCount() + " files.");
+ lw.getBucketCount() + " flights with " + lw.getPermitsFree() + " permits free.");
} else {
// Condition 3, loader worker unable to register to a queue.
logger.info("\n**************************** \n"
+ "Unable to register worker to queue. \nEither increase number of allowed queues or decrease loader workers. \n"
+ "This does not effect the system integrity but does waste system resources. \n"
+ "Queue impacted: " + lw.getWorkerName() + ".\nIf not duplicate queue setting to DUPLICATE.\n"
+ "**************************** \n");
lw.setAssignedQueue("DUPLICATE");
loaderWorkerRepository.save(lw);
}
}
//Copy the key set - otherwise removing will impact the LinkedMap.
Set<String> keys = new HashSet<>(this.loaderNames.keySet());
keys.removeAll(queuesInUse);
for(String orphanQueue : keys) {
this.loaderNames.remove(orphanQueue);
logger.info("removed orphan " + orphanQueue);
}
logger.info("Register/De-register job completed");
}

public void distributeToQueue(MessageWrapper mw) throws InterruptedException {
EventIdentifier ei = null;
lock.lock();
try {
while (loaderNames.isEmpty()) {
while (this.loaderNames.isEmpty()) {
logger.error("No Loaders!!!! Sleeping thread for 1 minute and trying again.");
Thread.sleep(60000);
}
Expand Down Expand Up @@ -159,23 +224,23 @@ public void distributeToQueue(MessageWrapper mw) throws InterruptedException {
fl = new FlightLoader();
fl.setCreatedBy("LOADER");
fl.setIdTag(ei.getIdentifier());
if (indexNumber == loaderNames.size()) {
if (indexNumber == this.loaderNames.size()) {
indexNumber = 0;
}
loaderDestination = loaderNames.get(indexNumber);
loaderDestination = this.loaderNames.get(indexNumber);
indexNumber++;
fl.setLoaderName(loaderDestination);
flightLoaderRepository.save(fl);
} else if (loaderNames.containsKey(fl.getLoaderName())) {
} else if (this.loaderNames.containsKey(fl.getLoaderName())) {
logger.info("Association found, forwarding message");
loaderDestination = fl.getLoaderName();
} else {
logger.info("Loader assocation for flight " + ei.getIdentifier()
+ " is stale. Creating new loader association!");
if (indexNumber >= loaderNames.size()) {
if (indexNumber >= this.loaderNames.size()) {
indexNumber = 0;
}
loaderDestination = loaderNames.get(indexNumber);
loaderDestination = this.loaderNames.get(indexNumber);
indexNumber++;
fl.setLoaderName(loaderDestination);
fl.setUpdatedBy("LOADER");
Expand Down Expand Up @@ -218,21 +283,37 @@ public void removeQueue(String queueName) {
}
}

public String assignQueue() {
/**
 * Registers a loader worker in {@code loaderNames} under its assigned queue name.
 *
 * A worker that is null or has no assigned queue is not registered; the problem is
 * logged and the method returns without touching the map.
 *
 * @param lw the worker to register, keyed by {@code lw.getAssignedQueue()}
 */
private void addQueue(LoaderWorker lw) {
    // Validate up front instead of throwing an exception only to catch and log it here.
    String queueName = (lw == null) ? null : lw.getAssignedQueue();
    if (queueName == null) {
        logger.error("Can not register null queue!");
        return;
    }
    lock.lock();
    try {
        // The map is mutated only while holding the lock, as in the original method.
        this.loaderNames.put(queueName, lw);
    } finally {
        lock.unlock();
    }
}

private String assignQueue(LoaderWorker lw) {
lock.lock();
String openQueue = "";
try {
// Always give the first open queue
boolean thereIsAnOpenQueue = false;
for (String queueName : allQueueNames) {
if (!loaderNames.containsKey(queueName)) {
for (String queueOption : loaderQueueNameOptions) {
if (!this.loaderNames.containsKey(queueOption)) {
thereIsAnOpenQueue = true;
openQueue = queueName;
openQueue = queueOption;
lw.setAssignedQueue(openQueue);
break;
}
}
if (thereIsAnOpenQueue) {
loaderNames.put(openQueue, openQueue);
this.loaderNames.put(openQueue, lw);
logger.info("New queue assigned - " + openQueue);
} else {
logger.error("There are no open queues");
Expand Down
Loading