Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

A-Stack supports AMQP protocol Handler, that can be used to subscribe/publish various messages to an AMQP broker. 

...

AMQP Protocol Handler Installation: Download the protocol handlers zip file from the link below

http://sandbox.atomiton.com:8080/fid-downloads/res/downloads/protocolhandlers.zip

once downloaded unzip it.  After you unzip it go to protocolhandlers/Amqp directory and copy the

DateDownload LocationversionChange Log
04/11/2018Download1.1.1
  1. Auto Reconnection of the client in case of network disconnects and broker restarts.
04/17/2018Download1.1.2
  1. Handling exception when trying to close a channel that is already closed.
  2. Fix in the log message when the client is disconnected by the user
09/08/2019Download1.1.3
  1. Back Pressure Algorithm
10/06/2019 

sff.bundle.ext.amqp

.jar from this location to

-2.0.0.jar

amqp-client-5.7.3.jar

amqptqlexample.zip


2.0.0
  1. A-Stack 2.1 Compatible; Migrate to MapList
  2. Please note that both jars must be placed in the sff.auto.launch
folder which is present in your A-Stack folder.

AMQP Protocol Parameters:

The table below lists all the parameters of Amqp protocol handler and their description

...

Parameter Name

...

Parameter Description

...

ClientType

...

Whether the client is a publisher or a subscriber.

...

HostName

...

The IP address or Dns name of AMQP Broker.

...

PortNumber

...

The port number on which Amqp broker is running.

...

ExchangeName

...

  1. folder 
7/17/2020Download2.0.0
  1. Upgraded all Jars to Engine version 2.0.0.

   

AMQP Setup Instructions


  1. Unzip the downloaded zip file
  2. Copy the sff.bundle.ext.amqp.jar from this location to the sff.auto.launch folder which is present in your A-Stack Prime folder.


AMQP Protocol Parameters:

The table below lists all the parameters of Amqp protocol handler and their description

Parameter Name

Parameter Description

ClientType

Whether the client is a publisher or a subscriber.

HostName

The IP address or Dns name of AMQP Broker.

PortNumber

The port number on which Amqp broker is running.

ExchangeName

The name of the exchange to which messages are to be sent or to which a queue would bind.

ExchangeType

The type of the exchange, it can be either fanout, direct, topic or headers.

QueueName

This parameter is used in case of subscriber and is the name of the queue to be created.

RoutingKey

This parameter is used in case of publisher and is the routing Key or topic key of message

to be published.

AMQPSubscriptionConfigThis parameter is used in case of subscriber and is cardinality of routing keys. The subscriber queue is binded to these routing keys.
UserNameThe User Name for AMQP Connection.
PasswordThe Password for the AMQP Connection.
VirtualHostThe Name of the virtual host to which the client belongs.
AMQPData

This parameter is used in case of publisher and is the message that you want to publish.

SubscriptionPayloadThe Messages of your subscribed topics are stored in this variable.

Usage of AMQP protocol handler:

AMQP Publish:

...

Durability

An optional boolean parameter deciding whether a declared exchange or queues should be durable or not, by default it is false. Durable queues and exchanges are persistent across server restarts.
AutoReconnectAn optional boolean parameter deciding whether a client should automatically reconnect to broker if connection is lost, by default it is true.


Usage of AMQP protocol handler:

  • AMQP Publish:

    Code Block
    languagexml
    firstline1
    titleAMQP Publish Facet
    linenumberstrue
      <ThingFacet Name="AMQPWriteFacet">
                <String Name="AMQPData" KnownBy="AMQPWriteAction"/>
                <String Name="ClientType"/>
                <String Name="UserName"/>
                <String Name="Password"/>
                <String Name="VirtualHost"/>
                <String Name="HostName"/>
                <Integer Name="PortNumber"/>
                <String  Name="ExchangeName"/>
                <String Name="RoutingKey"/>
                <String Name="QueueName" default=""/>
                <String Name="ExchangeType"/>
                <Integer Name="Priority" default=""/>
                <String Name="ContentType" default=""/>
                <Action Name="AMQPWriteAction">
                    <Workflow Limit="1" Live="1" Timeout="-1 <ThingFacet Name="AMQPWriteFacet">
                <String        <Task name="Main" Name="AMQPData" KnownBy="AMQPWriteAction"/>
                            <Event as="ActionArgument" name="Argument"<String Name="ClientType"/>
                            <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&amp;<String Name="UserName"/>
                <String Name="Password"/>
                <String   UserName=[%:Event.Argument.UserName.Value:%]&amp;Name="VirtualHost"/>
                <String Name="HostName"/>
                <Integer    Password=[%:Event.Argument.Password.Value:%]&amp;Name="PortNumber"/>
                <String  Name="ExchangeName"/>
                <String VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&amp;Name="RoutingKey"/>
                <String Name="QueueName" default=""/>
                <String  HostName=[%:Event.Argument.HostName.Value:%]&amp;Name="ExchangeType"/>
                <Integer Name="Priority" default=""/>
                <String PortNumber=[%:Event.Argument.PortNumber.Value:%]&amp;
        Name="ContentType" default=""/>
                <Action Name="AMQPWriteAction">
             ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;       <Workflow Limit="1"    Live="1" Timeout="-1">
                    QueueName=[%:Event.Argument.QueueName.Value:%]&amp;    <Task name="Main" >
                          ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument  <Event as="ActionArgument" name="InvokeAMQPWrite" scope="local"Argument"/>
                                <Message>
      <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&amp; 
                                <Value>UserName=[%:Event.Argument.UserName.Value:%]&amp;
                                        <PublishMessage>"Password=[%:Event.Argument.AMQPDataPassword.Value:%]"</PublishMessage>&amp;
                                        <RoutingKey>VirtualHost=[%:Event.Argument.RoutingKeyVirtualHost.Value:%]</RoutingKey>
       ]&amp;
                                    <Priority>HostName=[%:Event.Argument.PriorityHostName.Value:%]</Priority>&amp;
                                        <ContentType>PortNumber=[%:Event.Argument.ContentTypePortNumber.Value:%]</ContentType>&amp;
                                ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;
                </Value>                QueueName=[%:Event.Argument.QueueName.Value:%]&amp;
                </Message>                ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument" name="InvokeAMQPWrite" scope="local">
         </Invoke>                       <Message>
     <Output As="ActionResult" Name="Result">                             <Value>
                                    <AMQPData>    <PublishMessage>"[%:Event.Argument.AMQPData.Value:%]"</AMQPData>PublishMessage>
                                </Value>                         </Output><RoutingKey>[%:Event.Argument.RoutingKey.Value:%]</RoutingKey>
                        </Task>                 </Workflow>
                </Action><Priority>[%:Event.Argument.Priority.Value:%]</Priority>
            </ThingFacet>
    Code Block
    languagexml
    firstline1
    titlePublish Initialization Query
    linenumberstrue
    <Query>   <DeleteAll>     <AMQPWriteModel>       <writeId ne=""/>     </AMQPWriteModel>   </DeleteAll>   <Create>
     <ContentType>[%:Event.Argument.ContentType.Value:%]</ContentType>
       <AMQPWriteModel>       <ClientType>         publisher       </ClientType>       <HostName>
    </Value>
           localhost       </HostName>       <PortNumber>       </Message>
     5672       </PortNumber>       <ExchangeName>         AtomitonExchange</Invoke>
          </ExchangeName>       <ExchangeType>         topic  <Output As="ActionResult" Name="Result">
      </ExchangeType>       <RoutingKey>         sampleKey.error       </RoutingKey>       <AMQPData><Value>
            HelloWorld       </AMQPData>       <UserName>         $Null()
     <AMQPData>[%:Event.Argument.AMQPData.Value:%]</AMQPData>
         </UserName>       <Password>         $Null()       </Password>Value>
            <VirtualHost>         $Null()       </VirtualHost>Output>
               <Priority>         $Null()</Task>
          </Priority>       <ContentType>   </Workflow>
         $Null()       </ContentType>Action>
        </AMQPWriteModel>   </Create> </Query>ThingFacet>


    Code Block
    languagexml
    firstline1
    titlePublish Update Initialization Query
    linenumberstrue
    <Query>
      <Find format="version,known"><DeleteAll>
        <AMQPWriteModel>
           <writeId ne=""/>
        </AMQPWriteModel>
      </Find>DeleteAll>
      <Create>
        <AMQPWriteModel>
          <ClientType>
        <SetResponseData>     <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key>publisher
          <value>HelloWorld</value></ClientType>
          </SetResponseData><HostName>
      <Update>     <from>Result</from> localhost
       <Include>$Response.Message.Value.Find</Include>   </Update>
    </Query>

    AMQP Subscribe

    Code Block
    languagexml
    firstline1
    titleAMQP Subscribe Facet
    linenumberstrue
    <Def Name="AMQPSubscribe">HostName>
         <String Name="RoutingKey" Cardinality="0..m"/>
     </Def>
     
    <ThingFacet Name="AMQPSubscriberFacet">
     <PortNumber>
            5672
          </PortNumber>
       <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <ExchangeName>
            AtomitonExchange
         <String Name="ClientType"/></ExchangeName>
          <ExchangeType>
         <String Name="UserName"/>  topic
          </ExchangeType>
       <String Name="Password"/>  <RoutingKey>
            sampleKey.error
     <String Name="VirtualHost"/>    </RoutingKey>
          <AMQPData>
     <String Name="HostName"/>      HelloWorld
          <Integer Name="PortNumber"/></AMQPData>
          <UserName>
          <String  Name="ExchangeName"/>$Null()
          </UserName>
         <String Name="QueueName" default=""/><Password>
            $Null()
         <String Name="ExchangeType"/></Password>
          <VirtualHost>
          <AMQPSubscribe Name="AMQPSubscriptionConfig"/>  $Null()
          </VirtualHost>
          <Priority>
     <Action Name="AMQPSubscriberAction">      $Null()
          </Priority>
       <Workflow Limit="1" Live="1" Timeout="-1"> <ContentType>
            $Null()
          </ContentType>
        <Task name="Main" while="true">
       </AMQPWriteModel>
      </Create>
    </Query>


    Code Block
    languagexml
    firstline1
    titlePublish Update Query
    linenumberstrue
    <Query>
      <Find format="version,known">
        <AMQPWriteModel>
           <writeId ne=""/>
        </AMQPWriteModel>
     <Event as="ActionArgument" name="Argument"/> </Find>
      <SetResponseData>
        <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key>
        <value>HelloWorld</value>
      </SetResponseData>
      <Update>
         <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&amp;<from>Result</from>
        <Include>$Response.Message.Value.Find</Include>
      </Update>
    </Query>


  • AMQP Subscribe

    Code Block
    languagexml
    firstline1
    titleAMQP Subscribe Facet
    linenumberstrue
    <Def Name="AMQPSubscribe">
       <String Name="RoutingKey" Cardinality="0..m"/>
     </Def>
     
     <ThingFacet Name="AMQPSubscriberFacet">
          <String Name="SubscriptionPayload" UserName=[%:Event.Argument.UserName.Value:%]&amp;
      update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" />
                  <String Name="ClientType"/>
          <String Name="UserName"/>
      Password=[%:Event.Argument.Password.Value:%]&amp;    <String Name="Password"/>
          <String Name="VirtualHost"/>
          <String Name="HostName"/>
          <Integer VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&amp;Name="PortNumber"/>
          <String  Name="ExchangeName"/>
          <String Name="QueueName" default=""/>
          <Boolean Name="Durability"/>
        HostName=[%:Event.Argument.HostName.Value:%]&amp;  <Boolean Name="AutoReconnect"/>
          <String Name="ExchangeType"/>
          <AMQPSubscribe Name="AMQPSubscriptionConfig"/>
          <Action   PortNumber=[%:Event.Argument.PortNumber.Value:%]&amp;Name="AMQPSubscriberAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
                    ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;<Task name="Main" while="true">
                <Event as="ActionArgument" name="Argument"/>
                 QueueName<Invoke post="amqp://?ClientType=[%:Event.Argument.QueueNameClientType.Value:%]&amp; 
                               ExchangeTypeUserName=[%:Event.Argument.ExchangeTypeUserName.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription">&amp;
                              Password=[%:Event.Argument.Password.Value:%]&amp;
              <Message>                VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&amp;
                    <Value>          HostName=[%:Event.Argument.HostName.Value:%]&amp;
                              <AMQPSubscriptionConfig>PortNumber=[%:Event.Argument.PortNumber.AMQPSubscriptionConfigValue:%]</AMQPSubscriptionConfig>&amp;
                              ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&amp;
          </Value>                    QueueName=[%:Event.Argument.QueueName.Value:%]&amp;
            </Message>                  ExchangeType=[%:Event.Argument.ExchangeType.Value:%]&amp;
          </Invoke>                    Durability=[%:Event.Argument.Durability.Value:%]&amp;
    						    <Output As="ActionResult" Name="Result">
       AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" 
    					scope="process" waitFor="Argument" name="InvokeAMQPSubscription">
                  <Message>
             <Value>       <Value>
                      <AMQPSubscriptionConfig>
          <SubscriptionPayload>              [%:Event.Argument.AMQPSubscriptionConfig:%]
                      </AMQPSubscriptionConfig>
       <Include>             </Value>
                  </Message>
                Invoke.InvokeAMQPSubscription.Message.Value</Invoke>
                <Output As="ActionResult" Name="Result">
                   <Value>
         </Include>           <SubscriptionPayload>
                      <Include>
      </SubscriptionPayload>                  Invoke.InvokeAMQPSubscription.Message.Value
              </Value>        </Include>
                    </Output>SubscriptionPayload>
                  </Value>
         </Task>       </Output>
              </Workflow>Task>
            </Workflow>
          </Action>
    
           </ThingFacet>
    

    Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing. 


    Code Block
    languagexml
    firstline1
    titleSubscribe Initialize Query
    linenumberstrue
    <Query>
      <DeleteAll>
        <AMQPSubscriberModel>
          <subscribeId ne=""/>
        </AMQPSubscriberModel>
      </DeleteAll>
      <Create>
        <AMQPSubscriberModel>
          <SubscriptionPayload>
            $Null()
          </SubscriptionPayload>
          <ClientType>
            subscriber
          </ClientType>
          <HostName>
            localhost
          </HostName>
          <PortNumber>
            5672
          </PortNumber>
          <UserName>
            $Null()
          </UserName>
          <Password>
            $Null()
          </Password>
          <VirtualHost>
            $Null()
          </VirtualHost>
          <ExchangeName>
            AtomitonExchange
          </ExchangeName>
          <QueueName>
            AtomitonQueuesample
          </QueueName>
          <Durability>
            true
          </Durability>
          <ExchangeType>
            topic
          </ExchangeType>
          <AMQPSubscriptionConfig>
            <RoutingKey>
              sampleKey.*
            </RoutingKey>
          </AMQPSubscriptionConfig>
          <AutoReconnect>true</AutoReconnect>
        </AMQPSubscriberModel>
      </Create>
    </Query>


    Code Block
    languagexml
    firstline1
    titleDelete Subscription Query
    linenumberstrue
    <Query>
    	<DeleteAll>
    		<AMQPSubscriberModel>
    			<subscribeId ne=""/>
    		</AMQPSubscriberModel>
    	</DeleteAll>
    </Query>


...