Skip to content

Commit

Permalink
fix(changelog): cannot rollback update changelog status (#4198)
Browse files Browse the repository at this point in the history
* revert commit and catch exception in alterScheduleTask

* rsp comments
  • Loading branch information
guowl3 authored Jan 23, 2025
1 parent 3b1b109 commit ef2f090
Showing 1 changed file with 67 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;

import com.alibaba.fastjson.JSONObject;
Expand Down Expand Up @@ -226,6 +227,8 @@ public class ScheduleService {

@Autowired
private ScheduleDescriptionGenerator descriptionGenerator;
@Autowired
private TransactionTemplate txTemplate;

private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE;

Expand Down Expand Up @@ -431,71 +434,77 @@ private void validateTriggerConfig(TriggerConfig triggerConfig) {
}
}

@Transactional(rollbackFor = Exception.class)
public void executeChangeSchedule(ScheduleChangeParams req) {
try {
// start change quartzJob
boolean isSuccess = Boolean.TRUE.equals(txTemplate.execute(status -> {
Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId());
// start to change schedule
switch (req.getOperationType()) {
case CREATE:
case RESUME: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED);
break;
}
case UPDATE: {
ScheduleEntity entity = nullSafeGetById(req.getScheduleId());
entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters()));
entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig()));
entity.setDescription(req.getUpdateScheduleReq().getDescription());
entity.setStatus(ScheduleStatus.ENABLED);
PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq");
if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) {
DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq()
.getParameters();
parameters.getRateLimit().setOrderId(req.getScheduleId());
dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit());
try {
// start to change schedule
switch (req.getOperationType()) {
case CREATE:
case RESUME: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED);
break;
}
if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) {
DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq()
.getParameters();
parameters.getRateLimit().setOrderId(req.getScheduleId());
dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit());
case UPDATE: {
ScheduleEntity entity = nullSafeGetById(req.getScheduleId());
entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters()));
entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig()));
entity.setDescription(req.getUpdateScheduleReq().getDescription());
entity.setStatus(ScheduleStatus.ENABLED);
PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq");
if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) {
DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq()
.getParameters();
parameters.getRateLimit().setOrderId(req.getScheduleId());
dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit());
}
if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) {
DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq()
.getParameters();
parameters.getRateLimit().setOrderId(req.getScheduleId());
dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit());
}
targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity));
break;
}
targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity));
break;
}
case PAUSE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE);
break;
}
case TERMINATE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED);
break;
}
case DELETE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED);
break;
case PAUSE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE);
break;
}
case TERMINATE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED);
break;
}
case DELETE: {
scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED);
break;
}
default:
throw new UnsupportedException();
}
default:
throw new UnsupportedException();

// start change quartzJob
ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam();
quartzJobReq.setOperationType(req.getOperationType());
quartzJobReq.setJobName(targetSchedule.getId().toString());
quartzJobReq.setJobGroup(targetSchedule.getType().name());
quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig());
quartzJobService.changeQuartzJob(quartzJobReq);
return true;
} catch (Exception e) {
log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(),
req.getOperationType(), req.getScheduleChangeLogId(), e);
status.setRollbackOnly();
return false;
}
}));

// start change quartzJob
ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam();
quartzJobReq.setOperationType(req.getOperationType());
quartzJobReq.setJobName(targetSchedule.getId().toString());
quartzJobReq.setJobGroup(targetSchedule.getType().name());
quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig());
quartzJobService.changeQuartzJob(quartzJobReq);
scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.SUCCESS);
log.info("Change schedule success,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(),
req.getOperationType(), req.getScheduleChangeLogId());
} catch (Exception e) {
log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", req.getScheduleId(),
req.getOperationType(), req.getScheduleChangeLogId(), e);
scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.FAILED);
throw e;
}
scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(),
isSuccess ? ScheduleChangeStatus.SUCCESS : ScheduleChangeStatus.FAILED);
log.info("Change schedule completed,scheduleId={},operationType={},changelogId={},status={}",
req.getScheduleId(),
req.getOperationType(), req.getScheduleChangeLogId(), isSuccess ? "SUCCESS" : "FAILED");

}

Expand Down

0 comments on commit ef2f090

Please sign in to comment.