AMQP Protocol Handler (amqp://)
A-Stack supports AMQP protocol Handler, that can be used to subscribe/publish various messages to an AMQP broker.
AMQP Protocol Handler Flow:
AMQP Protocol Handler Installation:
Date | Download Location | version | Change Log |
---|---|---|---|
04/11/2018 | Download | 1.1.1 |
|
04/17/2018 | Download | 1.1.2 |
|
09/08/2019 | Download | 1.1.3 |
|
10/06/2019 | 2.0.0 |
| |
7/17/2020 | Download | 2.0.0 |
|
AMQP Setup Instructions
- Unzip the downloaded zip file
- Copy the sff.bundle.ext.amqp.jar from this location to the sff.auto.launch folder which is present in your A-Stack Prime folder.
AMQP Protocol Parameters:
The table below lists all the parameters of Amqp protocol handler and their description
Parameter Name | Parameter Description |
ClientType | Whether the client is a publisher or a subscriber. |
HostName | The IP address or Dns name of AMQP Broker. |
PortNumber | The port number on which Amqp broker is running. |
ExchangeName | The name of the exchange to which messages are to be sent or to which a queue would bind. |
ExchangeType | The type of the exchange, it can be either fanout, direct, topic or headers. |
QueueName | This parameter is used in case of subscriber and is the name of the queue to be created. |
RoutingKey | This parameter is used in case of publisher and is the routing Key or topic key of message to be published. |
AMQPSubscriptionConfig | This parameter is used in case of subscriber and is cardinality of routing keys. The subscriber queue is binded to these routing keys. |
UserName | The User Name for AMQP Connection. |
Password | The Password for the AMQP Connection. |
VirtualHost | The Name of the virtual host to which the client belongs. |
AMQPData | This parameter is used in case of publisher and is the message that you want to publish. |
SubscriptionPayload | The Messages of your subscribed topics are stored in this variable. |
Durability | An optional boolean parameter deciding whether a declared exchange or queues should be durable or not, by default it is false. Durable queues and exchanges are persistent across server restarts. |
AutoReconnect | An optional boolean parameter deciding whether a client should automatically reconnect to broker if connection is lost, by default it is true. |
Usage of AMQP protocol handler:
AMQP Publish:
AMQP Publish Facet<ThingFacet Name="AMQPWriteFacet"> <String Name="AMQPData" KnownBy="AMQPWriteAction"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="RoutingKey"/> <String Name="QueueName" default=""/> <String Name="ExchangeType"/> <Integer Name="Priority" default=""/> <String Name="ContentType" default=""/> <Action Name="AMQPWriteAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" > <Event as="ActionArgument" name="Argument"/> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]& Password=[%:Event.Argument.Password.Value:%]& VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& HostName=[%:Event.Argument.HostName.Value:%]& PortNumber=[%:Event.Argument.PortNumber.Value:%]& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& QueueName=[%:Event.Argument.QueueName.Value:%]& ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument" name="InvokeAMQPWrite" scope="local"> <Message> <Value> <PublishMessage>"[%:Event.Argument.AMQPData.Value:%]"</PublishMessage> <RoutingKey>[%:Event.Argument.RoutingKey.Value:%]</RoutingKey> <Priority>[%:Event.Argument.Priority.Value:%]</Priority> <ContentType>[%:Event.Argument.ContentType.Value:%]</ContentType> </Value> </Message> </Invoke> <Output As="ActionResult" Name="Result"> <Value> <AMQPData>[%:Event.Argument.AMQPData.Value:%]</AMQPData> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
Publish Initialization Query<Query> <DeleteAll> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </DeleteAll> <Create> <AMQPWriteModel> <ClientType> publisher </ClientType> <HostName> localhost </HostName> <PortNumber> 5672 </PortNumber> <ExchangeName> AtomitonExchange </ExchangeName> <ExchangeType> topic </ExchangeType> <RoutingKey> sampleKey.error </RoutingKey> <AMQPData> HelloWorld </AMQPData> <UserName> $Null() </UserName> <Password> $Null() </Password> <VirtualHost> $Null() </VirtualHost> <Priority> $Null() </Priority> <ContentType> $Null() </ContentType> </AMQPWriteModel> </Create> </Query>
Publish Update Query<Query> <Find format="version,known"> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </Find> <SetResponseData> <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key> <value>HelloWorld</value> </SetResponseData> <Update> <from>Result</from> <Include>$Response.Message.Value.Find</Include> </Update> </Query>
AMQP Subscribe
AMQP Subscribe Facet<Def Name="AMQPSubscribe"> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="QueueName" default=""/> <Boolean Name="Durability"/> <Boolean Name="AutoReconnect"/> <String Name="ExchangeType"/> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> <Action Name="AMQPSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]& Password=[%:Event.Argument.Password.Value:%]& VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& HostName=[%:Event.Argument.HostName.Value:%]& PortNumber=[%:Event.Argument.PortNumber.Value:%]& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& QueueName=[%:Event.Argument.QueueName.Value:%]& ExchangeType=[%:Event.Argument.ExchangeType.Value:%]& Durability=[%:Event.Argument.Durability.Value:%]& AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription"> <Message> <Value> <AMQPSubscriptionConfig> [%:Event.Argument.AMQPSubscriptionConfig:%] </AMQPSubscriptionConfig> </Value> </Message> </Invoke> <Output As="ActionResult" Name="Result"> <Value> <SubscriptionPayload> <Include> Invoke.InvokeAMQPSubscription.Message.Value </Include> </SubscriptionPayload> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing.
Subscribe Initialize Query<Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> <Create> <AMQPSubscriberModel> <SubscriptionPayload> $Null() </SubscriptionPayload> <ClientType> subscriber </ClientType> <HostName> localhost </HostName> <PortNumber> 5672 </PortNumber> <UserName> $Null() </UserName> <Password> $Null() </Password> <VirtualHost> $Null() </VirtualHost> <ExchangeName> AtomitonExchange </ExchangeName> <QueueName> sample </QueueName> <Durability> true </Durability> <ExchangeType> topic </ExchangeType> <AMQPSubscriptionConfig> <RoutingKey> sampleKey.* </RoutingKey> </AMQPSubscriptionConfig> <AutoReconnect>true</AutoReconnect> </AMQPSubscriberModel> </Create> </Query>
Delete Subscription Query<Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> </Query>
AMQP Subscription Message Output Format: The output message of amqp subscription is an xml structure that has fields RoutingKey, Message, contentType and DeliveryTag.
XML output format<Known> <RoutingKey>sampleKey.error</RoutingKey> <Message>HelloWorld</Message> <contentType>text/plain</contentType> <DeliveryTag>1</DeliveryTag> </Known>