diff --git a/src/main/java/com/uber/cadence/migration/MigrationActivities.java b/src/main/java/com/uber/cadence/migration/MigrationActivities.java new file mode 100644 index 000000000..841fcf332 --- /dev/null +++ b/src/main/java/com/uber/cadence/migration/MigrationActivities.java @@ -0,0 +1,31 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionResponse; +import com.uber.cadence.activity.ActivityMethod; + +public interface MigrationActivities { + @ActivityMethod + StartWorkflowExecutionResponse startWorkflowInNewDomain(StartWorkflowExecutionRequest request); + + @ActivityMethod + void cancelWorkflowInCurrentDomain(RequestCancelWorkflowExecutionRequest request); +} diff --git a/src/main/java/com/uber/cadence/migration/MigrationActivitiesImpl.java b/src/main/java/com/uber/cadence/migration/MigrationActivitiesImpl.java new file mode 100644 index 000000000..95606f041 --- /dev/null +++ b/src/main/java/com/uber/cadence/migration/MigrationActivitiesImpl.java @@ -0,0 +1,53 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionResponse; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.workflow.Workflow; + +public class MigrationActivitiesImpl implements MigrationActivities { + private final WorkflowClient clientInCurrDomain, clientInNewDomain; + + public MigrationActivitiesImpl( + WorkflowClient clientInCurrDomain, WorkflowClient clientInNewDomain) { + this.clientInCurrDomain = clientInCurrDomain; + this.clientInNewDomain = clientInNewDomain; + } + + @Override + public StartWorkflowExecutionResponse startWorkflowInNewDomain( + StartWorkflowExecutionRequest request) { + try { + return clientInNewDomain.getService().StartWorkflowExecution(request); + } catch (Exception e) { + throw Workflow.wrap(e); + } + } + + @Override + public void cancelWorkflowInCurrentDomain(RequestCancelWorkflowExecutionRequest request) { + try { + clientInCurrDomain.getService().RequestCancelWorkflowExecution(request); + } catch (Exception e) { + throw Workflow.wrap(e); + } + } +} diff --git a/src/main/java/com/uber/cadence/migration/MigrationIWorkflowService.java b/src/main/java/com/uber/cadence/migration/MigrationIWorkflowService.java new file mode 100644 index 000000000..d08017ff9 --- /dev/null +++ b/src/main/java/com/uber/cadence/migration/MigrationIWorkflowService.java @@ -0,0 +1,422 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import com.google.common.base.Strings; +import com.uber.cadence.*; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.IWorkflowServiceBase; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.apache.thrift.TException; + +public class MigrationIWorkflowService extends IWorkflowServiceBase { + + private IWorkflowService serviceOld, serviceNew; + private String domainOld, domainNew; + private static final int _defaultPageSize = 10; + private static final String _listWorkflow = "_listWorkflow"; + private static final String _scanWorkflow = "_scanWorkflow"; + byte[] _marker = "to".getBytes(); + + public MigrationIWorkflowService( + IWorkflowService serviceOld, + String domainOld, + IWorkflowService serviceNew, + String domainNew) { + this.serviceOld = serviceOld; + this.domainOld = domainOld; + this.serviceNew = serviceNew; + this.domainNew = domainNew; + } + + @Override + public StartWorkflowExecutionResponse StartWorkflowExecution( + StartWorkflowExecutionRequest startRequest) throws TException { + + if (shouldStartInNew(startRequest.getWorkflowId())) + return serviceNew.StartWorkflowExecution(startRequest); + + return serviceOld.StartWorkflowExecution(startRequest); + } + + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) throws TException { + if (shouldStartInNew(signalWithStartRequest.getWorkflowId())) + return serviceNew.SignalWithStartWorkflowExecution(signalWithStartRequest); + return serviceOld.SignalWithStartWorkflowExecution(signalWithStartRequest); + } + + @Override + public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest getRequest) throws TException { + if (shouldStartInNew(getRequest.execution.getWorkflowId())) + return serviceNew.GetWorkflowExecutionHistory(getRequest); + return serviceOld.GetWorkflowExecutionHistory(getRequest); + } + + private ListWorkflowExecutionsResponse callOldCluster( + ListWorkflowExecutionsRequest listWorkflowExecutionsRequest, + int pageSizeOverride, + String searchType) + throws TException { + + if (pageSizeOverride != 0) { + listWorkflowExecutionsRequest.setPageSize(pageSizeOverride); + } + ListWorkflowExecutionsResponse response = new ListWorkflowExecutionsResponse(); + if (searchType.equals(_listWorkflow)) { + response = serviceOld.ListWorkflowExecutions(listWorkflowExecutionsRequest); + } else if (searchType.equals(_scanWorkflow)) { + response = serviceOld.ScanWorkflowExecutions(listWorkflowExecutionsRequest); + } + return response; + } + + private ListWorkflowExecutionsResponse appendResultsFromOldCluster( + ListWorkflowExecutionsRequest listWorkflowExecutionsRequest, + ListWorkflowExecutionsResponse response, + String searchType) + throws TException { + int responsePageSize = response.getExecutions().size(); + int neededPageSize = listWorkflowExecutionsRequest.getPageSize() - responsePageSize; + + ListWorkflowExecutionsResponse fromResponse = + callOldCluster(listWorkflowExecutionsRequest, neededPageSize, searchType); + + // if old cluster is empty + if (fromResponse == null) { + return response; + } + + fromResponse.getExecutions().addAll(response.getExecutions()); + return fromResponse; + } + + public boolean hasPrefix(byte[] s, byte[] prefix) { + return s == null + ? false + : s.length >= prefix.length + && Arrays.equals(Arrays.copyOfRange(s, 0, prefix.length), prefix); + } + + /** + * This method handles pagination and combines results from both the new and old workflow service + * clusters. The method first checks if the nextPageToken is not set or starts with the marker + * (_marker) to determine if it should query the new cluster (serviceNew) or combine results from + * both the new and old clusters. If nextPageToken is set and doesn't start with the marker, it + * queries the old cluster (serviceOld). In case the response from the new cluster is null, it + * retries the request on the old cluster. If the number of workflow executions returned by the + * new cluster is less than the pageSize, it appends results from the old cluster to the response. + * + * @param listRequest The ListWorkflowExecutionsRequest containing the query parameters, including + * domain, nextPageToken, pageSize, and other filtering options. + * @return The ListWorkflowExecutionsResponse containing a list of WorkflowExecutionInfo + * representing the workflow executions that match the query criteria. The response also + * includes a nextPageToken to support pagination. + * @throws TException if there's any communication error with the underlying workflow service. + * @throws BadRequestError if the provided ListWorkflowExecutionsRequest is invalid (null or lacks + * a domain). + */ + @Override + public ListWorkflowExecutionsResponse ListWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest) throws TException { + + if (listRequest == null) { + throw new BadRequestError("List request is null"); + } else if (Strings.isNullOrEmpty(listRequest.getDomain())) { + throw new BadRequestError("Domain is null or empty"); + } + if (!listRequest.isSetPageSize()) { + listRequest.pageSize = _defaultPageSize; + } + + if (!listRequest.isSetNextPageToken() + || listRequest.getNextPageToken().length == 0 + || hasPrefix(listRequest.getNextPageToken(), _marker)) { + if (hasPrefix(listRequest.getNextPageToken(), _marker) == true) { + listRequest.setNextPageToken( + Arrays.copyOfRange( + listRequest.getNextPageToken(), + _marker.length, + listRequest.getNextPageToken().length)); + } + ListWorkflowExecutionsResponse response = serviceNew.ListWorkflowExecutions(listRequest); + if (response == null) return callOldCluster(listRequest, 0, _listWorkflow); + + if (response.getExecutions().size() < listRequest.getPageSize()) { + return appendResultsFromOldCluster(listRequest, response, _listWorkflow); + } + + byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length]; + System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length); + System.arraycopy( + response.getNextPageToken(), + 0, + combinedNextPageToken, + _marker.length, + response.getNextPageToken().length); + response.setNextPageToken(combinedNextPageToken); + return response; + } + return callOldCluster(listRequest, 0, _listWorkflow); + } + + /** + * Scans workflow executions based on the provided request parameters, handling pagination and + * combining results from the new and old clusters. The method queries the new cluster + * (serviceNew) if nextPageToken is not set or starts with the marker (_marker). Otherwise, it + * queries the old cluster (serviceOld). Results from the old cluster are appended if needed to + * maintain correct pagination. + * + * @param listRequest The ListWorkflowExecutionsRequest containing query parameters. + * @return The ListWorkflowExecutionsResponse with WorkflowExecutionInfo and nextPageToken. + * @throws TException if there's any communication error with the workflow service. + * @throws BadRequestError if the provided ListWorkflowExecutionsRequest is invalid. + */ + @Override + public ListWorkflowExecutionsResponse ScanWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest) throws TException { + ListWorkflowExecutionsResponse response; + if (listRequest == null) { + throw new BadRequestError("List request is null"); + } else if (Strings.isNullOrEmpty(listRequest.getDomain())) { + throw new BadRequestError("Domain is null or empty"); + } + if (!listRequest.isSetPageSize()) { + listRequest.pageSize = _defaultPageSize; + } + + if (!listRequest.isSetNextPageToken() + || listRequest.getNextPageToken().length == 0 + || hasPrefix(listRequest.getNextPageToken(), _marker)) { + if (hasPrefix(listRequest.getNextPageToken(), _marker)) { + listRequest.setNextPageToken( + Arrays.copyOfRange( + listRequest.getNextPageToken(), + _marker.length, + listRequest.getNextPageToken().length)); + } + response = serviceNew.ScanWorkflowExecutions(listRequest); + if (response == null) return callOldCluster(listRequest, 0, _scanWorkflow); + + if (response.getExecutions().size() < listRequest.getPageSize()) { + return appendResultsFromOldCluster(listRequest, response, _scanWorkflow); + } + + byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length]; + System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length); + System.arraycopy( + response.getNextPageToken(), + 0, + combinedNextPageToken, + _marker.length, + response.getNextPageToken().length); + response.setNextPageToken(combinedNextPageToken); + return response; + } + return callOldCluster(listRequest, 0, _scanWorkflow); + } + + @Override + public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions( + ListOpenWorkflowExecutionsRequest listRequest) throws TException { + ListOpenWorkflowExecutionsResponse response; + if (listRequest == null) { + throw new BadRequestError("List request is null"); + } else if (Strings.isNullOrEmpty(listRequest.getDomain())) { + throw new BadRequestError("Domain is null or empty"); + } + if (!listRequest.isSetMaximumPageSize()) { + listRequest.maximumPageSize = _defaultPageSize; + } + + if (!listRequest.isSetNextPageToken() + || listRequest.getNextPageToken().length == 0 + || hasPrefix(listRequest.getNextPageToken(), _marker)) { + if (hasPrefix(listRequest.getNextPageToken(), _marker)) { + listRequest.setNextPageToken( + Arrays.copyOfRange( + listRequest.getNextPageToken(), + _marker.length, + listRequest.getNextPageToken().length)); + } + response = serviceNew.ListOpenWorkflowExecutions(listRequest); + if (response == null) return serviceOld.ListOpenWorkflowExecutions(listRequest); + + if (response.getExecutionsSize() < listRequest.getMaximumPageSize()) { + int neededPageSize = listRequest.getMaximumPageSize() - response.getExecutionsSize(); + ListOpenWorkflowExecutionsRequest copiedRequest = + new ListOpenWorkflowExecutionsRequest(listRequest); + copiedRequest.maximumPageSize = neededPageSize; + ListOpenWorkflowExecutionsResponse fromResponse = + serviceOld.ListOpenWorkflowExecutions(copiedRequest); + if (fromResponse == null) return response; + + fromResponse.getExecutions().addAll(response.getExecutions()); + return fromResponse; + } + + byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length]; + System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length); + System.arraycopy( + response.getNextPageToken(), + 0, + combinedNextPageToken, + _marker.length, + response.getNextPageToken().length); + response.setNextPageToken(combinedNextPageToken); + return response; + } + return serviceOld.ListOpenWorkflowExecutions(listRequest); + } + + @Override + public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions( + ListClosedWorkflowExecutionsRequest listRequest) throws TException { + ListClosedWorkflowExecutionsResponse response; + if (listRequest == null) { + throw new BadRequestError("List request is null"); + } else if (Strings.isNullOrEmpty(listRequest.getDomain())) { + throw new BadRequestError("Domain is null or empty"); + } + if (!listRequest.isSetMaximumPageSize()) { + listRequest.maximumPageSize = _defaultPageSize; + } + + if (!listRequest.isSetNextPageToken() + || listRequest.getNextPageToken().length == 0 + || hasPrefix(listRequest.getNextPageToken(), _marker)) { + if (hasPrefix(listRequest.getNextPageToken(), _marker)) { + listRequest.setNextPageToken( + Arrays.copyOfRange( + listRequest.getNextPageToken(), + _marker.length, + listRequest.getNextPageToken().length)); + } + response = serviceNew.ListClosedWorkflowExecutions(listRequest); + if (response == null) return serviceOld.ListClosedWorkflowExecutions(listRequest); + + if (response.getExecutionsSize() < listRequest.getMaximumPageSize()) { + int neededPageSize = listRequest.getMaximumPageSize() - response.getExecutionsSize(); + ListClosedWorkflowExecutionsRequest copiedRequest = + new ListClosedWorkflowExecutionsRequest(listRequest); + copiedRequest.maximumPageSize = neededPageSize; + ListClosedWorkflowExecutionsResponse fromResponse = + serviceOld.ListClosedWorkflowExecutions(copiedRequest); + if (fromResponse == null) return response; + + fromResponse.getExecutions().addAll(response.getExecutions()); + return fromResponse; + } + + byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length]; + System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length); + System.arraycopy( + response.getNextPageToken(), + 0, + combinedNextPageToken, + _marker.length, + response.getNextPageToken().length); + response.setNextPageToken(combinedNextPageToken); + return response; + } + return serviceOld.ListClosedWorkflowExecutions(listRequest); + } + + @Override + public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) throws TException { + + try { + if (shouldStartInNew(queryRequest.getExecution().getWorkflowId())) + return serviceNew.QueryWorkflow(queryRequest); + return serviceOld.QueryWorkflow(queryRequest); + } catch (NullPointerException e) { + throw new NullPointerException( + "Query does not have workflowID associated: " + e.getMessage()); + } + } + + @Override + public CountWorkflowExecutionsResponse CountWorkflowExecutions( + CountWorkflowExecutionsRequest countRequest) throws TException { + + CountWorkflowExecutionsResponse countResponseNew = + serviceNew.CountWorkflowExecutions(countRequest); + CountWorkflowExecutionsResponse countResponseOld = + serviceOld.CountWorkflowExecutions(countRequest); + if (countResponseNew == null) return countResponseOld; + if (countResponseOld == null) return countResponseNew; + + countResponseOld.setCount(countResponseOld.getCount() + countResponseNew.getCount()); + return countResponseOld; + } + + @Override + public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest) + throws TException { + try { + serviceNew.TerminateWorkflowExecution(terminateRequest); + } catch (EntityNotExistsError e) { + serviceOld.TerminateWorkflowExecution(terminateRequest); + } + } + + private Boolean shouldStartInNew(String workflowID) throws TException { + try { + return describeWorkflowExecution(serviceNew, domainNew, workflowID) + .thenCombine( + describeWorkflowExecution(serviceOld, domainOld, workflowID), + (respNew, respOld) -> + respNew != null // execution already in new + || respOld == null // execution not exist in new and not exist in old + || (respOld.isSetWorkflowExecutionInfo() + && respOld + .getWorkflowExecutionInfo() + .isSetCloseStatus()) // execution not exist in new and execution is + // closed in old + ) + .get(); + } catch (CompletionException e) { + throw e.getCause() instanceof TException + ? (TException) e.getCause() + : new TException("unknown error: " + e.getMessage()); + } catch (Exception e) { + throw new TException("Unknown error: " + e.getMessage()); + } + } + + private CompletableFuture describeWorkflowExecution( + IWorkflowService service, String domain, String workflowID) { + return CompletableFuture.supplyAsync( + () -> { + try { + return service.DescribeWorkflowExecution( + new DescribeWorkflowExecutionRequest() + .setDomain(domain) + .setExecution(new WorkflowExecution().setWorkflowId(workflowID))); + } catch (EntityNotExistsError e) { + return null; + } catch (Exception e) { + throw new CompletionException(e); + } + }); + } +} diff --git a/src/main/java/com/uber/cadence/migration/MigrationInterceptor.java b/src/main/java/com/uber/cadence/migration/MigrationInterceptor.java new file mode 100644 index 000000000..fc0c0f922 --- /dev/null +++ b/src/main/java/com/uber/cadence/migration/MigrationInterceptor.java @@ -0,0 +1,232 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import com.google.common.base.Strings; +import com.uber.cadence.*; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.internal.sync.SyncWorkflowDefinition; +import com.uber.cadence.workflow.*; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; + +public class MigrationInterceptor extends WorkflowInterceptorBase { + private final WorkflowInterceptor next; + private final String domainNew; + private WorkflowClient clientInNewDomain; + + private static final String versionChangeID = "cadenceMigrationInterceptor"; + private static final int versionV1 = 1; + private final ActivityOptions activityOptions = + new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build(); + + private class MigrationDecision { + boolean shouldMigrate; + String reason; + + public MigrationDecision(boolean shouldMigrate, String reason) { + this.shouldMigrate = shouldMigrate; + this.reason = reason; + } + } + + public MigrationInterceptor(WorkflowInterceptor next, WorkflowClient clientInNewDomain) { + super(next); + this.next = next; + this.domainNew = clientInNewDomain.getOptions().getDomain(); + } + + /** + * MigrationInterceptor intercept executeWorkflow method to identify cron scheduled workflows. + * + *

Steps to migrate a cron workflow: 1. Identify cron and non-child workflows from start event + * 2. Start execution in the new domain 3. Cancel current workflow execution 4. If anything + * failed, fallback to cron workflow execution + * + *

If successful, the current cron workflow should be canceled with migration reason and a new + * workflow execution with the same workflow-id and input should start in the new domain. + * + *

WARNING: it's possible to have both workflows running at the same time, if cancel step + * failed. + * + * @param workflowDefinition + * @param input + * @return workflow result + */ + @Override + public byte[] executeWorkflow( + SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) { + + WorkflowInfo workflowInfo = Workflow.getWorkflowInfo(); + // Versioning to ensure replay is deterministic + int version = getVersion(versionChangeID, Workflow.DEFAULT_VERSION, versionV1); + switch (version) { + case versionV1: + // Skip migration on non-cron and child workflows + WorkflowExecutionStartedEventAttributes startedEventAttributes = + input.getWorkflowExecutionStartedEventAttributes(); + if (!isCronSchedule(startedEventAttributes)) + return next.executeWorkflow(workflowDefinition, input); + if (isChildWorkflow(startedEventAttributes)) { + return next.executeWorkflow(workflowDefinition, input); + } + + // deterministically make migration decision by a SideEffect + MigrationDecision decision = + Workflow.sideEffect( + MigrationDecision.class, () -> shouldMigrate(workflowDefinition, input)); + if (decision.shouldMigrate) { + MigrationActivities activities = + Workflow.newActivityStub(MigrationActivities.class, activityOptions); + try { + // start new workflow in new domain + activities.startWorkflowInNewDomain( + new StartWorkflowExecutionRequest() + .setDomain(domainNew) + .setWorkflowId(workflowInfo.getWorkflowId()) + .setTaskList(new TaskList().setName(startedEventAttributes.taskList.getName())) + .setInput(input.getInput()) + .setWorkflowType(new WorkflowType().setName(input.getWorkflowType().getName())) + .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.TerminateIfRunning) + .setRetryPolicy(startedEventAttributes.getRetryPolicy()) + .setRequestId(UUID.randomUUID().toString()) + .setIdentity(startedEventAttributes.getIdentity()) + .setMemo(startedEventAttributes.getMemo()) + .setCronSchedule(startedEventAttributes.getCronSchedule()) + .setHeader(startedEventAttributes.getHeader()) + .setSearchAttributes(startedEventAttributes.getSearchAttributes()) + .setExecutionStartToCloseTimeoutSeconds( + startedEventAttributes.getExecutionStartToCloseTimeoutSeconds()) + .setTaskStartToCloseTimeoutSeconds( + startedEventAttributes.getTaskStartToCloseTimeoutSeconds())); + + // cancel current workflow + cancelCurrentWorkflow(); + } catch (ActivityException e) { + // fallback if start workflow in new domain failed + return next.executeWorkflow(workflowDefinition, input); + } + } + default: + return next.executeWorkflow(workflowDefinition, input); + } + } + + private MigrationDecision shouldMigrate( + SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) { + return new MigrationDecision(true, ""); + } + + /** + * MigrationInterceptor intercepts continueAsNew method to migrate workflows that explicitly wants + * to continue as new. + * + *

Steps to migrate a continue-as-new workflow: 1. workflow execution is already finished and + * is about to continue as new 2. + * + *

NOTE: For cron workflows, this method will NOT be called but is handled by executeWorkflow + * + *

WARNING: Like cron-workflow migration, it's possible to have two continue-as-new workflows + * running in two domains if cancellation fails. + * + * @param workflowType + * @param options + * @param args + */ + @Override + public void continueAsNew( + Optional workflowType, Optional options, Object[] args) { + + // Versioning to ensure replay is deterministic + int version = getVersion(versionChangeID, Workflow.DEFAULT_VERSION, versionV1); + switch (version) { + case versionV1: + WorkflowInfo workflowInfo = Workflow.getWorkflowInfo(); + WorkflowExecutionStartedEventAttributes startedEventAttributes = + workflowInfo.getWorkflowExecutionStartedEventAttributes(); + if (isChildWorkflow(startedEventAttributes)) { + next.continueAsNew(workflowType, options, args); + } + MigrationDecision decision = + Workflow.sideEffect(MigrationDecision.class, () -> new MigrationDecision(true, "")); + if (decision.shouldMigrate) { + try { + MigrationActivities activities = + Workflow.newActivityStub(MigrationActivities.class, activityOptions); + activities.startWorkflowInNewDomain( + new StartWorkflowExecutionRequest() + .setDomain(domainNew) + .setWorkflowId(workflowInfo.getWorkflowId()) + .setTaskList(new TaskList().setName(startedEventAttributes.taskList.getName())) + .setInput(workflowInfo.getDataConverter().toData(args)) + .setWorkflowType( + new WorkflowType() + .setName(startedEventAttributes.getWorkflowType().getName())) + .setWorkflowIdReusePolicy(WorkflowIdReusePolicy.TerminateIfRunning) + .setRetryPolicy(startedEventAttributes.getRetryPolicy()) + .setRequestId(UUID.randomUUID().toString()) + .setIdentity(startedEventAttributes.getIdentity()) + .setMemo(startedEventAttributes.getMemo()) + .setCronSchedule(startedEventAttributes.getCronSchedule()) + .setHeader(startedEventAttributes.getHeader()) + .setSearchAttributes(startedEventAttributes.getSearchAttributes()) + .setExecutionStartToCloseTimeoutSeconds( + startedEventAttributes.getExecutionStartToCloseTimeoutSeconds()) + .setTaskStartToCloseTimeoutSeconds( + startedEventAttributes.getTaskStartToCloseTimeoutSeconds())); + cancelCurrentWorkflow(); + } catch (ActivityException e) { + // fallback if start workflow in new domain failed + next.continueAsNew(workflowType, options, args); + } + } + default: + next.continueAsNew(workflowType, options, args); + } + } + + private boolean isChildWorkflow(WorkflowExecutionStartedEventAttributes startedEventAttributes) { + return startedEventAttributes.isSetParentWorkflowExecution() + && !startedEventAttributes.getParentWorkflowExecution().isSetWorkflowId(); + } + + private boolean isCronSchedule(WorkflowExecutionStartedEventAttributes startedEventAttributes) { + return !Strings.isNullOrEmpty(startedEventAttributes.cronSchedule); + } + + private void cancelCurrentWorkflow() { + // a detached scope is needed otherwise there will be a race condition between + // completion of activity and the workflow cancellation event + WorkflowInfo workflowInfo = Workflow.getWorkflowInfo(); + MigrationActivities activities = + Workflow.newActivityStub(MigrationActivities.class, activityOptions); + Workflow.newDetachedCancellationScope( + () -> { + activities.cancelWorkflowInCurrentDomain( + new RequestCancelWorkflowExecutionRequest() + .setDomain(workflowInfo.getDomain()) + .setWorkflowExecution( + new WorkflowExecution() + .setWorkflowId(workflowInfo.getWorkflowId()) + .setRunId(workflowInfo.getRunId()))); + }) + .run(); + } +} diff --git a/src/main/java/com/uber/cadence/migration/MigrationInterceptorFactory.java b/src/main/java/com/uber/cadence/migration/MigrationInterceptorFactory.java new file mode 100644 index 000000000..e92a83269 --- /dev/null +++ b/src/main/java/com/uber/cadence/migration/MigrationInterceptorFactory.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.workflow.WorkflowInterceptor; +import java.util.function.Function; + +public class MigrationInterceptorFactory + implements Function { + private final WorkflowClient clientInNewDomain; + + public MigrationInterceptorFactory(WorkflowClient clientInNewDomain) { + this.clientInNewDomain = clientInNewDomain; + } + + @Override + public WorkflowInterceptor apply(WorkflowInterceptor next) { + return new MigrationInterceptor(next, this.clientInNewDomain); + } +} diff --git a/src/main/java/com/uber/cadence/serviceclient/IWorkflowServiceBase.java b/src/main/java/com/uber/cadence/serviceclient/IWorkflowServiceBase.java new file mode 100644 index 000000000..f24e78cb8 --- /dev/null +++ b/src/main/java/com/uber/cadence/serviceclient/IWorkflowServiceBase.java @@ -0,0 +1,645 @@ +/* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.serviceclient; + +import com.uber.cadence.*; +import java.util.concurrent.CompletableFuture; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; + +public class IWorkflowServiceBase implements IWorkflowService { + + @Override + public void RegisterDomain(RegisterDomainRequest registerRequest) + throws BadRequestError, DomainAlreadyExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void DeprecateDomain(DeprecateDomainRequest deprecateRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public StartWorkflowExecutionResponse StartWorkflowExecution( + StartWorkflowExecutionRequest startRequest) + throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError, + DomainNotActiveError, LimitExceededError, EntityNotExistsError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest getRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskRequest pollRequest) + throws BadRequestError, ServiceBusyError, LimitExceededError, EntityNotExistsError, + DomainNotActiveError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completeRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public PollForActivityTaskResponse PollForActivityTask(PollForActivityTaskRequest pollRequest) + throws BadRequestError, ServiceBusyError, LimitExceededError, EntityNotExistsError, + DomainNotActiveError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( + RecordActivityTaskHeartbeatRequest heartbeatRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest completeRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCompletedByID( + RespondActivityTaskCompletedByIDRequest completeRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskFailed(RespondActivityTaskFailedRequest failRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskFailedByID(RespondActivityTaskFailedByIDRequest failRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCanceled(RespondActivityTaskCanceledRequest canceledRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCanceledByID( + RespondActivityTaskCanceledByIDRequest canceledRequest) + throws BadRequestError, EntityNotExistsError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest) + throws BadRequestError, EntityNotExistsError, CancellationAlreadyRequestedError, + ServiceBusyError, DomainNotActiveError, LimitExceededError, + ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + LimitExceededError, ClientVersionNotSupportedError, + WorkflowExecutionAlreadyCompletedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + LimitExceededError, WorkflowExecutionAlreadyStartedError, ClientVersionNotSupportedError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ResetWorkflowExecutionResponse ResetWorkflowExecution( + ResetWorkflowExecutionRequest resetRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + LimitExceededError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError, + LimitExceededError, ClientVersionNotSupportedError, + WorkflowExecutionAlreadyCompletedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions( + ListOpenWorkflowExecutionsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, LimitExceededError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions( + ListClosedWorkflowExecutionsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListWorkflowExecutionsResponse ListWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListArchivedWorkflowExecutionsResponse ListArchivedWorkflowExecutions( + ListArchivedWorkflowExecutionsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListWorkflowExecutionsResponse ScanWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public CountWorkflowExecutionsResponse CountWorkflowExecutions( + CountWorkflowExecutionsRequest countRequest) + throws BadRequestError, EntityNotExistsError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public GetSearchAttributesResponse GetSearchAttributes() + throws ServiceBusyError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeRequest) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + DomainNotActiveError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + DomainNotActiveError, ClientVersionNotSupportedError, + WorkflowExecutionAlreadyCompletedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) + throws BadRequestError, EntityNotExistsError, QueryFailedError, LimitExceededError, + ServiceBusyError, ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public DescribeWorkflowExecutionResponse DescribeWorkflowExecution( + DescribeWorkflowExecutionRequest describeRequest) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public DescribeTaskListResponse DescribeTaskList(DescribeTaskListRequest request) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ClusterInfo GetClusterInfo() throws InternalServiceError, ServiceBusyError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public GetTaskListsByDomainResponse GetTaskListsByDomain(GetTaskListsByDomainRequest request) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + ClientVersionNotSupportedError, TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public ListTaskListPartitionsResponse ListTaskListPartitions( + ListTaskListPartitionsRequest request) + throws BadRequestError, EntityNotExistsError, LimitExceededError, ServiceBusyError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request) + throws BadRequestError, DomainNotActiveError, ServiceBusyError, EntityNotExistsError, + TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RegisterDomain( + RegisterDomainRequest registerRequest, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void DescribeDomain( + DescribeDomainRequest describeRequest, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListDomains(ListDomainsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void UpdateDomain(UpdateDomainRequest updateRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void DeprecateDomain( + DeprecateDomainRequest deprecateRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void StartWorkflowExecution( + StartWorkflowExecutionRequest startRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void GetWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest getRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void PollForDecisionTask( + PollForDecisionTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondDecisionTaskCompleted( + RespondDecisionTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondDecisionTaskFailed( + RespondDecisionTaskFailedRequest failedRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void PollForActivityTask( + PollForActivityTaskRequest pollRequest, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RecordActivityTaskHeartbeat( + RecordActivityTaskHeartbeatRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RecordActivityTaskHeartbeatByID( + RecordActivityTaskHeartbeatByIDRequest heartbeatRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCompleted( + RespondActivityTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCompletedByID( + RespondActivityTaskCompletedByIDRequest completeRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskFailed( + RespondActivityTaskFailedRequest failRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskFailedByID( + RespondActivityTaskFailedByIDRequest failRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCanceled( + RespondActivityTaskCanceledRequest canceledRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondActivityTaskCanceledByID( + RespondActivityTaskCanceledByIDRequest canceledRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RequestCancelWorkflowExecution( + RequestCancelWorkflowExecutionRequest cancelRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void SignalWorkflowExecution( + SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void SignalWithStartWorkflowExecution( + SignalWithStartWorkflowExecutionRequest signalWithStartRequest, + AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ResetWorkflowExecution( + ResetWorkflowExecutionRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void TerminateWorkflowExecution( + TerminateWorkflowExecutionRequest terminateRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListOpenWorkflowExecutions( + ListOpenWorkflowExecutionsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListClosedWorkflowExecutions( + ListClosedWorkflowExecutionsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListArchivedWorkflowExecutions( + ListArchivedWorkflowExecutionsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ScanWorkflowExecutions( + ListWorkflowExecutionsRequest listRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void CountWorkflowExecutions( + CountWorkflowExecutionsRequest countRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void GetSearchAttributes(AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RespondQueryTaskCompleted( + RespondQueryTaskCompletedRequest completeRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ResetStickyTaskList( + ResetStickyTaskListRequest resetRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void QueryWorkflow(QueryWorkflowRequest queryRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void DescribeWorkflowExecution( + DescribeWorkflowExecutionRequest describeRequest, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void DescribeTaskList(DescribeTaskListRequest request, AsyncMethodCallback resultHandler) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void GetClusterInfo(AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void GetTaskListsByDomain( + GetTaskListsByDomainRequest request, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void ListTaskListPartitions( + ListTaskListPartitionsRequest request, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void RefreshWorkflowTasks( + RefreshWorkflowTasksRequest request, AsyncMethodCallback resultHandler) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void close() { + throw new IllegalArgumentException(); + } + + @Override + public void StartWorkflowExecutionWithTimeout( + StartWorkflowExecutionRequest startRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout( + GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis) throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void GetWorkflowExecutionHistoryWithTimeout( + GetWorkflowExecutionHistoryRequest getRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public void SignalWorkflowExecutionWithTimeout( + SignalWorkflowExecutionRequest signalRequest, + AsyncMethodCallback resultHandler, + Long timeoutInMillis) + throws TException { + throw new IllegalArgumentException("unimplemented"); + } + + @Override + public CompletableFuture isHealthy() { + throw new IllegalArgumentException("unimplemented"); + } +} diff --git a/src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java b/src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java index 4f2e6fa65..c744ae88f 100644 --- a/src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java +++ b/src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java @@ -52,6 +52,10 @@ public WorkflowType getWorkflowType() { public byte[] getInput() { return input; } + + public WorkflowExecutionStartedEventAttributes getWorkflowExecutionStartedEventAttributes() { + return workflowEventStart; + } } final class WorkflowResult { diff --git a/src/test/java/com/uber/cadence/migration/MigrationIWorkflowServiceTest.java b/src/test/java/com/uber/cadence/migration/MigrationIWorkflowServiceTest.java new file mode 100644 index 000000000..acb1fb00b --- /dev/null +++ b/src/test/java/com/uber/cadence/migration/MigrationIWorkflowServiceTest.java @@ -0,0 +1,1132 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.migration; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.uber.cadence.*; +import com.uber.cadence.serviceclient.IWorkflowService; +import java.util.ArrayList; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class MigrationIWorkflowServiceTest { + + @Mock private IWorkflowService serviceOld; + + @Mock private IWorkflowService serviceNew; + private MigrationIWorkflowService migrationService; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + migrationService = + new MigrationIWorkflowService(serviceOld, "domainOld", serviceNew, "domainNew"); + } + + // No previous workflow found - launch a workflow in new cluster + @Test + public void testStartWorkflowExecution_startNewWorkflow() throws TException { + + StartWorkflowExecutionRequest startRequest = + new StartWorkflowExecutionRequest() + .setWorkflowId("123") + .setWorkflowType(new WorkflowType().setName("sampleWorkflow")) + .setRequestId("123"); + + DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest = + new DescribeWorkflowExecutionRequest() + .setDomain("domainNew") + .setExecution(new WorkflowExecution().setWorkflowId("123")); + DescribeWorkflowExecutionRequest describeOldWorkflowExecutionRequest = + new DescribeWorkflowExecutionRequest() + .setDomain("domainOld") + .setExecution(new WorkflowExecution().setWorkflowId("123")); + + // Verify DescribeWorkflowExecution calls for both services return null + when(serviceNew.DescribeWorkflowExecution(describeWorkflowExecutionRequest)).thenReturn(null); + when(serviceOld.DescribeWorkflowExecution(describeOldWorkflowExecutionRequest)) + .thenReturn(null); + + StartWorkflowExecutionResponse responseNew = new StartWorkflowExecutionResponse(); + when(serviceNew.StartWorkflowExecution(startRequest)).thenReturn(responseNew); + + StartWorkflowExecutionResponse response = migrationService.StartWorkflowExecution(startRequest); + + verify(serviceNew, times(1)).DescribeWorkflowExecution(describeWorkflowExecutionRequest); + verify(serviceOld, times(1)).DescribeWorkflowExecution(describeOldWorkflowExecutionRequest); + + assertEquals(responseNew, response); + + // Verify that the StartWorkflowExecution method is only called once on serviceNew + verify(serviceNew, times(1)).StartWorkflowExecution(startRequest); + + // Verify that no other methods are called + verifyNoMoreInteractions(serviceNew); + verifyNoMoreInteractions(serviceOld); + } + + // Previous running workflow found: expected to launch a wf in the old cluster + @Test + public void testStartWorkflowExecution_startOldWorkflow() throws TException { + + StartWorkflowExecutionRequest startRequest = + new StartWorkflowExecutionRequest() + .setWorkflowId("123") + .setWorkflowType(new WorkflowType().setName("sampleWorkflow")) + .setRequestId("123"); + + when(serviceNew.DescribeWorkflowExecution(any())).thenReturn(null); + + DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = + new DescribeWorkflowExecutionResponse(); + when(serviceOld.DescribeWorkflowExecution(any())).thenReturn(describeWorkflowExecutionResponse); + + StartWorkflowExecutionResponse responseOld = new StartWorkflowExecutionResponse(); + when(serviceOld.StartWorkflowExecution(any())).thenReturn(responseOld); + + StartWorkflowExecutionResponse response = migrationService.StartWorkflowExecution(startRequest); + + // Verify interactions + verify(serviceNew, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).StartWorkflowExecution(any()); + + // Verify that no other methods are called + verifyNoMoreInteractions(serviceNew); + verifyNoMoreInteractions(serviceOld); + + // Assert the response + assertEquals(responseOld, response); + } + + @Test + public void testStartWorkflow_noWorkflowID() throws TException { + StartWorkflowExecutionRequest startRequest = + new StartWorkflowExecutionRequest() + .setWorkflowType(new WorkflowType().setName("sampleWorkflow")) + .setRequestId("123"); + + StartWorkflowExecutionResponse mockResponse = + new StartWorkflowExecutionResponse().setRunId("123"); + when(serviceNew.StartWorkflowExecution(any())).thenReturn(mockResponse); + StartWorkflowExecutionResponse startWorkflowExecutionResponse = + migrationService.StartWorkflowExecution(startRequest); + verify(serviceNew, times(1)).StartWorkflowExecution(any()); + + assertEquals(startWorkflowExecutionResponse, mockResponse); + } + + @Test + public void testStartWorkflow_errorInDescribeWorkflowExecution() throws TException { + + StartWorkflowExecutionRequest startRequest = + new StartWorkflowExecutionRequest() + .setWorkflowId("123") + .setWorkflowType(new WorkflowType().setName("sampleWorkflow")) + .setRequestId("123"); + + when(serviceNew.DescribeWorkflowExecution(any())).thenReturn(null); + when(serviceOld.DescribeWorkflowExecution(any())).thenReturn(null); + StartWorkflowExecutionResponse startWorkflowExecutionResponse = + migrationService.StartWorkflowExecution(startRequest); + // Verify interactions + verify(serviceNew, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).DescribeWorkflowExecution(any()); + + assertNull(startWorkflowExecutionResponse); + } + + @Test + public void testListWorkflows_InitialRequest() throws TException { + + String domainNew = "test"; + int one = 1; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(one) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + // fetch from new cluster for initial request + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + ListWorkflowExecutionsResponse response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // calling old cluster when new cluster returns empty response + @Test + public void testListWorkflow_OldClusterCall() throws TException { + + String domainNew = "test"; + int one = 1; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(one) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockEmptyResponse = + new ListWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListWorkflowExecutionsResponse response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + + when(serviceOld.ListWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // if fetching from new cluster result size is less than pageSize, fetch additional records from + // Old Cluster + @Test + public void testListWorkflow_fetchFromBothCluster() throws TException { + String domainNew = "test"; + int one = 1; + int two = 2; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(one) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsRequest requestTwoItems = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(two) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + when(serviceOld.ListWorkflowExecutions(request)).thenReturn(mockSingleResultResponse); + ListWorkflowExecutionsResponse response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + + when(serviceNew.ListWorkflowExecutions(requestTwoItems)).thenReturn(mockSingleResultResponse); + response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + @Test + public void testListWorkflows_emptyRequestTests() throws TException { + + // Test when request is null + try { + migrationService.ListWorkflowExecutions(null); + } catch (BadRequestError e) { + assertEquals("List request is null", e.getMessage()); + } + + // Test when domain is null + try { + migrationService.ListWorkflowExecutions(new ListWorkflowExecutionsRequest().setPageSize(10)); + } catch (BadRequestError e) { + assertEquals("Domain is null or empty", e.getMessage()); + } + } + + // Test when error returned from internal client, return same error + @Test + public void testListWorkflow_error() throws TException { + String domainNew = "test"; + + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(null); + ListWorkflowExecutionsResponse response = + migrationService.ListWorkflowExecutions( + new ListWorkflowExecutionsRequest().setDomain(domainNew)); + verify(serviceNew, times(1)).ListWorkflowExecutions(any()); + assertNull(response); + } + + @Test + public void testListWorkflow_FromClusterOnly() throws TException { + + String domain = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domain) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockEmptyResponse = + new ListWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + // Test fetch only from 'from' cluster + when(serviceOld.ListWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListWorkflowExecutionsResponse response = migrationService.ListWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + } + + @Test + public void testListWorkflows_ResponseWithToken() throws TException { + + String domainNew = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse expectedResponseWithToken = new ListWorkflowExecutionsResponse(); + expectedResponseWithToken.setExecutions(new ArrayList<>()); + WorkflowExecutionInfo executionInfo1 = new WorkflowExecutionInfo(); + executionInfo1.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + WorkflowExecutionInfo executionInfo2 = new WorkflowExecutionInfo(); + executionInfo2.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + expectedResponseWithToken.getExecutions().add(executionInfo1); + expectedResponseWithToken.getExecutions().add(executionInfo2); + expectedResponseWithToken.setNextPageToken("totestToken".getBytes()); + + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + ListWorkflowExecutionsResponse response = migrationService.ListWorkflowExecutions(request); + assertEquals(expectedResponseWithToken, response); + + ListWorkflowExecutionsRequest requestTwoItems = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(2) + .setNextPageToken((byte[]) null); + + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + response = migrationService.ListWorkflowExecutions(requestTwoItems); + assertEquals(expectedResponseWithToken, response); + } + + @Test + public void testScanWorkflow_InitialRequest() throws TException { + + String domainNew = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + // Mock the serviceNew to return the expected response for the initial request. + when(serviceNew.ScanWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + + // Perform the test and check if the response matches the expected result. + ListWorkflowExecutionsResponse response = migrationService.ScanWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // Test scanWorkflow when new cluster returns an empty response and it falls back to the old + // cluster. + @Test + public void testScanWorkflow_OldClusterCall() throws TException { + + String domainNew = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockEmptyResponse = + new ListWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + // Mock the serviceNew to return an empty response. + when(serviceNew.ScanWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + + // Perform the first test to check if the response is empty as the new cluster returned no + // results. + ListWorkflowExecutionsResponse response = migrationService.ScanWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + + // Mock the serviceOld to return the expected response. + when(serviceOld.ScanWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + + // Perform the second test to check if the response is now populated with data from the old + // cluster. + response = migrationService.ScanWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + @Test + public void testScanWorkflow_FetchFromBothClusters() throws TException { + + String domainNew = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockSingleResultResponse = + new ListWorkflowExecutionsResponse().setExecutions(new ArrayList<>()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + mockSingleResultResponse.setNextPageToken("testToken".getBytes()); + + // Mock the serviceOld to return the expected response. + when(serviceOld.ScanWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + + // Perform the first test to check if the response is populated with data from the old cluster. + ListWorkflowExecutionsResponse response = migrationService.ScanWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + + ListWorkflowExecutionsRequest requestTwoItems = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(2) + .setNextPageToken((byte[]) null); + + // Mock the serviceNew to return the expected response. + when(serviceNew.ScanWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + + // Perform the second test to check if the response is now populated with data from the new + // cluster. + response = migrationService.ScanWorkflowExecutions(requestTwoItems); + assertEquals(mockSingleResultResponse, response); + } + + @Test + public void testScanWorkflow_EmptyRequestTests() throws TException { + + // Test when the request is null. + try { + migrationService.ScanWorkflowExecutions(null); + } catch (BadRequestError e) { + assertEquals("List request is null", e.getMessage()); + } + + // Test when the domain is null. + try { + migrationService.ScanWorkflowExecutions(new ListWorkflowExecutionsRequest().setPageSize(10)); + } catch (BadRequestError e) { + assertEquals("Domain is null or empty", e.getMessage()); + } + } + + // Test when an error is returned from the internal client, and the response is null. + @Test + public void testScanWorkflow_Error() throws TException { + + String domainNew = "test"; + + when(serviceNew.ListWorkflowExecutions(any())).thenReturn(null); + ListWorkflowExecutionsResponse response = + migrationService.ScanWorkflowExecutions( + new ListWorkflowExecutionsRequest().setDomain(domainNew)); + verify(serviceNew, times(1)).ScanWorkflowExecutions(any()); + assertNull(response); + } + + // Test scanWorkflow when fetching only from the 'from' cluster. + @Test + public void testScanWorkflow_FromClusterOnly() throws TException { + + String domain = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domain) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse mockEmptyResponse = + new ListWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + // Mock the serviceOld to return an empty response. + when(serviceOld.ScanWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + + // Perform the test to check if the response is empty as the new cluster returned no results. + ListWorkflowExecutionsResponse response = migrationService.ScanWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + } + + @Test + public void testScanWorkflows_ResponseWithToken() throws TException { + + String domainNew = "test"; + + ListWorkflowExecutionsRequest request = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(1) + .setNextPageToken((byte[]) null); + + ListWorkflowExecutionsResponse expectedResponseWithToken = new ListWorkflowExecutionsResponse(); + expectedResponseWithToken.setExecutions(new ArrayList<>()); + WorkflowExecutionInfo executionInfo1 = new WorkflowExecutionInfo(); + executionInfo1.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + WorkflowExecutionInfo executionInfo2 = new WorkflowExecutionInfo(); + executionInfo2.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + expectedResponseWithToken.getExecutions().add(executionInfo1); + expectedResponseWithToken.getExecutions().add(executionInfo2); + expectedResponseWithToken.setNextPageToken("totestToken".getBytes()); + + // Mock the serviceNew to return the expected response with a token. + when(serviceNew.ScanWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + + // Perform the first test to check if the response contains a token from the new cluster. + ListWorkflowExecutionsResponse response = migrationService.ScanWorkflowExecutions(request); + assertEquals(expectedResponseWithToken, response); + + ListWorkflowExecutionsRequest requestTwoItems = + new ListWorkflowExecutionsRequest() + .setDomain(domainNew) + .setPageSize(2) + .setNextPageToken((byte[]) null); + + // Perform the second test to check if the response contains a token from the new cluster for a + // different page size. + when(serviceNew.ScanWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + response = migrationService.ScanWorkflowExecutions(requestTwoItems); + assertEquals(expectedResponseWithToken, response); + } + + @Test + public void testCountWorkflow_bothClusterSuccess() throws TException { + + String domain = "test"; + String query = ""; + + CountWorkflowExecutionsRequest request = + new CountWorkflowExecutionsRequest().setDomain(domain).setQuery(query); + CountWorkflowExecutionsResponse mockResponseOld = + new CountWorkflowExecutionsResponse().setCount(2); + CountWorkflowExecutionsResponse mockResponseNew = + new CountWorkflowExecutionsResponse().setCount(3); + + CountWorkflowExecutionsResponse expectedResponse = + new CountWorkflowExecutionsResponse(mockResponseNew); + expectedResponse.setCount(5); + + // both clusters return successful response + when(serviceOld.CountWorkflowExecutions(any())).thenReturn(mockResponseOld); + when(serviceNew.CountWorkflowExecutions(any())).thenReturn(mockResponseNew); + CountWorkflowExecutionsResponse response = migrationService.CountWorkflowExecutions(request); + assertEquals(expectedResponse, response); + } + + @Test + public void testCountWorkflow_errorInOneCluster() throws TException { + + String domain = "test"; + String query = ""; + + CountWorkflowExecutionsRequest request = + new CountWorkflowExecutionsRequest().setDomain(domain).setQuery(query); + CountWorkflowExecutionsResponse mockResponseOld = + new CountWorkflowExecutionsResponse().setCount(2); + + CountWorkflowExecutionsResponse expectedResponse = + new CountWorkflowExecutionsResponse(mockResponseOld); + expectedResponse.setCount(2); + + when(serviceOld.CountWorkflowExecutions(any())).thenReturn(mockResponseOld); + when(serviceNew.CountWorkflowExecutions(any())).thenReturn(null); + CountWorkflowExecutionsResponse response = migrationService.CountWorkflowExecutions(request); + assertEquals(expectedResponse, response); + } + + // query in the new cluster + @Test + public void testQueryWorkflow_queryWorkflowInNew() throws TException { + + String domain = "test"; + String wfID = "123"; + + QueryWorkflowRequest queryWorkflowRequest = + new QueryWorkflowRequest() + .setDomain(domain) + .setQuery(new WorkflowQuery()) + .setExecution(new WorkflowExecution().setWorkflowId(wfID)); + + DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = + new DescribeWorkflowExecutionResponse(); + + when(serviceNew.DescribeWorkflowExecution(any())).thenReturn(describeWorkflowExecutionResponse); + when(serviceOld.DescribeWorkflowExecution(any())).thenReturn(null); + + QueryWorkflowResponse mockResponse = new QueryWorkflowResponse(); + when(serviceNew.QueryWorkflow(any())).thenReturn(mockResponse); + + QueryWorkflowResponse response = migrationService.QueryWorkflow(queryWorkflowRequest); + + // Verify interactions + verify(serviceNew, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).DescribeWorkflowExecution(any()); + verify(serviceNew, times(1)).QueryWorkflow(any()); + + // Verify that no other methods are called + verifyNoMoreInteractions(serviceNew); + verifyNoMoreInteractions(serviceOld); + + // Assert the response + assertEquals(mockResponse, response); + } + + // query found in the old cluster + @Test + public void testQueryWorkflow_queryWorkflowInOld() throws TException { + + String domain = "test"; + String wfID = "123"; + + QueryWorkflowRequest queryWorkflowRequest = + new QueryWorkflowRequest() + .setDomain(domain) + .setQuery(new WorkflowQuery()) + .setExecution(new WorkflowExecution().setWorkflowId(wfID)); + + when(serviceNew.DescribeWorkflowExecution(any())).thenReturn(null); + + DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = + new DescribeWorkflowExecutionResponse(); + when(serviceOld.DescribeWorkflowExecution(any())).thenReturn(describeWorkflowExecutionResponse); + + QueryWorkflowResponse mockResponse = new QueryWorkflowResponse(); + when(serviceOld.QueryWorkflow(any())).thenReturn(mockResponse); + + QueryWorkflowResponse response = migrationService.QueryWorkflow(queryWorkflowRequest); + + // Verify interactions + verify(serviceNew, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).QueryWorkflow(any()); + + // Verify that no other methods are called + verifyNoMoreInteractions(serviceNew); + verifyNoMoreInteractions(serviceOld); + + // Assert the response + assertEquals(mockResponse, response); + } + + @Test + public void testQueryWorkflow_noWorkflowID() throws TException { + + String domain = "test"; + QueryWorkflowRequest request = new QueryWorkflowRequest().setDomain(domain); + QueryWorkflowResponse response = new QueryWorkflowResponse(); + try { + response = migrationService.QueryWorkflow(request); + assertNull(response); + } catch (NullPointerException e) { + assertNotNull(response); + } + } + + @Test + public void testQueryWorkflow_errorInDescribeWorkflowExecution() throws TException { + + String domain = "test"; + String wfID = "123"; + + QueryWorkflowRequest queryWorkflowRequest = + new QueryWorkflowRequest() + .setDomain(domain) + .setQuery(new WorkflowQuery()) + .setExecution(new WorkflowExecution().setWorkflowId(wfID)); + + when(serviceNew.DescribeWorkflowExecution(any())).thenReturn(null); + when(serviceOld.DescribeWorkflowExecution(any())).thenReturn(null); + QueryWorkflowResponse queryWorkflowResponse = + migrationService.QueryWorkflow(queryWorkflowRequest); + // Verify interactions + verify(serviceNew, times(1)).DescribeWorkflowExecution(any()); + verify(serviceOld, times(1)).DescribeWorkflowExecution(any()); + + assertNull(queryWorkflowResponse); + } + + @Test + public void testListOpenWorkflows_InitialRequest() throws TException { + + String domain = "test"; + + ListOpenWorkflowExecutionsRequest request = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListOpenWorkflowExecutionsResponse mockSingleResultResponse = + new ListOpenWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + + // fetch from new cluster for initial request + when(serviceNew.ListOpenWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // calling old cluster when new cluster returns empty response + @Test + public void testListOpenWorkflow_OldClusterCall() throws TException { + + String domain = "test"; + + ListOpenWorkflowExecutionsRequest request = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListOpenWorkflowExecutionsResponse mockEmptyResponse = + new ListOpenWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + ListOpenWorkflowExecutionsResponse mockSingleResultResponse = + new ListOpenWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + + when(serviceNew.ListOpenWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + + when(serviceOld.ListOpenWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + response = migrationService.ListOpenWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // if fetching from new cluster result size is less than pageSize, fetch additional records from + // Old Cluster + @Test + public void testListOpenWorkflow_fetchFromBothCluster() throws TException { + + String domain = "test"; + + ListOpenWorkflowExecutionsRequest requestTwoPages = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(2) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListOpenWorkflowExecutionsResponse mockTwoResultResponse = + new ListOpenWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockTwoResultResponse.getExecutions().add(executionInfo); + + when(serviceNew.ListOpenWorkflowExecutions(requestTwoPages)).thenReturn(mockTwoResultResponse); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions(requestTwoPages); + assertEquals(mockTwoResultResponse, response); + + when(serviceOld.ListOpenWorkflowExecutions(requestTwoPages)).thenReturn(mockTwoResultResponse); + response = migrationService.ListOpenWorkflowExecutions(requestTwoPages); + mockTwoResultResponse.getExecutions().add(executionInfo); + assertEquals(mockTwoResultResponse, response); + } + + @Test + public void testListOpenWorkflows_emptyRequestTests() throws TException { + + // Test when request is null + try { + migrationService.ListOpenWorkflowExecutions(null); + } catch (BadRequestError e) { + assertEquals("List request is null", e.getMessage()); + } + + // Test when domain is null + try { + migrationService.ListOpenWorkflowExecutions( + new ListOpenWorkflowExecutionsRequest().setMaximumPageSize(10)); + } catch (BadRequestError e) { + assertEquals("Domain is null or empty", e.getMessage()); + } + } + + // Test when error returned from internal client, return same error + @Test + public void testListOpenWorkflow_error() throws TException { + String domain = "test"; + + when(serviceNew.ListOpenWorkflowExecutions(any())).thenReturn(null); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions( + new ListOpenWorkflowExecutionsRequest().setDomain(domain)); + verify(serviceNew, times(1)).ListOpenWorkflowExecutions(any()); + assertNull(response); + } + + @Test + public void testListOpenWorkflow_FromClusterOnly() throws TException { + + String domain = "test"; + + ListOpenWorkflowExecutionsRequest request = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null); + + ListOpenWorkflowExecutionsResponse mockEmptyResponse = + new ListOpenWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + when(serviceOld.ListOpenWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + } + + @Test + public void testListOpenWorkflows_ResponseWithToken() throws TException { + + String domain = "test"; + + ListOpenWorkflowExecutionsRequest request = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null); + + ListOpenWorkflowExecutionsResponse expectedResponseWithToken = + new ListOpenWorkflowExecutionsResponse(); + expectedResponseWithToken.setExecutions(new ArrayList<>()); + WorkflowExecutionInfo executionInfo1 = new WorkflowExecutionInfo(); + executionInfo1.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + WorkflowExecutionInfo executionInfo2 = new WorkflowExecutionInfo(); + executionInfo2.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + expectedResponseWithToken.getExecutions().add(executionInfo1); + expectedResponseWithToken.getExecutions().add(executionInfo2); + expectedResponseWithToken.setNextPageToken("totestToken".getBytes()); + + when(serviceNew.ListOpenWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + ListOpenWorkflowExecutionsResponse response = + migrationService.ListOpenWorkflowExecutions(request); + assertEquals(expectedResponseWithToken, response); + + ListOpenWorkflowExecutionsRequest requestTwoItems = + new ListOpenWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(2) + .setNextPageToken((byte[]) null); + + when(serviceNew.ListOpenWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + response = migrationService.ListOpenWorkflowExecutions(requestTwoItems); + assertEquals(expectedResponseWithToken, response); + } + + @Test + public void testListClosedWorkflows_InitialRequest() throws TException { + + String domain = "test"; + + ListClosedWorkflowExecutionsRequest request = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListClosedWorkflowExecutionsResponse mockSingleResultResponse = + new ListClosedWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + + // fetch from new cluster for initial request + when(serviceNew.ListClosedWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // calling old cluster when new cluster returns empty response + @Test + public void testListClosedWorkflow_OldClusterCall() throws TException { + + String domain = "test"; + + ListClosedWorkflowExecutionsRequest request = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListClosedWorkflowExecutionsResponse mockEmptyResponse = + new ListClosedWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + ListClosedWorkflowExecutionsResponse mockSingleResultResponse = + new ListClosedWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockSingleResultResponse.getExecutions().add(executionInfo); + + when(serviceNew.ListClosedWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + + when(serviceOld.ListClosedWorkflowExecutions(any())).thenReturn(mockSingleResultResponse); + response = migrationService.ListClosedWorkflowExecutions(request); + assertEquals(mockSingleResultResponse, response); + } + + // if fetching from new cluster result size is less than pageSize, fetch additional records from + // Old Cluster + @Test + public void testListClosedWorkflow_fetchFromBothCluster() throws TException { + + String domain = "test"; + + ListClosedWorkflowExecutionsRequest requestTwoPages = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(2) + .setNextPageToken((byte[]) null) + .setExecutionFilter(null) + .setTypeFilter(null) + .setStartTimeFilter( + new StartTimeFilter().setEarliestTime(1000).setLatestTime(1000000000)); + + ListClosedWorkflowExecutionsResponse mockTwoResultResponse = + new ListClosedWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken("testToken".getBytes()); + + WorkflowExecutionInfo executionInfo = new WorkflowExecutionInfo(); + executionInfo.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + mockTwoResultResponse.getExecutions().add(executionInfo); + + when(serviceNew.ListClosedWorkflowExecutions(requestTwoPages)) + .thenReturn(mockTwoResultResponse); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions(requestTwoPages); + assertEquals(mockTwoResultResponse, response); + + when(serviceOld.ListClosedWorkflowExecutions(requestTwoPages)) + .thenReturn(mockTwoResultResponse); + response = migrationService.ListClosedWorkflowExecutions(requestTwoPages); + mockTwoResultResponse.getExecutions().add(executionInfo); + assertEquals(mockTwoResultResponse, response); + } + + @Test + public void testListClosedWorkflows_emptyRequestTests() throws TException { + + // Test when request is null + try { + migrationService.ListClosedWorkflowExecutions(null); + } catch (BadRequestError e) { + assertEquals("List request is null", e.getMessage()); + } + + // Test when domain is null + try { + migrationService.ListClosedWorkflowExecutions( + new ListClosedWorkflowExecutionsRequest().setMaximumPageSize(10)); + } catch (BadRequestError e) { + assertEquals("Domain is null or empty", e.getMessage()); + } + } + + // Test when error returned from internal client, return same error + @Test + public void testListClosedWorkflow_error() throws TException { + String domain = "test"; + + when(serviceNew.ListClosedWorkflowExecutions(any())).thenReturn(null); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions( + new ListClosedWorkflowExecutionsRequest().setDomain(domain)); + verify(serviceNew, times(1)).ListClosedWorkflowExecutions(any()); + assertNull(response); + } + + @Test + public void testListClosedWorkflow_FromClusterOnly() throws TException { + + String domain = "test"; + + ListClosedWorkflowExecutionsRequest request = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null); + + ListClosedWorkflowExecutionsResponse mockEmptyResponse = + new ListClosedWorkflowExecutionsResponse() + .setExecutions(new ArrayList<>()) + .setNextPageToken((byte[]) null); + + when(serviceOld.ListClosedWorkflowExecutions(any())).thenReturn(mockEmptyResponse); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions(request); + assertEquals(mockEmptyResponse, response); + } + + @Test + public void testListClosedWorkflows_ResponseWithToken() throws TException { + + String domain = "test"; + + ListClosedWorkflowExecutionsRequest request = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(1) + .setNextPageToken((byte[]) null); + + ListClosedWorkflowExecutionsResponse expectedResponseWithToken = + new ListClosedWorkflowExecutionsResponse(); + expectedResponseWithToken.setExecutions(new ArrayList<>()); + WorkflowExecutionInfo executionInfo1 = new WorkflowExecutionInfo(); + executionInfo1.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + WorkflowExecutionInfo executionInfo2 = new WorkflowExecutionInfo(); + executionInfo2.setExecution( + new WorkflowExecution().setWorkflowId("testWfId").setRunId("testRunId")); + expectedResponseWithToken.getExecutions().add(executionInfo1); + expectedResponseWithToken.getExecutions().add(executionInfo2); + expectedResponseWithToken.setNextPageToken("totestToken".getBytes()); + + when(serviceNew.ListClosedWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + ListClosedWorkflowExecutionsResponse response = + migrationService.ListClosedWorkflowExecutions(request); + assertEquals(expectedResponseWithToken, response); + + ListClosedWorkflowExecutionsRequest requestTwoItems = + new ListClosedWorkflowExecutionsRequest() + .setDomain(domain) + .setMaximumPageSize(2) + .setNextPageToken((byte[]) null); + + when(serviceOld.ListClosedWorkflowExecutions(any())).thenReturn(expectedResponseWithToken); + response = migrationService.ListClosedWorkflowExecutions(requestTwoItems); + assertEquals(expectedResponseWithToken, response); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java new file mode 100644 index 000000000..4c15a7ed8 --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java @@ -0,0 +1,201 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.workflow; + +import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.workflow.WorkflowTest.DOMAIN2; +import static junit.framework.TestCase.fail; + +import com.uber.cadence.*; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.common.CronSchedule; +import com.uber.cadence.internal.worker.PollerOptions; +import com.uber.cadence.migration.MigrationActivitiesImpl; +import com.uber.cadence.migration.MigrationIWorkflowService; +import com.uber.cadence.migration.MigrationInterceptorFactory; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactory; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.worker.WorkerOptions; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +public class WorkflowMigrationTest { + private WorkflowClient migrationWorkflowClient, workflowClientCurr, workflowClientNew; + private boolean useDockerService = Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); + private static final String TASKLIST = "TASKLIST"; + private TracingWorkflowInterceptorFactory tracer; + WorkerFactory factoryCurr, factoryNew; + Worker workerCurr, workerNew; + + @Before + public void setUp() { + IWorkflowService service = + new WorkflowServiceTChannel( + ClientOptions.newBuilder() + .setFeatureFlags( + new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) + .build()); + workflowClientCurr = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build()); + workflowClientNew = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setDomain(DOMAIN2).build()); + MigrationIWorkflowService migrationService = + new MigrationIWorkflowService( + service, DOMAIN, + service, DOMAIN2); + migrationWorkflowClient = + WorkflowClient.newInstance( + migrationService, WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build()); + WorkerFactoryOptions factoryOptions = WorkerFactoryOptions.newBuilder().build(); + + // tracer interceptor Factory + tracer = new TracingWorkflowInterceptorFactory(); + + // migration interceptor + MigrationInterceptorFactory migrator = new MigrationInterceptorFactory(workflowClientNew); + + // current domain worker + factoryCurr = new WorkerFactory(workflowClientCurr, factoryOptions); + workerCurr = + factoryCurr.newWorker( + TASKLIST, + WorkerOptions.newBuilder() + .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) + .setMaxConcurrentActivityExecutionSize(1000) + .setInterceptorFactory(migrator.andThen(tracer)) + .build()); + workerCurr.registerWorkflowImplementationTypes( + CronWorkflowImpl.class, ContinueAsNewWorkflowImpl.class); + workerCurr.registerActivitiesImplementations( + new MigrationActivitiesImpl(workflowClientCurr, workflowClientNew)); + factoryCurr.start(); + + // new domain worker + factoryNew = new WorkerFactory(workflowClientNew, factoryOptions); + workerNew = + factoryNew.newWorker( + TASKLIST, + WorkerOptions.newBuilder() + .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) + .setMaxConcurrentActivityExecutionSize(1000) + .setInterceptorFactory(tracer) + .build()); + workerNew.registerWorkflowImplementationTypes( + CronWorkflowImpl.class, ContinueAsNewWorkflowImpl.class); + factoryNew.start(); + } + + @After + public void tearDown() throws Throwable { + factoryCurr.shutdown(); + factoryNew.shutdown(); + } + + public interface CronWorkflow { + @WorkflowMethod( + taskList = TASKLIST, + workflowIdReusePolicy = WorkflowIdReusePolicy.RejectDuplicate, + executionStartToCloseTimeoutSeconds = 10 + ) + @CronSchedule("* * * * *") + String execute(String testName); + } + + public static class CronWorkflowImpl implements CronWorkflow { + @Override + public String execute(String testName) { + return "Cron Workflow Completed"; + } + } + + public interface ContinueAsNewWorkflow { + @WorkflowMethod( + taskList = TASKLIST, + workflowIdReusePolicy = WorkflowIdReusePolicy.RejectDuplicate, + executionStartToCloseTimeoutSeconds = 10 + ) + void execute(int iter); + } + + public static class ContinueAsNewWorkflowImpl implements ContinueAsNewWorkflow { + @Override + public void execute(int iter) { + Workflow.continueAsNew(iter + 1); + } + } + + @Test + public void whenUseDockerService_cronWorkflowMigration() { + Assume.assumeTrue(useDockerService); + String workflowID = UUID.randomUUID().toString(); + try { + workflowClientCurr + .newWorkflowStub( + CronWorkflow.class, new WorkflowOptions.Builder().setWorkflowId(workflowID).build()) + .execute("for test"); + } catch (CancellationException e) { + try { + describeWorkflowExecution(workflowClientNew, workflowID); + } catch (Exception eDesc) { + fail("fail to describe workflow execution in new domain: " + eDesc); + } + } + } + + @Test + public void whenUseDockerService_continueAsNewWorkflowMigration() { + Assume.assumeTrue(useDockerService); + String workflowID = UUID.randomUUID().toString(); + try { + workflowClientCurr + .newWorkflowStub( + ContinueAsNewWorkflow.class, + new WorkflowOptions.Builder().setWorkflowId(workflowID).build()) + .execute(0); + } catch (CancellationException e) { + try { + describeWorkflowExecution(workflowClientNew, workflowID); + } catch (Exception eDesc) { + fail("fail to describe workflow execution in new domain: " + eDesc); + } + } + } + + private DescribeWorkflowExecutionResponse describeWorkflowExecution( + WorkflowClient wc, String workflowID) throws TException { + return wc.getService() + .DescribeWorkflowExecution( + new DescribeWorkflowExecutionRequest() + .setExecution(new WorkflowExecution().setWorkflowId(workflowID)) + .setDomain(wc.getOptions().getDomain())); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 34324225c..1c7a3628f 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -62,10 +62,8 @@ import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.sync.DeterministicRunnerTest; -import com.uber.cadence.internal.sync.SyncWorkflowDefinition; import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal; import com.uber.cadence.internal.worker.PollerOptions; -import com.uber.cadence.internal.worker.WorkflowExecutionException; import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.serviceclient.WorkflowServiceTChannel; @@ -73,8 +71,7 @@ import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.testing.WorkflowReplayer; import com.uber.cadence.worker.*; -import com.uber.cadence.workflow.Functions.Func; -import com.uber.cadence.workflow.Functions.Func1; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -82,7 +79,6 @@ import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; -import java.lang.reflect.Type; import java.text.SimpleDateFormat; import java.time.Duration; import java.time.LocalDateTime; @@ -114,9 +110,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiPredicate; -import java.util.function.Function; -import java.util.function.Supplier; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -5083,37 +5076,6 @@ public void testNonDeterministicWorkflowPolicyFailWorkflow() { } } - private static class TracingWorkflowInterceptorFactory - implements Function { - - private final FilteredTrace trace = new FilteredTrace(); - private List expected; - - @Override - public WorkflowInterceptor apply(WorkflowInterceptor next) { - return new TracingWorkflowInterceptor(trace, next); - } - - public String getTrace() { - return String.join("\n", trace.getImpl()); - } - - public void setExpected(String... expected) { - this.expected = Arrays.asList(expected); - } - - public void assertExpected() { - if (expected != null) { - List traceElements = trace.getImpl(); - for (int i = 0; i < traceElements.size(); i++) { - String t = traceElements.get(i); - String expectedRegExp = expected.get(i); - Assert.assertTrue(t + " doesn't match " + expectedRegExp, t.matches(expectedRegExp)); - } - } - } - } - public static class TestUUIDAndRandom implements TestWorkflow1 { @Override @@ -5887,23 +5849,6 @@ public void testTimerFiringTimestampEarlierThanExpected() throws Exception { "timerfiring.json", TimerFiringWorkflowImpl.class); } - private static class FilteredTrace { - - private final List impl = Collections.synchronizedList(new ArrayList<>()); - - public boolean add(String s) { - log.trace("FilteredTrace isReplaying=" + Workflow.isReplaying()); - if (!Workflow.isReplaying()) { - return impl.add(s); - } - return true; - } - - List getImpl() { - return impl; - } - } - public interface TestCompensationWorkflow { @WorkflowMethod void compensate(); @@ -6148,152 +6093,6 @@ public void testParentWorkflowInfoInChildWorkflows() { assertEquals(expected, result); } - private static class TracingWorkflowInterceptor implements WorkflowInterceptor { - - private final FilteredTrace trace; - private final WorkflowInterceptor next; - - private TracingWorkflowInterceptor(FilteredTrace trace, WorkflowInterceptor next) { - this.trace = trace; - this.next = Objects.requireNonNull(next); - } - - @Override - public byte[] executeWorkflow( - SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) - throws CancellationException, WorkflowExecutionException { - trace.add("executeWorkflow: " + input.getWorkflowType().getName()); - return next.executeWorkflow(workflowDefinition, input); - } - - @Override - public Promise executeActivity( - String activityName, - Class resultClass, - Type resultType, - Object[] args, - ActivityOptions options) { - trace.add("executeActivity " + activityName); - return next.executeActivity(activityName, resultClass, resultType, args, options); - } - - @Override - public Promise executeLocalActivity( - String activityName, - Class resultClass, - Type resultType, - Object[] args, - LocalActivityOptions options) { - trace.add("executeLocalActivity " + activityName); - return next.executeLocalActivity(activityName, resultClass, resultType, args, options); - } - - @Override - public WorkflowResult executeChildWorkflow( - String workflowType, - Class resultClass, - Type resultType, - Object[] args, - ChildWorkflowOptions options) { - trace.add("executeChildWorkflow " + workflowType); - return next.executeChildWorkflow(workflowType, resultClass, resultType, args, options); - } - - @Override - public Random newRandom() { - trace.add("newRandom"); - return next.newRandom(); - } - - @Override - public Promise signalExternalWorkflow( - String domain, WorkflowExecution execution, String signalName, Object[] args) { - trace.add("signalExternalWorkflow " + execution.getWorkflowId() + " " + signalName); - return next.signalExternalWorkflow(domain, execution, signalName, args); - } - - @Override - public Promise signalExternalWorkflow( - WorkflowExecution execution, String signalName, Object[] args) { - trace.add("signalExternalWorkflow " + execution.getWorkflowId() + " " + signalName); - return next.signalExternalWorkflow(execution, signalName, args); - } - - @Override - public Promise cancelWorkflow(WorkflowExecution execution) { - trace.add("cancelWorkflow " + execution.getWorkflowId()); - return next.cancelWorkflow(execution); - } - - @Override - public void sleep(Duration duration) { - trace.add("sleep " + duration); - next.sleep(duration); - } - - @Override - public boolean await(Duration timeout, String reason, Supplier unblockCondition) { - trace.add("await " + timeout + " " + reason); - return next.await(timeout, reason, unblockCondition); - } - - @Override - public void await(String reason, Supplier unblockCondition) { - trace.add("await " + reason); - next.await(reason, unblockCondition); - } - - @Override - public Promise newTimer(Duration duration) { - trace.add("newTimer " + duration); - return next.newTimer(duration); - } - - @Override - public R sideEffect(Class resultClass, Type resultType, Func func) { - trace.add("sideEffect"); - return next.sideEffect(resultClass, resultType, func); - } - - @Override - public R mutableSideEffect( - String id, Class resultClass, Type resultType, BiPredicate updated, Func func) { - trace.add("mutableSideEffect"); - return next.mutableSideEffect(id, resultClass, resultType, updated, func); - } - - @Override - public int getVersion(String changeID, int minSupported, int maxSupported) { - trace.add("getVersion"); - return next.getVersion(changeID, minSupported, maxSupported); - } - - @Override - public void continueAsNew( - Optional workflowType, Optional options, Object[] args) { - trace.add("continueAsNew"); - next.continueAsNew(workflowType, options, args); - } - - @Override - public void registerQuery(String queryType, Type[] argTypes, Func1 callback) { - trace.add("registerQuery " + queryType); - next.registerQuery(queryType, argTypes, callback); - } - - @Override - public UUID randomUUID() { - trace.add("randomUUID"); - return next.randomUUID(); - } - - @Override - public void upsertSearchAttributes(Map searchAttributes) { - trace.add("upsertSearchAttributes"); - next.upsertSearchAttributes(searchAttributes); - } - } - public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 { private String result = ""; diff --git a/src/test/java/com/uber/cadence/workflow/interceptors/TracingWorkflowInterceptorFactory.java b/src/test/java/com/uber/cadence/workflow/interceptors/TracingWorkflowInterceptorFactory.java new file mode 100644 index 000000000..80cf10790 --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/interceptors/TracingWorkflowInterceptorFactory.java @@ -0,0 +1,236 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.workflow.interceptors; + +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.activity.LocalActivityOptions; +import com.uber.cadence.internal.sync.SyncWorkflowDefinition; +import com.uber.cadence.internal.worker.WorkflowExecutionException; +import com.uber.cadence.workflow.*; +import java.lang.reflect.Type; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CancellationException; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.function.Supplier; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TracingWorkflowInterceptorFactory + implements Function { + + private final FilteredTrace trace = new FilteredTrace(); + private List expected; + private static final Logger log = + LoggerFactory.getLogger(TracingWorkflowInterceptorFactory.class); + + @Override + public WorkflowInterceptor apply(WorkflowInterceptor next) { + return new TracingWorkflowInterceptor(trace, next); + } + + public String getTrace() { + return String.join("\n", trace.getImpl()); + } + + public void setExpected(String... expected) { + this.expected = Arrays.asList(expected); + } + + public void assertExpected() { + if (expected != null) { + List traceElements = trace.getImpl(); + for (int i = 0; i < traceElements.size(); i++) { + String t = traceElements.get(i); + String expectedRegExp = expected.get(i); + Assert.assertTrue(t + " doesn't match " + expectedRegExp, t.matches(expectedRegExp)); + } + } + } + + private static class FilteredTrace { + + private final List impl = Collections.synchronizedList(new ArrayList<>()); + + public boolean add(String s) { + log.trace("FilteredTrace isReplaying=" + Workflow.isReplaying()); + if (!Workflow.isReplaying()) { + return impl.add(s); + } + return true; + } + + List getImpl() { + return impl; + } + } + + private static class TracingWorkflowInterceptor implements WorkflowInterceptor { + + private final FilteredTrace trace; + private final WorkflowInterceptor next; + + private TracingWorkflowInterceptor(FilteredTrace trace, WorkflowInterceptor next) { + this.trace = trace; + this.next = Objects.requireNonNull(next); + } + + @Override + public byte[] executeWorkflow( + SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) + throws CancellationException, WorkflowExecutionException { + trace.add("executeWorkflow: " + input.getWorkflowType().getName()); + return next.executeWorkflow(workflowDefinition, input); + } + + @Override + public Promise executeActivity( + String activityName, + Class resultClass, + Type resultType, + Object[] args, + ActivityOptions options) { + trace.add("executeActivity " + activityName); + return next.executeActivity(activityName, resultClass, resultType, args, options); + } + + @Override + public Promise executeLocalActivity( + String activityName, + Class resultClass, + Type resultType, + Object[] args, + LocalActivityOptions options) { + trace.add("executeLocalActivity " + activityName); + return next.executeLocalActivity(activityName, resultClass, resultType, args, options); + } + + @Override + public WorkflowResult executeChildWorkflow( + String workflowType, + Class resultClass, + Type resultType, + Object[] args, + ChildWorkflowOptions options) { + trace.add("executeChildWorkflow " + workflowType); + return next.executeChildWorkflow(workflowType, resultClass, resultType, args, options); + } + + @Override + public Random newRandom() { + trace.add("newRandom"); + return next.newRandom(); + } + + @Override + public Promise signalExternalWorkflow( + String domain, WorkflowExecution execution, String signalName, Object[] args) { + trace.add("signalExternalWorkflow " + execution.getWorkflowId() + " " + signalName); + return next.signalExternalWorkflow(domain, execution, signalName, args); + } + + @Override + public Promise signalExternalWorkflow( + WorkflowExecution execution, String signalName, Object[] args) { + trace.add("signalExternalWorkflow " + execution.getWorkflowId() + " " + signalName); + return next.signalExternalWorkflow(execution, signalName, args); + } + + @Override + public Promise cancelWorkflow(WorkflowExecution execution) { + trace.add("cancelWorkflow " + execution.getWorkflowId()); + return next.cancelWorkflow(execution); + } + + @Override + public void sleep(Duration duration) { + trace.add("sleep " + duration); + next.sleep(duration); + } + + @Override + public boolean await(Duration timeout, String reason, Supplier unblockCondition) { + trace.add("await " + timeout + " " + reason); + return next.await(timeout, reason, unblockCondition); + } + + @Override + public void await(String reason, Supplier unblockCondition) { + trace.add("await " + reason); + next.await(reason, unblockCondition); + } + + @Override + public Promise newTimer(Duration duration) { + trace.add("newTimer " + duration); + return next.newTimer(duration); + } + + @Override + public R sideEffect(Class resultClass, Type resultType, Functions.Func func) { + trace.add("sideEffect"); + return next.sideEffect(resultClass, resultType, func); + } + + @Override + public R mutableSideEffect( + String id, + Class resultClass, + Type resultType, + BiPredicate updated, + Functions.Func func) { + trace.add("mutableSideEffect"); + return next.mutableSideEffect(id, resultClass, resultType, updated, func); + } + + @Override + public int getVersion(String changeID, int minSupported, int maxSupported) { + trace.add("getVersion"); + return next.getVersion(changeID, minSupported, maxSupported); + } + + @Override + public void continueAsNew( + Optional workflowType, Optional options, Object[] args) { + trace.add("continueAsNew"); + next.continueAsNew(workflowType, options, args); + } + + @Override + public void registerQuery( + String queryType, Type[] argTypes, Functions.Func1 callback) { + trace.add("registerQuery " + queryType); + next.registerQuery(queryType, argTypes, callback); + } + + @Override + public UUID randomUUID() { + trace.add("randomUUID"); + return next.randomUUID(); + } + + @Override + public void upsertSearchAttributes(Map searchAttributes) { + trace.add("upsertSearchAttributes"); + next.upsertSearchAttributes(searchAttributes); + } + } +}