AMQP Protocol Handler (amqp://)

A-Stack provides an AMQP protocol handler that can be used to publish messages to, and subscribe to messages from, an AMQP broker.

AMQP Protocol Handler Flow:



AMQP Protocol Handler Installation: 

Date | Download Location | Version | Change Log
04/11/2018 | Download | 1.1.1 | 1. Auto-reconnection of the client after network disconnects and broker restarts.
04/17/2018 | Download | 1.1.2 | 1. Handles the exception raised when closing a channel that is already closed. 2. Fix in the log message shown when the client is disconnected by the user.
09/08/2019 | Download | 1.1.3 | 1. Back Pressure Algorithm.
10/06/2019 | sff.bundle.ext.amqp-2.0.0.jar, amqp-client-5.7.3.jar, amqptqlexample.zip | 2.0.0 | 1. A-Stack 2.1 compatible; migrate to MapList. 2. Note that both jars must be placed in the sff.auto.launch folder.
7/17/2020 | Download | 2.0.0 | 1. Upgraded all jars to Engine version 2.0.0.

AMQP Setup Instructions


  1. Unzip the downloaded zip file.
  2. Copy sff.bundle.ext.amqp.jar from the extracted location to the sff.auto.launch folder in your A-Stack Prime folder.
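
The copy step above can be scripted. The following is a minimal sketch in Python, not part of A-Stack itself: the function name install_jars and the example paths are hypothetical, and it only assumes that the sff.auto.launch folder already exists under your A-Stack Prime folder.

```python
import shutil
from pathlib import Path


def install_jars(jar_paths, astack_home):
    """Copy each jar into <astack_home>/sff.auto.launch and return the new paths.

    Both arguments are supplied by the caller; nothing here is an A-Stack API.
    """
    target = Path(astack_home) / "sff.auto.launch"
    if not target.is_dir():
        # Fail loudly instead of creating a folder A-Stack would not scan.
        raise FileNotFoundError(f"{target} does not exist")
    installed = []
    for jar in jar_paths:
        dest = target / Path(jar).name
        shutil.copy2(jar, dest)  # copy file contents and timestamps
        installed.append(dest)
    return installed


# Example call (hypothetical paths, adjust to your installation):
# install_jars(["unzipped/sff.bundle.ext.amqp.jar"], "/opt/a-stack-prime")
```

For version 2.0.0, the same call would list both jars named in the table above.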


AMQP Protocol Parameters:

The table below lists the parameters of the AMQP protocol handler and their descriptions.

Parameter Name | Parameter Description
ClientType | Whether the client is a publisher or a subscriber.
HostName | The IP address or DNS name of the AMQP broker.
PortNumber | The port number on which the AMQP broker is running.
ExchangeName | The name of the exchange to which messages are sent or to which a queue binds.
ExchangeType | The type of the exchange: fanout, direct, topic, or headers.
QueueName | Subscriber only. The name of the queue to be created.
RoutingKey | Publisher only. The routing key (topic key) of the message to be published.
AMQPSubscriptionConfig | Subscriber only. The set of routing keys to which the subscriber queue is bound.
UserName | The user name for the AMQP connection.
Password | The password for the AMQP connection.
VirtualHost | The name of the virtual host to which the client belongs.
AMQPData | Publisher only. The message that you want to publish.
SubscriptionPayload | The messages of your subscribed topics are stored in this variable.
Durability | Optional boolean. Whether declared exchanges and queues are durable; the default is false. Durable queues and exchanges persist across server restarts.
AutoReconnect | Optional boolean. Whether the client automatically reconnects to the broker if the connection is lost; the default is true.
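
To illustrate how ExchangeType and the routing keys interact, the sketch below models topic-exchange matching in Python. It assumes the broker follows standard AMQP 0-9-1 topic semantics, where keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The function name topic_matches is hypothetical and is not part of the handler; the broker performs this matching itself.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under topic-exchange rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Words exhausted but a literal or '*' is still pending.
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# A subscriber bound with "sampleKey.*" receives a message published
# with the routing key "sampleKey.error".
print(topic_matches("sampleKey.*", "sampleKey.error"))
```

With a direct exchange the binding key and routing key must be equal, and a fanout exchange ignores the routing key, so this matching only applies when ExchangeType is topic.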


Usage of AMQP protocol handler:

  • AMQP Publish:

    AMQP Publish Facet
            <ThingFacet Name="AMQPWriteFacet">
                <String Name="AMQPData" KnownBy="AMQPWriteAction"/>
                <String Name="ClientType"/>
                <String Name="UserName"/>
                <String Name="Password"/>
                <String Name="VirtualHost"/>
                <String Name="HostName"/>
                <Integer Name="PortNumber"/>
                <String  Name="ExchangeName"/>
                <String Name="RoutingKey"/>
                <String Name="QueueName" default=""/>
                <String Name="ExchangeType"/>
                <Integer Name="Priority" default=""/>
                <String Name="ContentType" default=""/>
                <Action Name="AMQPWriteAction">
                    <Workflow Limit="1" Live="1" Timeout="-1">
                        <Task name="Main" >
                            <Event as="ActionArgument" name="Argument"/>
                            <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&amp; 
                                UserName=[%:Event.Argument.UserName.Value:%]&amp;
                                Password=[%:Event.Argument.Password.Value:%]&amp;
                                VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&amp;
                                HostName=[%:Event.Argument.HostName.Value:%]&amp;
                                PortNumber=[%:Event.Argument.PortNumber.Value:%]&amp;
                                ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;
                                QueueName=[%:Event.Argument.QueueName.Value:%]&amp;
                                ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument" name="InvokeAMQPWrite" scope="local">
                                <Message>
                                    <Value>
                                        <PublishMessage>"[%:Event.Argument.AMQPData.Value:%]"</PublishMessage>
                                        <RoutingKey>[%:Event.Argument.RoutingKey.Value:%]</RoutingKey>
                                        <Priority>[%:Event.Argument.Priority.Value:%]</Priority>
                                        <ContentType>[%:Event.Argument.ContentType.Value:%]</ContentType>
                                    </Value>
                                </Message>
                            </Invoke>
                            <Output As="ActionResult" Name="Result">
                                <Value>
                                    <AMQPData>[%:Event.Argument.AMQPData.Value:%]</AMQPData>
                                </Value>
                            </Output>
                        </Task>
                    </Workflow>
                </Action>
            </ThingFacet>
    Publish Initialization Query
    <Query>
      <DeleteAll>
        <AMQPWriteModel>
          <writeId ne=""/>
        </AMQPWriteModel>
      </DeleteAll>
      <Create>
        <AMQPWriteModel>
          <ClientType>
            publisher
          </ClientType>
          <HostName>
            localhost
          </HostName>
          <PortNumber>
            5672
          </PortNumber>
          <ExchangeName>
            AtomitonExchange
          </ExchangeName>
          <ExchangeType>
            topic
          </ExchangeType>
          <RoutingKey>
            sampleKey.error
          </RoutingKey>
          <AMQPData>
            HelloWorld
          </AMQPData>
          <UserName>
            $Null()
          </UserName>
          <Password>
            $Null()
          </Password>
          <VirtualHost>
            $Null()
          </VirtualHost>
          <Priority>
            $Null()
          </Priority>
          <ContentType>
            $Null()
          </ContentType>
        </AMQPWriteModel>
      </Create>
    </Query>
    Publish Update Query
    <Query>
      <Find format="version,known">
        <AMQPWriteModel>
           <writeId ne=""/>
        </AMQPWriteModel>
      </Find>
      <SetResponseData>
        <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key>
        <value>HelloWorld</value>
      </SetResponseData>
      <Update>
        <from>Result</from>
        <Include>$Response.Message.Value.Find</Include>
      </Update>
    </Query>
  • AMQP Subscribe

    AMQP Subscribe Facet
     <Def Name="AMQPSubscribe">
          <String Name="RoutingKey" Cardinality="0..m"/>
     </Def>
     
     <ThingFacet Name="AMQPSubscriberFacet">
          <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" />
          <String Name="ClientType"/>
          <String Name="UserName"/>
          <String Name="Password"/>
          <String Name="VirtualHost"/>
          <String Name="HostName"/>
          <Integer Name="PortNumber"/>
          <String  Name="ExchangeName"/>
          <String Name="QueueName" default=""/>
          <Boolean Name="Durability"/>
          <Boolean Name="AutoReconnect"/>
          <String Name="ExchangeType"/>
          <AMQPSubscribe Name="AMQPSubscriptionConfig"/>
          <Action Name="AMQPSubscriberAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
              <Task name="Main" while="true">
                <Event as="ActionArgument" name="Argument"/>
                <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&amp; 
                              UserName=[%:Event.Argument.UserName.Value:%]&amp;
                              Password=[%:Event.Argument.Password.Value:%]&amp;
                              VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&amp;
                              HostName=[%:Event.Argument.HostName.Value:%]&amp;
                              PortNumber=[%:Event.Argument.PortNumber.Value:%]&amp;
                              ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;
                              QueueName=[%:Event.Argument.QueueName.Value:%]&amp;
                              ExchangeType=[%:Event.Argument.ExchangeType.Value:%]&amp;
                              Durability=[%:Event.Argument.Durability.Value:%]&amp;
                              AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]"
                        scope="process" waitFor="Argument" name="InvokeAMQPSubscription">
                  <Message>
                    <Value>
                      <AMQPSubscriptionConfig>
                        [%:Event.Argument.AMQPSubscriptionConfig:%]
                      </AMQPSubscriptionConfig>
                    </Value>
                  </Message>
                </Invoke>
                <Output As="ActionResult" Name="Result">
                  <Value>
                    <SubscriptionPayload>
                      <Include>
                        Invoke.InvokeAMQPSubscription.Message.Value
                      </Include>
                    </SubscriptionPayload>
                  </Value>
                </Output>
              </Task>
            </Workflow>
          </Action>
     </ThingFacet>
    

    Note: The output is stored in the variable SubscriptionPayload, although the KnownBy is on the variable SubscriptionData. This is done to preserve the XML structure and avoid TP processing.


    Subscribe Initialize Query
    <Query>
      <DeleteAll>
        <AMQPSubscriberModel>
          <subscribeId ne=""/>
        </AMQPSubscriberModel>
      </DeleteAll>
      <Create>
        <AMQPSubscriberModel>
          <SubscriptionPayload>
            $Null()
          </SubscriptionPayload>
          <ClientType>
            subscriber
          </ClientType>
          <HostName>
            localhost
          </HostName>
          <PortNumber>
            5672
          </PortNumber>
          <UserName>
            $Null()
          </UserName>
          <Password>
            $Null()
          </Password>
          <VirtualHost>
            $Null()
          </VirtualHost>
          <ExchangeName>
            AtomitonExchange
          </ExchangeName>
          <QueueName>
            sample
          </QueueName>
          <Durability>
            true
          </Durability>
          <ExchangeType>
            topic
          </ExchangeType>
          <AMQPSubscriptionConfig>
            <RoutingKey>
              sampleKey.*
            </RoutingKey>
          </AMQPSubscriptionConfig>
          <AutoReconnect>true</AutoReconnect>
        </AMQPSubscriberModel>
      </Create>
    </Query>
    Delete Subscription Query
    <Query>
      <DeleteAll>
        <AMQPSubscriberModel>
          <subscribeId ne=""/>
        </AMQPSubscriberModel>
      </DeleteAll>
    </Query>
  • AMQP Subscription Message Output Format: The output message of an AMQP subscription is an XML structure with the fields RoutingKey, Message, contentType, and DeliveryTag.

    XML output format
    <Known>
        <RoutingKey>sampleKey.error</RoutingKey>
        <Message>HelloWorld</Message>
        <contentType>text/plain</contentType>
        <DeliveryTag>1</DeliveryTag>
    </Known>
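
A consumer of this output can read the four fields with any XML parser. The following is a minimal Python sketch using the standard library; the function name parse_subscription_payload is hypothetical, and it assumes the payload arrives exactly as the `<Known>` document shown above. How the payload is retrieved from A-Stack is outside the scope of this sketch.

```python
import xml.etree.ElementTree as ET


def parse_subscription_payload(xml_text: str) -> dict:
    """Extract the subscription fields from a <Known> payload."""
    root = ET.fromstring(xml_text)

    def text(tag):
        # Return the trimmed text of a child element, or None if absent or empty.
        node = root.find(tag)
        return node.text.strip() if node is not None and node.text else None

    tag_value = text("DeliveryTag")
    return {
        "routing_key": text("RoutingKey"),
        "message": text("Message"),
        "content_type": text("contentType"),  # lower-case 'c', as in the sample
        "delivery_tag": int(tag_value) if tag_value is not None else None,
    }


SAMPLE = """
<Known>
    <RoutingKey>sampleKey.error</RoutingKey>
    <Message>HelloWorld</Message>
    <contentType>text/plain</contentType>
    <DeliveryTag>1</DeliveryTag>
</Known>
"""

payload = parse_subscription_payload(SAMPLE)
print(payload["routing_key"], payload["message"])
```

Note that the element names are case-sensitive: contentType starts with a lower-case letter, while the other three fields are capitalized.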