AMQP v1.0 Example Client

Table of Contents


Sample AMQP v1.0 Client - Producer (Sends Request Messages)

Properties Setup File

The code section below demonstrates a sample properties file for an AMQP v1.0 compliant client created using Apache Qpid's JMS implementation. For further details, please refer to here.

Sample "producer.properties" File
clientId = Shares-R-Us-ite1-Rqsts01
connectionUrl = failover:(amqps://ite1-amqp.asx.com.au:4nnn?transport.tcpKeepAlive=true&transport.trustStoreLocation=mypath/mytruststore.jks&transport.trustStorePassword=mypassword1&transport.contextProtocol=TLS&transport.keyStoreLocation=mypath/myClientKeystore.pfx&transport.keyStoreType=pkcs12&transport.keyStorePassword=mypassword2&jms.forceAsyncAcks=true)?failover.initialReconnectDelay=3000&failover.reconnectDelay=3000&failover.maxReconnectDelay=30000&failover.useReconnectBackOff=true&failover.reconnectBackOffMultiplier=2.0&failover.maxReconnectAttempts=20&failover.startupMaxReconnectAttempts=10&failover.warnAfterReconnectAttempts=10&failover.randomize=false&failover.amqpOpenServerListAction=IGNORE

# List all trx request queues, if multiple...
requestQueue01 = TestKit999.00001.ite1.posttrade.csp.amqp.trx.request

statePath = producer_desired_state.txt
msgsPerTransaction = 50

demoOnlyRunTimeExceptionsPerMillion = 0
demoOnlyNoMoreMsgsPerMillion = 100


Sample Java Client "Producer" Code

The code section below demonstrates a sample AMQP v1.0 compliant client created using Apache Qpid's JMS implementation.

Sample Command

You would need to compile and run the following java code.

Commands to Start and Stop the client:

   java -DpropertiesPath=producer.properties -jar amqp-asx-producer-1.0.jar

   echo 'stop' >producer_desired_state.txt



AMQP Client "AsxSampleAmqpProducerMain" Java Class
/**
 * The below code is a demonstration of one possible AMQP v1.0 compliant "Producer" client, to send ISO20022 request messages.
 * Apache Qpid's AMQP v1.0 compliant JMS client was used.
 * 
 * This sample has the following features but your version will need to fit in with your standards and architecture:
 * - It generates and sends mock signed-ISO20022 request messages to a queue identified in a property file
 * - The property-file path is passed to the sample as a System Property
 * - If you need to send to multiple trx.request queues, you should put each "sendRequestMessagesNN" method on a separate
 *   thread.  ASX recommends you only send to one queue per AMQP "Session".
 * - It is intended to run forever, even if there are network problems or ASX shuts down the AMQP service.  However,
 *   certain JMS/QPID exception types and, arbitrarily, the java RuntimeException, will cause the client to terminate.
 * - This sample looks in a file regularly to determine if it should continue running or stop.  To stop it, just copy
 *   the word 'stop' into the 'desired state' file.
 * - This sample uses Local Transactions so that it better fits a need to commit multiple database updates or wherever
 *   you would normally source your messages to be sent.  Transactions also offers improved SENDING performance
 *   as the messages are not actually sent to AMQP until the session-commit is invoked.
 * - The error handling ensures no messages are lost:
 *   - The source of the messages is rolled-back BEFORE AMQP is rolled-back if there is an error,
 *   - AMQP is committed BEFORE the source of the messages is committed.
 *   - ASX will detect and discard any duplicates caused by messages being resent
 * - The sample can be configured to randomly run out of messages to send, causing a smaller-than-normal transaction
 *   to be committed and to wait a few seconds to see if there are more messages yet to send.
 * - Similarly, it can be configured to randomly throw an exception demonstrate the unexpected termination of the client
 * 
 * NOTES:
 * - This sample code expects the Connection to be configured for reconnections should any errors occur.
 * - The Connection uses background threads to keep the network connection alive and to attempt reconnection
 * - This complicates sending/committing the messages as there is no way to determine if the connection is made
 *   without trying again to commit.
 *  
 **/

package au.com.asx.amqp.producer;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

/**
 * See https://github.com/apache/qpid-jms/blob/1.0.0/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Sender.java
 */
public class AsxSampleAmqpProducerMain {

    private static final Logger LOG = LoggerFactory.getLogger(AsxSampleAmqpProducerMain.class);
    private static final Random RANDOM = new Random();

    private static Properties properties;
    private static Long sentTotal01 = 0L;
    private static Long seqNo = 0L;

    private static Long msgsPerTransaction;
    private static Long demoOnlyRunTimeExceptionsPerMillion;
    private static Long demoOnlyNoMoreMsgsPerMillion;


    public static void main(String[] args) throws InterruptedException {
        LOG.info("ASX: Sample Java \"Qpid JMS AMQP Producer\"");
        LOG.info("=========================================");
        LOG.info("AsxSampleAmqpProducerMain.class: Starting");
        LOG.info("");

        properties = getProperties();

        msgsPerTransaction
                = Long.valueOf(properties.getProperty("msgsPerTransaction", "50L"));
        demoOnlyRunTimeExceptionsPerMillion
                = Long.valueOf(properties.getProperty("demoOnlyRunTimeExceptionsPerMillion", "10L"));
        demoOnlyNoMoreMsgsPerMillion
                = Long.valueOf(properties.getProperty("demoOnlyNoMoreMsgsPerMillion", "1000L"));

        setDesiredRunState("run"); // change state to "stop" to stop sending messages

        ConnectionFactory factory = new JmsConnectionFactory(properties.getProperty("connectionUrl"));
        JmsConnection connection = null;


        while ("run".equals(getDesiredRunState())) {
            // ...loop in case provided connection URL has no reconnection parameters so this code must reconnect

            try {

                LOG.info("=========================================");
                LOG.info(" - Creating connection...");
                connection = (JmsConnection) factory.createConnection();
                connection.setClientID(properties.getProperty("clientId", "Shares-R-Us-01"));
                connection.start();
                LOG.info("=========================================");

                // ------------------------------------------------------------------------------
                // For ASX, create a session per queue, as queues are not related to each other.
                // If multiple queues, create session and process each queue on a separate thread

                sendRequestMessages01(connection); // Send messages for queue 1

                // ------------------------------------------------------------------------------
                connection.close();


            } catch (Exception e) {

                logException("Connection-related exception", e);
                try {
                    connection.close();
                } catch (JMSException ex) {
                    LOG.info("Connection already closed");
                }
                rollBackBizMsgSource();
                long finish = System.currentTimeMillis();
                LOG.info("Session01 Totals: Sent " + sentTotal01 + " messages before disconnection.");
                if (isExceptionPermanent(e)) {
                    LOG.error("Above exception cannot be fixed by retrying.  Execution terminating");
                    System.exit(4);
                }
            }
            Thread.sleep(10000L);
        } // keep connecting, while 'run'...
    }

    private static void sendRequestMessages01(JmsConnection connection)
            throws JMSException, InterruptedException {

        LOG.info(" - Creating session01...");
        JmsSession session01 = (JmsSession) connection.createSession(true, Session.SESSION_TRANSACTED);

        JmsQueue rqstQueue01 = new JmsQueue(properties.getProperty("requestQueue01"));
        LOG.info(" - Creating session01 producer to queue01: " + rqstQueue01.getQueueName());
        MessageProducer messageProducer01 = session01.createProducer(rqstQueue01);
        LOG.info(" - Session session01 producer set up successfully");

        Boolean isConnected = true;
        Long start01 = System.currentTimeMillis();
        sentTotal01 = 0L;

        while ("run".equals(getDesiredRunState())) {

            if (isConnected) {
                try {
                    Long sentCommit = 0L;
                    while (sentCommit < msgsPerTransaction) {
                        DemoBizMsg bizMsg = getNextBizMsg();
                        if (bizMsg == null) {
                            LOG.info("No more requests to send for now.");
                            break;
                        }
                        TextMessage amqpMsg = session01.createTextMessage();
                        amqpMsg.setText(bizMsg.getSignedBizMsgXml());
                        amqpMsg.setStringProperty("BizMsgIdr", bizMsg.getBizMsgIdr());
                        amqpMsg.setStringProperty("correlationId",
                                bizMsg.getBizMsgIdr().replace('|', '-'));
                        messageProducer01.send(amqpMsg);
                        sentCommit++;
                        LOG.info("Sent " + (sentTotal01 + sentCommit)
                                + ": BizMsgIdr(" + amqpMsg.getStringProperty("correlationId")
                                + "),id(" + amqpMsg.getJMSMessageID()
                                + "),xml(" + amqpMsg.getText() + ").");
                    } // while sent within transaction
                    if (sentCommit > 0) {
                        LOG.info("Committing after " + sentCommit + " requests sent");
                        // Commit AMQP Session First to ensure saved by ASX, then commit database source
                        session01.commit();
                        commitBizMsgSource();
                        sentTotal01 = sentTotal01 + sentCommit;
                    } else {
                        Thread.sleep(5000L); // no messages to send.  Wait 5 seconds
                    }
                } catch (TransactionRolledBackException e) {
                    logException("AMQP Rolled back transaction", e);
                    // See https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q1/html-single/using_the_amq_jms_client/index#handling_unacknowledged_deliveries
                    // AMQP Already Rolled-Back, now roll-back database source
                    rollBackBizMsgSource();
                    LOG.warn("Rollback Complete. Will resend messages later if reconnect successful");
                    Thread.sleep(1000L);
                    isConnected = false;
                } catch (JMSException e) {
                    logException("Other JMSException Caught", e);
                    // Roll-Back Msg-Source first to ensure can be resent, then roll-back AMQP
                    rollBackBizMsgSource();
                    session01.rollback();
                    if (isExceptionPermanent(e)) {
                        throw e;
                    }
                } catch (Exception e) {
                    logException("Other Non-JMS Exception Caught", e);
                    // Roll-Back Msg-Source first to ensure can be resent, then roll-back AMQP
                    rollBackBizMsgSource();
                    session01.rollback();
                    throw e;
                }
            } else { // connection not connected
                LOG.warn("Not Connected. Check if Reconnected yet");
                try {
                    session01.commit(); // will fail if still disconnected
                    LOG.warn("Reconnected successfully.  Resume sending");
                    isConnected = true;
                } catch (TransactionRolledBackException e) {
                    LOG.warn("Still not connected");
                    Thread.sleep(1000);
                } catch (JMSException e) {
                    logException("Empty session.commit() to test reconnection caused unexpected exception", e);
                    throw e;
                }

            }
        } // while desired state is 'run'

        session01.close();
        long finish = System.currentTimeMillis();
        long taken = finish - start01;
        LOG.info("Session01 Totals: Sent " + sentTotal01 + " messages in " + (taken / 1000) + "secs");

    }


    // ==========================================================================================================
    // DEMONSTRATION/MOCK METHODS TO GET ISO20022 MESSAGES TO SEND, COMMIT OR ROLL-BACK THAT SOURCE OF MSGS
    // ==========================================================================================================

    /**
     * Get the next BizMsg ISO20022 message request to SEND to AMQP Queue, or null if no messages to send for a while
     * This might come from a database or another queue.  For this demonstration, null is returned randomly.
     * Note that if a rollback of wherever these messages came from occurred, this routine would need to return
     * the same rolled-back messages to resend them.  This simple Demonstration code doesn't, however.
     *
     * @return
     */
    private static DemoBizMsg getNextBizMsg() {
        int randNo = RANDOM.nextInt(1000000);
        if (randNo < demoOnlyNoMoreMsgsPerMillion) { // return null sometimes to test running out of msgs to send
            return null;
        }
        if (randNo > (1000000L - demoOnlyRunTimeExceptionsPerMillion)) { // throw runtime exception sometimes to test roll-backs where get-msg fails
            throw new RuntimeException("Mock runtime exception to test roll-backs of msg-source and AMQP");
        }
        return new DemoBizMsg(RANDOM.nextInt(89999));
    }

    /**
     * Commit source of BizMsgs as they have been successfully committed to AMQP.
     */
    private static void commitBizMsgSource() {
        LOG.info("Demonstration BizMsg Source committed! (ie. from other queue or a database, etc)");
    }

    /**
     * Roll back source of BizMsgs, such as if sending-application or AMQP has failed.
     */
    private static void rollBackBizMsgSource() {
        LOG.warn("Demonstration BizMsgSource rolled back! (ie. from other queue or a database, etc)");
    }

    private static Boolean isExceptionPermanent(Throwable e) {
        return (e instanceof InvalidClientIDException
                || e instanceof InvalidDestinationException
                || e instanceof InvalidSelectorException
                || e instanceof JMSSecurityException
                || e instanceof RuntimeException
        );
    }

    private static Properties getProperties() {
        Properties properties = new Properties();
        String pathStr = System.getProperty("propertiesPath");
        if (pathStr == null) {
            pathStr = "producer.properties";
        }
        Path propPath = Paths.get(pathStr);
        LOG.info("getProperties: propPath=" + pathStr + ",absolute=" + propPath.toAbsolutePath());
        try {
            FileChannel channel = FileChannel.open(propPath, StandardOpenOption.READ);
            properties.load(Channels.newInputStream(channel));
        } catch (IOException e) {
            logException("getProperties() failed", e);
            System.exit(4);
        }
        LOG.info("getProperties: properties=" + properties);
        return properties;
    }

    private static void logException(String msg, Throwable excp) {
        StringBuffer sb = new StringBuffer(10000);
        String prefix = msg + " - ";
        Throwable nextExcp = excp;
        while (nextExcp != null) {
            sb.append(prefix + excp);
            StackTraceElement[] stes = excp.getStackTrace();
            for (int i = 0; i < stes.length; i++) {
                sb.append("\n     at " + stes[i].toString());
            }
            nextExcp = nextExcp.getCause();
            prefix = "\n caused by ";
        }
        LOG.error(sb.toString());
    }
    // ==========================================================================================================
    // DEMONSTRATION/MOCK METHODS TO DETERMINE IF PROCESSING TO CONTINUE OR STOP
    // ==========================================================================================================

    /**
     * Get desired run-state, normally 'run', unless changed to 'stop' by some system-admin facility.
     * A file is used for this demonstration but state might be retrieved from an Environment-Variable
     * or a Singleton-object that can be set by some web-service triggered by your operations-automation
     * facility.
     *
     * @return state - 'run', 'stop' (Normally, 'run' but may be set to 'stop' at end-of-business-day.)
     */
    private static String getDesiredRunState() {
        Path path = Paths.get(properties.getProperty("statePath"));
        String state = null;
        try {
            state = Files.readAllLines(path).get(0);
        } catch (IOException e) {
            logException("State 'stop' assumed", e);
            state = "stop";
        }
        LOG.info("getDesiredRunState: (" + state + ")");
        return state;
    }

    /**
     * Sets run-state to 'run', just to make running this demonstration easier.
     *
     * @param state
     * @return
     */
    private static Path setDesiredRunState(String state) {
        Path path = Paths.get(properties.getProperty("statePath"));
        Path out = null;
        try {
            out = Files.write(path, state.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
            LOG.info("setDesiredRunState: (" + state + ") written to " + out);
        } catch (IOException e) {
            logException("Couldn't set state", e);
        }
        return out;
    }


    // ==========================================================================================================
    // PRIVATE CLASSES - JUST FOR THE PURPOSE OF THIS DEMONSTRATION
    // ==========================================================================================================

    /**
     * This is a class to represent an ISO20022 BizMsg, which has a randomly-generated BizMsgIdr and a mock XML body
     */
    private static class DemoBizMsg {

        private String bizMsgIdr;
        private Long seq = seqNo++;

        public DemoBizMsg(Integer uic) {
            this.bizMsgIdr = (10000 + uic) + "|" + UUID.randomUUID().toString(); // type 4 UUID
        }

        public String getBizMsgIdr() {
            return this.bizMsgIdr;
        }

        public String getSignedBizMsgXml() {
            return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><BizMsg><AppHdr><Fr>from</Fr><To>to</To><BizMsgIdr>"
                    + this.bizMsgIdr
                    + "</BisMsgIdr>"
                    + "<Sgntr><Signature>asdfasdfasdfasdfasdf098098098098098098"
                    + seq + "</Signature></Sgntr>"
                    + "</AppHdr><Document><blah>blah blah $1234.56</blah><xyz>XYZ blah blah blah</xyz></Document>";
        }
    }
}

Sample AMQP v1.0 Client - Consumer (Receives Notify Messages)

Properties Setup File

The code section below demonstrates a sample properties file for an AMQP v1.0 compliant client created using Apache Qpid's JMS implementation. For further details, please refer to here

Sample "consumer.properties" File
clientId = Shares-R-Us-ite1-Notifs01
connectionUrl = failover:(amqps://ite1-amqp.asx.com.au:4nnn?transport.tcpKeepAlive=true&transport.trustStoreLocation=mypath/mytruststore.jks&transport.trustStorePassword=mypassword1&transport.contextProtocol=TLS&transport.keyStoreLocation=mypath/myClientKeystore.pfx&transport.keyStoreType=pkcs12&transport.keyStorePassword=mypassword2&jms.forceAsyncAcks=true)?failover.initialReconnectDelay=3000&failover.reconnectDelay=3000&failover.maxReconnectDelay=30000&failover.useReconnectBackOff=true&failover.reconnectBackOffMultiplier=2.0&failover.maxReconnectAttempts=20&failover.startupMaxReconnectAttempts=10&failover.warnAfterReconnectAttempts=10&failover.randomize=false&failover.amqpOpenServerListAction=IGNORE

# List all trx and rpt notify queues, if multiple...
notifyQueue01 = TestKit999.00001.ite1.posttrade.csp.amqp.trx.notify

statePath = consumer_desired_state.txt
msgsPerTransaction = 50

demoOnlyRunTimeExceptionsPerMillion = 0


Sample Java Client "Consumer" Code

The code section below demonstrates a sample AMQP v1.0 compliant client created using Apache Qpid's JMS implementation.

Sample Command

You would need to compile and run the following java code.

Commands to Start and Stop the client:

   java -DpropertiesPath=consumer.properties -jar amqp-asx-consumer-1.0.jar

   echo 'stop' >consumer_desired_state.txt



AMQP Client "AsxSampleAmqpConsumerMain" Java Class
/**
 * The below code is a demonstration of one possible AMQP v1.0 compliant "Consumer" client, to receive ISO20022 notify messages.
 * Apache Qpid's AMQP v1.0 compliant JMS client was used.
 *
 * This sample has the following features but your version will need to fit in with your standards and architecture:
 * - It receives Signed ISO20022 notify messages from a queue identified in a property file
 * - The property-file path is passed to the sample as a System Property
 * - If you need to receive from multiple trx.notify or rpt.notify queues, you should put each "receiveTrxNotifyMessages01"
 *   or "receiveRptNotifyMessages01" method on a separate thread.  ASX recommends you only receive from one queue per
 *   AMQP "Session".
 * - It is intended to run forever, even if there are network problems or ASX shuts down the AMQP service.  However,
 *   certain JMS/QPID exception types and, arbitrarily, the java RuntimeException, will cause the client to terminate.
 * - This sample looks in a file regularly to determine if it should continue running or stop.  To stop it, just copy
 *   the word 'stop' into the 'desired state' file.
 * - This sample uses Local Transactions so that it better fits a need to commit multiple database updates or wherever
 *   you would normally save or process your received messages.
 * - Although AMQP Transactions are not necessary, they give you more control if you need to roll-messages back onto
 *   the notify queue, ready to receive/reprocess them again.
 * - The error handling ensures no messages are lost:
 * - The store or processing of the messages is rolled-back AFTER AMQP is rolled-back if there is an error,
 * - AMQP is committed AFTER the store or process of the messages is committed.
 * - You must detect and discard any duplicates caused by messages being resent.
 * - The sample can be configured to randomly throw an exception demonstrate the unexpected termination of the client
 *
 * NOTES:
 * - This sample code expects the Connection to be configured for reconnections should any errors occur.
 * - The Connection uses background threads to keep the network connection alive and to attempt reconnection
 * - This complicates receiving/committing the messages as there is no way to determine if the connection is made
 * without trying again to commit.
 **/
package au.com.asx.amqp.consumer;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Properties;
import java.util.Random;

/**
 * See https://github.com/apache/qpid-jms/blob/1.0.0/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Sender.java
 */
public class AsxSampleAmqpConsumerMain {

    private static final Logger LOG = LoggerFactory.getLogger(AsxSampleAmqpConsumerMain.class);
    private static final Random RANDOM = new Random();

    private static Properties properties;
    private static Long receivedTotal01 = 0L;
    private static Long seqNo = 0L;

    private static Long msgsPerTransaction;
    private static Long amqpWaitTimeMs;
    private static Long demoOnlyRunTimeExceptionsPerMillion;


    public static void main(String[] args) throws InterruptedException {
        LOG.info("ASX: Sample Java \"Qpid JMS AMQP Consumer\"");
        LOG.info("=========================================");
        LOG.info("AsxSampleAmqpConsumerMain.class: Starting");
        LOG.info("");

        properties = getProperties();

        msgsPerTransaction
                = Long.valueOf(properties.getProperty("msgsPerTransaction", "50L"));
        amqpWaitTimeMs
                = Long.valueOf(properties.getProperty("amqpWaitTimeMs", "5000L"));
        demoOnlyRunTimeExceptionsPerMillion
                = Long.valueOf(properties.getProperty("demoOnlyRunTimeExceptionsPerMillion", "0L"));

        setDesiredRunState("run"); // change state to "stop" to stop receiving messages

        ConnectionFactory factory = new JmsConnectionFactory(properties.getProperty("connectionUrl"));
        JmsConnection connection = null;


        while ("run".equals(getDesiredRunState())) {
            // ...loop in case provided connection URL has no reconnection parameters so this code must reconnect

            try {

                LOG.info("=========================================");
                LOG.info(" - Creating connection...");
                connection = (JmsConnection) factory.createConnection();
                connection.setClientID(properties.getProperty("clientId", "Shares-R-Us-01"));
                connection.start();
                LOG.info("=========================================");

                // ------------------------------------------------------------------------------
                // For ASX, create a session per queue, as queues are not related to each other.
                // If multiple queues, create session and process each queue on a separate thread

                receiveTrxNotifyMessages01(connection); // Receive messages from queue 1

                // ------------------------------------------------------------------------------
                connection.close();


            } catch (Exception e) {

                logException("Connection-related exception", e);
                try {
                    connection.close();
                } catch (JMSException ex) {
                    LOG.info("Connection already closed");
                }
                rollBackBizMsgProcess();
                long finish = System.currentTimeMillis();
                LOG.info("Session01 Totals: Received " + receivedTotal01 + " messages before disconnection.");
                if (isExceptionPermanent(e)) {
                    LOG.error("Above exception cannot be fixed by retrying.  Execution terminating");
                    System.exit(4);
                }
            }
            Thread.sleep(10000L);
        } // keep connecting, while 'run'...
    }

    private static void receiveTrxNotifyMessages01(JmsConnection connection)
            throws JMSException, InterruptedException {

        LOG.info(" - Creating session01...");
        JmsSession session01 = (JmsSession) connection.createSession(true, Session.SESSION_TRANSACTED);

        JmsQueue notifyQueue01 = new JmsQueue(properties.getProperty("notifyQueue01"));
        LOG.info(" - Creating session01 consumer to queue01: " + notifyQueue01.getQueueName());
        MessageConsumer messageConsumer01 = session01.createConsumer(notifyQueue01);
        LOG.info(" - Session session01 consumer set up successfully");

        Boolean isConnected = true;
        Long start01 = System.currentTimeMillis();
        receivedTotal01 = 0L;
        Object msgObj;
        TextMessage message = null;

        while ("run".equals(getDesiredRunState())) {

            if (isConnected) {
                try {
                    msgObj = ""; // not null
                    Long receivedCommit = 0L;
                    while (msgObj != null && receivedCommit < msgsPerTransaction) {
                        msgObj = messageConsumer01.receive(amqpWaitTimeMs);
                        if (msgObj != null) {
                            receivedCommit++;
                            if (msgObj instanceof TextMessage) {
                                message = (TextMessage) msgObj;
                                LOG.info("Received message " + (receivedTotal01 + receivedCommit)
                                        + ": id=" + safe(message.getJMSMessageID())
                                        + ",BizMsgIdr=" + safe(message.getStringProperty("BizMsgIdr"))
                                        + ",correlId=" + safe(message.getStringProperty("correlationId"))
                                        + ",body=" + safe(message.getText())
                                );
                                processBizMsg(message);
                            } else {
                                LOG.error("Rejecting message of wrong type: " + msgObj.getClass().getCanonicalName());
                            }
                        } // not null
                    } // while received within transaction
                    if (receivedCommit > 0) {
                        LOG.info("Committing after " + receivedCommit + " notifications received");
                        // Commit Message Save/Process before AMQP Session to ensure no messages lost
                        commitBizMsgProcess();
                        session01.commit();
                        receivedTotal01 = receivedTotal01 + receivedCommit;
                    } else {
                        Thread.sleep(5000L); // no messages received.  Wait 5 seconds before trying again
                    }
                } catch (TransactionRolledBackException e) {
                    logException("AMQP Rolled back transaction", e);
                    // See https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q1/html-single/using_the_amq_jms_client/index#handling_unacknowledged_deliveries
                    // AMQP Already Rolled-Back, now roll-back msg-process (eg. database updates)
                    rollBackBizMsgProcess();
                    LOG.warn("Rollback Complete. Will re-receive same messages later if reconnect successful");
                    Thread.sleep(1000L);
                    isConnected = false;
                } catch (JMSException e) {
                    logException("Other JMSException Caught", e);
                    // Roll-Back messages onto queue first before rolling-back processed/saved messages, to ensure
                    // can be re-received later.  The worst that could then happen is that duplicates are processed.
                    session01.rollback();
                    rollBackBizMsgProcess();
                    if (isExceptionPermanent(e)) {
                        throw e;
                    }
                } catch (Exception e) {
                    logException("Other Non-JMS Exception Caught", e);
                    // Roll-Back Msg-Source first to ensure can be rereceived, then roll-back AMQP
                    rollBackBizMsgProcess();
                    session01.rollback();
                    throw e;
                }
            } else { // connection not connected
                LOG.warn("Not Connected. Check if Reconnected yet");
                try {
                    session01.commit(); // will fail if still disconnected
                    LOG.warn("Reconnected successfully.  Resume receiving");
                    isConnected = true;
                } catch (TransactionRolledBackException e) {
                    LOG.warn("Still not connected");
                    Thread.sleep(1000);
                } catch (JMSException e) {
                    logException("Empty session.commit() to test reconnection caused unexpected exception", e);
                    throw e;
                }

            }
        } // while desired state is 'run'

        session01.close();
        long finish = System.currentTimeMillis();
        long taken = finish - start01;
        LOG.info("Session01 Totals: Received " + receivedTotal01 + " messages in " + (taken / 1000) + "secs");

    }


    // ==========================================================================================================
    // DEMONSTRATION/MOCK METHODS TO PROCESS OR SAVE ISO20022 MESSAGES, COMMIT OR ROLL-BACK THAT MSG-PROCESS
    // ==========================================================================================================

    /**
     * Process or Save the received BizMsg ISO20022 notify message.
     * Note:
     * - If there is something wrong with the message (eg. signature doesn't match) that prevents processing it, the
     *   message should be logged and ASX should be contacted.  Do not throw an exception as re-processing the same
     *   message won't fix it.
     * - If something environmental prevents the message being processed, an Exception can be thrown to
     *   cause the messages to be "rolled-back" onto the queue, to be reprocessed again later. (Not a
     *   "RuntimeException" in this example, as this exception type causes the entire process to terminate)
     * - This demonstration randomly throws these exceptions to emulate an error requiring a roll-back and terminate
     *   the entire client.
     *
     * @return
     */
    private static void processBizMsg(TextMessage message) {
        int randNo = RANDOM.nextInt(1000000);
        if (randNo > (1000000L - demoOnlyRunTimeExceptionsPerMillion)) { // throw runtime exception sometimes to test roll-backs where get-msg fails
            throw new RuntimeException("Mock runtime exception to test roll-backs of msg-process/store and AMQP");
        }
    }

    /**
     * Commit source of BizMsgs as they have been successfully committed to AMQP.
     */
    private static void commitBizMsgProcess() {
        LOG.info("Demonstration BizMsg Process committed! (ie. Notify msgs processed or saved successfully)");
    }

    /**
     * Roll back storage or processing of BizMsgs, such as if message-processing or AMQP has failed.
     */
    private static void rollBackBizMsgProcess() {
        LOG.warn("Demonstration BizMsg Process rolled back! (ie. Notify msg-processing or save backed-out)");
    }

    private static Boolean isExceptionPermanent(Throwable e) {
        return (e instanceof InvalidClientIDException
                || e instanceof InvalidDestinationException
                || e instanceof InvalidSelectorException
                || e instanceof JMSSecurityException
                || e instanceof RuntimeException
        );
    }

    private static Properties getProperties() {
        Properties properties = new Properties();
        String pathStr = System.getProperty("propertiesPath");
        if (pathStr == null) {
            pathStr = "consumer.properties";
        }
        Path propPath = Paths.get(pathStr);
        LOG.info("getProperties: propPath=" + pathStr + ",absolute=" + propPath.toAbsolutePath());
        try {
            FileChannel channel = FileChannel.open(propPath, StandardOpenOption.READ);
            properties.load(Channels.newInputStream(channel));
        } catch (IOException e) {
            logException("getProperties() failed", e);
            System.exit(4);
        }
        LOG.info("getProperties: properties=" + properties);
        return properties;
    }

    private static void logException(String msg, Throwable excp) {
        StringBuffer sb = new StringBuffer(10000);
        String prefix = msg + " - ";
        Throwable nextExcp = excp;
        while (nextExcp != null) {
            sb.append(prefix + excp);
            StackTraceElement[] stes = excp.getStackTrace();
            for (int i = 0; i < stes.length; i++) {
                sb.append("\n     at " + stes[i].toString());
            }
            nextExcp = nextExcp.getCause();
            prefix = "\n caused by ";
        }
        LOG.error(sb.toString());
    }

    private static String safe(String str) {
        return (str == null) ? "" : str;
    }


    // ==========================================================================================================
    // DEMONSTRATION/MOCK METHODS TO DETERMINE IF PROCESSING TO CONTINUE OR STOP
    // ==========================================================================================================

    /**
     * Get desired run-state, normally 'run', unless changed to 'stop' by some system-admin facility.
     * A file is used for this demonstration but state might be retrieved from an Environment-Variable
     * or a Singleton-object that can be set by some web-service triggered by your operations-automation
     * facility.
     *
     * @return state - 'run', 'stop' (Normally, 'run' but may be set to 'stop' at end-of-business-day.)
     */
    private static String getDesiredRunState() {
        Path path = Paths.get(properties.getProperty("statePath"));
        String state = null;
        try {
            state = Files.readAllLines(path).get(0);
        } catch (IOException e) {
            logException("State 'stop' assumed", e);
            state = "stop";
        }
        LOG.info("getDesiredRunState: (" + state + ")");
        return state;
    }

    /**
     * Sets run-state to 'run', just to make running this demonstration easier.
     *
     * @param state
     * @return
     */
    private static Path setDesiredRunState(String state) {
        Path path = Paths.get(properties.getProperty("statePath"));
        Path out = null;
        try {
            out = Files.write(path, state.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
            LOG.info("setDesiredRunState: (" + state + ") written to " + out);
        } catch (IOException e) {
            logException("Couldn't set state", e);
        }
        return out;
    }

}




Related Pages:

There are no related labels.

Browse Popular Pages:

No labels match these criteria.



This document provides general information only. ASX Limited (ABN 98 008 624 691) and its related bodies corporate (“ASX”) makes no representation or warranty with respect to the accuracy, reliability or completeness of the information. To the extent permitted by law, ASX and its employees, officers and contractors shall not be liable for any loss or damage arising in any way (including by way of negligence) from or in connection with any information provided or omitted or from anyone acting or refraining to act in reliance on this information.

© 2022 ASX Limited ABN 98 008 624 691