A-Stack supports AMQP protocol Handler, that can be used to subscribe/publish various messages to an AMQP broker.
...
AMQP Protocol Handler Installation: Download the protocol handlers zip file from the link below
http://sandbox.atomiton.com:8080/fid-downloads/res/downloads/protocolhandlerssff.bundle.ext.amqp-1.1.1.jar.zip
once downloaded unzip it. After you unzip it go to protocolhandlers/Amqp directory and copy the sff.bundle.ext.amqp.jar from this location to the sff.auto.launch folder which is present in your A-Stack folder.
...
Parameter Name | Parameter Description |
ClientType | Whether the client is a publisher or a subscriber. |
HostName | The IP address or Dns name of AMQP Broker. |
PortNumber | The port number on which Amqp broker is running. |
ExchangeName | The name of the exchange to which messages are to be sent or to which a queue would bind. |
ExchangeType | The type of the exchange, it can be either fanout, direct, topic or headers. |
QueueName | This parameter is used in case of subscriber and is the name of the queue to be created. |
RoutingKey | This parameter is used in case of publisher and is the routing Key or topic key of message to be published. |
AMQPSubscriptionConfig | This parameter is used in case of subscriber and is cardinality of routing keys. The subscriber queue is binded to these routing keys. |
UserName | The User Name for AMQP Connection. |
Password | The Password for the AMQP Connection. |
VirtualHost | The Name of the virtual host to which the client belongs. |
AMQPData | This parameter is used in case of publisher and is the message that you want to publish. |
SubscriptionPayload | The Messages of your subscribed topics are stored in this variable. |
Durability | An optional boolean parameter deciding whether a declared exchange or queues should be durable or not, by default it is false. Durable queues and exchanges are persistent across server restartspersistent across server restarts. |
AutoReconnect | An optional boolean parameter deciding whether a client should automatically reconnect to broker if connection is lost, by default it is true. |
Usage of AMQP protocol handler:
...
AMQP Subscribe
Code Block language xml firstline 1 title AMQP Subscribe Facet linenumbers true <Def Name="AMQPSubscribe"> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="QueueName" default=""/> <Boolean Name="Durability"/> <Boolean Name="AutoReconnect"/> <String Name="ExchangeType"/> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> <Action Name="AMQPSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]& Password=[%:Event.Argument.Password.Value:%]& VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& HostName=[%:Event.Argument.HostName.Value:%]& PortNumber=[%:Event.Argument.PortNumber.Value:%]& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& QueueName=[%:Event.Argument.QueueName.Value:%]& ExchangeType=[%:Event.Argument.ExchangeType.Value:%]& Durability=[%:Event.Argument.Durability.Value:%]& AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription"> <Message> <Value> <AMQPSubscriptionConfig> [%:Event.Argument.AMQPSubscriptionConfig:%] </AMQPSubscriptionConfig> </Value> </Message> </Invoke> <Output As="ActionResult" Name="Result"> <Value> <SubscriptionPayload> <Include> Invoke.InvokeAMQPSubscription.Message.Value </Include> </SubscriptionPayload> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing.
Code Block language xml firstline 1 title Subscribe Initialize Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> <Create> <AMQPSubscriberModel> <SubscriptionPayload> $Null() </SubscriptionPayload> <ClientType> subscriber </ClientType> <HostName> localhost </HostName> <PortNumber> 5672 </PortNumber> <UserName> $Null() </UserName> <Password> $Null() </Password> <VirtualHost> $Null() </VirtualHost> <ExchangeName> AtomitonExchange </ExchangeName> <QueueName> sample </QueueName> <Durability> true </Durability> <ExchangeType> topic </ExchangeType> <AMQPSubscriptionConfig> <RoutingKey> sampleKey.* </RoutingKey> </AMQPSubscriptionConfig> <AutoReconnect>true</AutoReconnect> </AMQPSubscriberModel> </Create> </Query>
Code Block language xml firstline 1 title Delete Subscription Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> </Query>
...