A-Stack supports AMQP protocol Handler, that can be used to subscribe/publish various messages to an AMQP broker.
...
AMQP Protocol Handler Installation: Download the protocol handlers zip file from the link below
http://sandbox.atomiton.com:8080/fid-downloads/res/downloads/protocolhandlers.zip
once downloaded unzip it. After you unzip it go to protocolhandlers/Amqp directory and copy the
Date | Download Location | version | Change Log |
---|---|---|---|
04/11/2018 | Download | 1.1.1 |
|
04/17/2018 | Download | 1.1.2 |
|
09/08/2019 | Download | 1.1.3 |
|
10/06/2019 |
2.0.0 |
|
AMQP Protocol Parameters:
The table below lists all the parameters of Amqp protocol handler and their description
...
Parameter Name
...
Parameter Description
...
ClientType
...
Whether the client is a publisher or a subscriber.
...
HostName
...
The IP address or Dns name of AMQP Broker.
...
PortNumber
...
The port number on which Amqp broker is running.
...
ExchangeName
...
| |||
7/17/2020 | Download | 2.0.0 |
|
AMQP Setup Instructions
- Unzip the downloaded zip file
- Copy the sff.bundle.ext.amqp.jar from this location to the sff.auto.launch folder which is present in your A-Stack Prime folder.
AMQP Protocol Parameters:
The table below lists all the parameters of Amqp protocol handler and their description
Parameter Name | Parameter Description |
ClientType | Whether the client is a publisher or a subscriber. |
HostName | The IP address or Dns name of AMQP Broker. |
PortNumber | The port number on which Amqp broker is running. |
ExchangeName | The name of the exchange to which messages are to be sent or to which a queue would bind. |
ExchangeType | The type of the exchange, it can be either fanout, direct, topic or headers. |
QueueName | This parameter is used in case of subscriber and is the name of the queue to be created. |
RoutingKey | This parameter is used in case of publisher and is the routing Key or topic key of message to be published. |
AMQPSubscriptionConfig | This parameter is used in case of subscriber and is cardinality of routing keys. The subscriber queue is binded to these routing keys. |
UserName | The User Name for AMQP Connection. |
Password | The Password for the AMQP Connection. |
VirtualHost | The Name of the virtual host to which the client belongs. |
AMQPData | This parameter is used in case of publisher and is the message that you want to publish. |
SubscriptionPayload | The Messages of your subscribed topics are stored in this variable. |
Usage of AMQP protocol handler:
AMQP Publish:
...
Durability | An optional boolean parameter deciding whether a declared exchange or queues should be durable or not, by default it is false. Durable queues and exchanges are persistent across server restarts. |
AutoReconnect | An optional boolean parameter deciding whether a client should automatically reconnect to broker if connection is lost, by default it is true. |
Usage of AMQP protocol handler:
AMQP Publish:
Code Block language xml firstline 1 title AMQP Publish Facet linenumbers true <ThingFacet Name="AMQPWriteFacet"> <String Name="AMQPData" KnownBy="AMQPWriteAction"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="RoutingKey"/> <String Name="QueueName" default=""/> <String Name="ExchangeType"/> <Integer Name="Priority" default=""/> <String Name="ContentType" default=""/> <Action Name="AMQPWriteAction"> <Workflow Limit="1" Live="1" Timeout="-1 <ThingFacet Name="AMQPWriteFacet"> <String <Task name="Main" Name="AMQPData" KnownBy="AMQPWriteAction"/> <Event as="ActionArgument" name="Argument"<String Name="ClientType"/> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&<String Name="UserName"/> <String Name="Password"/> <String UserName=[%:Event.Argument.UserName.Value:%]&Name="VirtualHost"/> <String Name="HostName"/> <Integer Password=[%:Event.Argument.Password.Value:%]&Name="PortNumber"/> <String Name="ExchangeName"/> <String VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&Name="RoutingKey"/> <String Name="QueueName" default=""/> <String HostName=[%:Event.Argument.HostName.Value:%]&Name="ExchangeType"/> <Integer Name="Priority" default=""/> <String PortNumber=[%:Event.Argument.PortNumber.Value:%]& Name="ContentType" default=""/> <Action Name="AMQPWriteAction"> ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& <Workflow Limit="1" Live="1" Timeout="-1"> QueueName=[%:Event.Argument.QueueName.Value:%]& <Task name="Main" > ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument <Event as="ActionArgument" name="InvokeAMQPWrite" scope="local"Argument"/> <Message> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]& <Value>UserName=[%:Event.Argument.UserName.Value:%]& <PublishMessage>"Password=[%:Event.Argument.AMQPDataPassword.Value:%]"</PublishMessage>& <RoutingKey>VirtualHost=[%:Event.Argument.RoutingKeyVirtualHost.Value:%]</RoutingKey> ]& <Priority>HostName=[%:Event.Argument.PriorityHostName.Value:%]</Priority>& <ContentType>PortNumber=[%:Event.Argument.ContentTypePortNumber.Value:%]</ContentType>& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& </Value> QueueName=[%:Event.Argument.QueueName.Value:%]& </Message> ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument" name="InvokeAMQPWrite" scope="local"> </Invoke> <Message> <Output As="ActionResult" Name="Result"> <Value> <AMQPData> <PublishMessage>"[%:Event.Argument.AMQPData.Value:%]"</AMQPData>PublishMessage> </Value> </Output><RoutingKey>[%:Event.Argument.RoutingKey.Value:%]</RoutingKey> </Task> </Workflow> </Action><Priority>[%:Event.Argument.Priority.Value:%]</Priority> </ThingFacet>
Code Block language xml firstline 1 title Publish Initialization Query linenumbers true <Query> <DeleteAll> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </DeleteAll> <Create> <ContentType>[%:Event.Argument.ContentType.Value:%]</ContentType> <AMQPWriteModel> <ClientType> publisher </ClientType> <HostName> </Value> localhost </HostName> <PortNumber> </Message> 5672 </PortNumber> <ExchangeName> AtomitonExchange</Invoke> </ExchangeName> <ExchangeType> topic <Output As="ActionResult" Name="Result"> </ExchangeType> <RoutingKey> sampleKey.error </RoutingKey> <AMQPData><Value> HelloWorld </AMQPData> <UserName> $Null() <AMQPData>[%:Event.Argument.AMQPData.Value:%]</AMQPData> </UserName> <Password> $Null() </Password>Value> <VirtualHost> $Null() </VirtualHost>Output> <Priority> $Null()</Task> </Priority> <ContentType> </Workflow> $Null() </ContentType>Action> </AMQPWriteModel> </Create> </Query>ThingFacet>
Code Block language xml firstline 1 title Publish Update Initialization Query linenumbers true <Query> <Find format="version,known"><DeleteAll> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </Find>DeleteAll> <Create> <AMQPWriteModel> <ClientType> <SetResponseData> <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key>publisher <value>HelloWorld</value></ClientType> </SetResponseData><HostName> <Update> <from>Result</from> localhost <Include>$Response.Message.Value.Find</Include> </Update> </Query>
AMQP Subscribe
Code Block language xml firstline 1 title AMQP Subscribe Facet linenumbers true <Def Name="AMQPSubscribe">HostName> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <PortNumber> 5672 </PortNumber> <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <ExchangeName> AtomitonExchange <String Name="ClientType"/></ExchangeName> <ExchangeType> <String Name="UserName"/> topic </ExchangeType> <String Name="Password"/> <RoutingKey> sampleKey.error <String Name="VirtualHost"/> </RoutingKey> <AMQPData> <String Name="HostName"/> HelloWorld <Integer Name="PortNumber"/></AMQPData> <UserName> <String Name="ExchangeName"/>$Null() </UserName> <String Name="QueueName" default=""/><Password> $Null() <String Name="ExchangeType"/></Password> <VirtualHost> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> $Null() </VirtualHost> <Priority> <Action Name="AMQPSubscriberAction"> $Null() </Priority> <Workflow Limit="1" Live="1" Timeout="-1"> <ContentType> $Null() </ContentType> <Task name="Main" while="true"> </AMQPWriteModel> </Create> </Query>
Code Block language xml firstline 1 title Publish Update Query linenumbers true <Query> <Find format="version,known"> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> <Event as="ActionArgument" name="Argument"/> </Find> <SetResponseData> <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key> <value>HelloWorld</value> </SetResponseData> <Update> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]&<from>Result</from> <Include>$Response.Message.Value.Find</Include> </Update> </Query>
AMQP Subscribe
Code Block language xml firstline 1 title AMQP Subscribe Facet linenumbers true <Def Name="AMQPSubscribe"> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <String Name="SubscriptionPayload" UserName=[%:Event.Argument.UserName.Value:%]& update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <String Name="ClientType"/> <String Name="UserName"/> Password=[%:Event.Argument.Password.Value:%]& <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer VirtualHost=[%:Event.Argument.VirtualHost.Value:%]&Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="QueueName" default=""/> <Boolean Name="Durability"/> HostName=[%:Event.Argument.HostName.Value:%]& <Boolean Name="AutoReconnect"/> <String Name="ExchangeType"/> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> <Action PortNumber=[%:Event.Argument.PortNumber.Value:%]&Name="AMQPSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> ExchangeName=[%:Event.Argument.ExchangeName.Value:%]&<Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> QueueName<Invoke post="amqp://?ClientType=[%:Event.Argument.QueueNameClientType.Value:%]& ExchangeTypeUserName=[%:Event.Argument.ExchangeTypeUserName.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription">& Password=[%:Event.Argument.Password.Value:%]& <Message> VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& <Value> HostName=[%:Event.Argument.HostName.Value:%]& <AMQPSubscriptionConfig>PortNumber=[%:Event.Argument.PortNumber.AMQPSubscriptionConfigValue:%]</AMQPSubscriptionConfig>& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& </Value> QueueName=[%:Event.Argument.QueueName.Value:%]& </Message> ExchangeType=[%:Event.Argument.ExchangeType.Value:%]& </Invoke> Durability=[%:Event.Argument.Durability.Value:%]& <Output As="ActionResult" Name="Result"> AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription"> <Message> <Value> <Value> <AMQPSubscriptionConfig> <SubscriptionPayload> [%:Event.Argument.AMQPSubscriptionConfig:%] </AMQPSubscriptionConfig> <Include> </Value> </Message> Invoke.InvokeAMQPSubscription.Message.Value</Invoke> <Output As="ActionResult" Name="Result"> <Value> </Include> <SubscriptionPayload> <Include> </SubscriptionPayload> Invoke.InvokeAMQPSubscription.Message.Value </Value> </Include> </Output>SubscriptionPayload> </Value> </Task> </Output> </Workflow>Task> </Workflow> </Action> </ThingFacet>
Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing.
Code Block language xml firstline 1 title Subscribe Initialize Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> <Create> <AMQPSubscriberModel> <SubscriptionPayload> $Null() </SubscriptionPayload> <ClientType> subscriber </ClientType> <HostName> localhost </HostName> <PortNumber> 5672 </PortNumber> <UserName> $Null() </UserName> <Password> $Null() </Password> <VirtualHost> $Null() </VirtualHost> <ExchangeName> AtomitonExchange </ExchangeName> <QueueName> AtomitonQueuesample </QueueName> <Durability> true </Durability> <ExchangeType> topic </ExchangeType> <AMQPSubscriptionConfig> <RoutingKey> sampleKey.* </RoutingKey> </AMQPSubscriptionConfig> <AutoReconnect>true</AutoReconnect> </AMQPSubscriberModel> </Create> </Query>
Code Block language xml firstline 1 title Delete Subscription Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> </Query>
...