Skip to content

Commit

Permalink
Add resequence message store sample
Browse files Browse the repository at this point in the history
  • Loading branch information
pamod committed Nov 24, 2023
1 parent 94b9016 commit 77fc132
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -139,14 +140,14 @@ private long readStartId() {
String storeName = this.getName();
final String lastProcessIdSelectStatement = "SELECT " + ResequenceMessageStoreConstants.SEQ_ID + " FROM " +
ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " WHERE " +
ResequenceMessageStoreConstants.STATEMENT_COLUMN + "=" + "\"" + storeName + "\"";

ResequenceMessageStoreConstants.STATEMENT_COLUMN + "= ?";
Statement statement = new Statement(lastProcessIdSelectStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
return startIdSelectionResult(resultSet);
}
};
statement.addParameter(storeName);
List<Map> processedRows = getProcessedRows(statement);
if (processedRows.size() > minimumRowCount) {
final int firstIndex = 0;
Expand Down Expand Up @@ -216,6 +217,42 @@ private Long getMessageSequenceId(MessageContext message) throws StoreException
return Long.parseLong(sequenceIdValue);
}

/**
* Will get the current message belonging to a sequence.
*
* @return the current message in the sequence.
*/
private MessageContext getCurrentMessage() {
MessageContext msg = null;
final int firstRowIndex = 0;
try {
String tableName = getJdbcConfiguration().getTableName();
String selectMessageStatement = "SELECT message FROM " + tableName + " WHERE "
+ ResequenceMessageStoreConstants.SEQ_ID + "= ?";
Statement statement = new Statement(selectMessageStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
return messageContentResultSet(resultSet, this.getStatement());
}
};
statement.addParameter(nextSequenceId - 1);
List<Map> processedRows = getProcessedRows(statement);
if (!processedRows.isEmpty()) {
msg = getMessageContext(processedRows, firstRowIndex);
if (log.isTraceEnabled()) {
log.trace("Message with id " + msg.getMessageID() + " returned for sequence " + nextSequenceId);
}
} else {
if (log.isTraceEnabled()) {
log.trace("Sequences not returned from DB, next sequence will be:" + nextSequenceId);
}
}
} catch (SynapseException ex) {
throw new SynapseException("Error while peek the message", ex);
}
return msg;
}

/**
* Will get the next message belonging to a sequence.
*
Expand Down Expand Up @@ -276,25 +313,41 @@ protected List<Statement> removeMessageStatement(String msgId) {
final String deleteMessageStatement = "DELETE FROM " + getJdbcConfiguration().getTableName()
+ " WHERE msg_id=?";
final String insertLastProcessIdStatement = "INSERT INTO " +
ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME
+ " (statement,seq_id) VALUES (?,?) ON DUPLICATE KEY UPDATE seq_id = ?";
Statement sequenceIdUpdateStatement = new Statement(insertLastProcessIdStatement) {
ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " SELECT ?, ?" +
" FROM " + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " WHERE statement = ? " +
"HAVING COUNT(*) = 0";

final String updateLastProcessIdStatement = "UPDATE " + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME +
" SET statement = ? , seq_id = ? WHERE statement = ?";
Statement sequenceIdInsertStatement = new Statement(insertLastProcessIdStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
throw new UnsupportedOperationException();
}
};

Statement sequenceIdUpdateStatement = new Statement(updateLastProcessIdStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
throw new UnsupportedOperationException();
}
};

Statement deleteMessage = new Statement(deleteMessageStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
throw new UnsupportedOperationException();
}
};
deleteMessage.addParameter(msgId);
sequenceIdInsertStatement.addParameter(messageStoreName);
sequenceIdInsertStatement.addParameter(messageSequenceId);
sequenceIdInsertStatement.addParameter(messageStoreName);
sequenceIdUpdateStatement.addParameter(messageStoreName);
sequenceIdUpdateStatement.addParameter(messageSequenceId);
sequenceIdUpdateStatement.addParameter(messageSequenceId);
sequenceIdUpdateStatement.addParameter(messageStoreName);
statements.add(deleteMessage);
statements.add(sequenceIdInsertStatement);
statements.add(sequenceIdUpdateStatement);
if (log.isDebugEnabled()) {
log.debug("Removing message with id:" + msgId + " and last process id:" + messageSequenceId);
Expand Down Expand Up @@ -372,14 +425,14 @@ private List<Map> getMessageWithMinimumId(ResultSet resultSet, String statement)
private MessageContext getMessageWithMinimumSequence() {
String tableName = getJdbcConfiguration().getTableName();
String selectMinimumSequenceIdStatement = "SELECT message,seq_id FROM " + tableName + " WHERE "
+ ResequenceMessageStoreConstants.SEQ_ID + "=(SELECT min("
+ ResequenceMessageStoreConstants.SEQ_ID + ")" + " from " + tableName + ")";
+ ResequenceMessageStoreConstants.SEQ_ID + "=(SELECT min(?)" + " from " + tableName + ")";
Statement stmt = new Statement(selectMinimumSequenceIdStatement) {
@Override
public List<Map> getResult(ResultSet resultSet) throws SQLException {
return getMessageWithMinimumId(resultSet, this.getStatement());
}
};
stmt.addParameter(ResequenceMessageStoreConstants.SEQ_ID);
MessageContext msg = null;
final int firstRowIndex = 0;
try {
Expand All @@ -388,6 +441,7 @@ public List<Map> getResult(ResultSet resultSet) throws SQLException {
msg = getMessageContext(processedRows, firstRowIndex);
long sequenceId = getSequenceId(processedRows, firstRowIndex);
nextSequenceId = sequenceId + 1;

if (log.isTraceEnabled()) {
log.trace("Message with id " + msg.getMessageID() + " returned as the minimum, the minimum " +
"sequence " + "will be marked as " + nextSequenceId);
Expand Down Expand Up @@ -429,6 +483,20 @@ private boolean shouldWait() {
return nextElapsedTime < 0 || currentTime <= nextElapsedTime;
}

/**
* {@inheritDoc}
*/
@Override
public MessageContext poll() {
MessageContext messageContext = getCurrentMessage();
messageContext = remove(messageContext.getMessageID());
if (messageContext != null) {
return messageContext;
} else {
throw new NoSuchElementException("First element not found and remove failed !");
}
}

/**
* {@inheritDoc}
*/
Expand Down
69 changes: 69 additions & 0 deletions repository/conf/sample/synapse_sample_707.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<!-- Introduction to Synapse Resequence message store -->

<definitions xmlns="http://ws.apache.org/ns/synapse">
<endpoint name="StockQuoteServiceEp">
<address uri="http://localhost:9000/services/SimpleStockQuoteService">
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<progressionFactor>1.0</progressionFactor>
</suspendOnFailure>
</address>
</endpoint>
<sequence name="fault">
<log level="full">
<property name="MESSAGE" value="Executing default 'fault' sequence"/>
<property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<drop/>
</sequence>
<sequence name="main">
<in>
<log level="full"/>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="OUT_ONLY" value="true"/>
<property name="target.endpoint" value="StockQuoteServiceEp"/>
<store messageStore="MyStore"/>
</in>
<description>The main sequence for the message mediation</description>
</sequence>
<messageStore xmlns="http://ws.apache.org/ns/synapse"
class="org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore"
name="MyStore">
<parameter name="store.resequence.timeout">-1</parameter>
<parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
<parameter name="store.failover.message.store.name">MyStore</parameter>
<parameter xmlns:m0="http://services.samples"
name="store.resequence.id.path"
expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/>
<parameter name="store.jdbc.password">synapse</parameter>
<parameter name="store.jdbc.driver">org.apache.derby.jdbc.ClientDriver</parameter>
<parameter name="store.jdbc.username">synapse</parameter>
<parameter name="store.jdbc.connection.url">jdbc:derby://localhost:1527/synapsedb</parameter>
<parameter name="store.jdbc.table">tbl_resequence</parameter>
</messageStore>
<messageProcessor class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor"
name="ScheduledProcessor" messageStore="MyStore">
<parameter name="interval">10000</parameter>
</messageProcessor>
</definitions>

0 comments on commit 77fc132

Please sign in to comment.