...
AMQP Publish:
Code Block language xml firstline 1 title AMQP Publish Facet linenumbers true <ThingFacet Name="AMQPWriteFacet"> <String Name="AMQPData" KnownBy="AMQPWriteAction"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String <Integer Name="ExchangeNamePortNumber"/> <String <String Name="RoutingKeyExchangeName"/> <String Name="QueueName" default=""/> <String Name="ExchangeTypeRoutingKey"/> <Action Name="AMQPWriteAction"> <Workflow<String LimitName="1QueueName" Livedefault="1" Timeout="-1">/> <Task<String nameName="MainExchangeType" /> <Event as <Integer Name="ActionArgumentPriority" namedefault="Argument"/> <Invoke post="amqp://" waitFor="Argument" name="InvokeAMQPWrite" <String Name="ContentType" default=""/> <Action ClientTypeName="[%:Event.Argument.ClientType.Value:%]"AMQPWriteAction"> UserName="[%:Event.Argument.UserName.Value:%]" <Workflow Limit="1" Live="1" Timeout="-1"> Password="[%:Event.Argument.Password.Value:%]" <Task name="Main" > VirtualHost="[%:Event.Argument.VirtualHost.Value:%]" HostName="[%:Event.Argument.HostName.Value:%]" <Event PortNumberas="ActionArgument" name="[%:Event.Argument.PortNumber.Value:%]""/> ExchangeName="[%:Event.Argument.ExchangeName.Value:%]" <Invoke QueueNamepost="[%:Eventamqp://?ClientType=[%:Event.Argument.QueueNameClientType.Value:%]"& RoutingKey="[%:Event.Argument.RoutingKey.Value:%]" ExchangeTypeUserName="[%:Event.Argument.ExchangeTypeUserName.Value:%]">& <Message> <Value> "[%:Event.Argument.AMQPData Password=[%:Event.Argument.Password.Value:%]"& </Value> </Message> VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& </Invoke> <Output As="ActionResult" Name="Result"> HostName=[%:Event.Argument.HostName.Value:%]& <Value> <AMQPData>PortNumber=[%:Event.Argument.AMQPDataPortNumber.Value:%]</AMQPData>& </Value> </Output> ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& </Task> </Workflow> </Action> </ThingFacet>
Code Block language xml firstline 1 title Publish Initialization Query linenumbers true <Query> <DeleteAll> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </DeleteAll> <Create> <AMQPWriteModel> QueueName=[%:Event.Argument.QueueName.Value:%]& <ClientType>publisher</ClientType> <HostName>localhost</HostName> <PortNumber>5672</PortNumber> <ExchangeName>AtomitonExchange</ExchangeName> <ExchangeType>direct</ExchangeType> <RoutingKey>sampleKey</RoutingKey> <AMQPData>HelloTQL</AMQPData>ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" waitFor="Argument" name="InvokeAMQPWrite" scope="local"> <UserName>$Null()</UserName> <Password>$Null()</Password> <VirtualHost>$Null()</VirtualHost> </AMQPWriteModel> </Create> </Query>
Code Block language xml firstline 1 title Publish Update Query linenumbers true <Query> <Message> <Find format="version,known"> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> </Find> <SetResponseData> <key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key><Value> <value>HelloWorld</value> </SetResponseData> <Update> <from>Result</from> <Include>$Response.Message.Value.Find</Include> </Update> </Query>
AMQP Subscribe
Code Block language xml firstline 1 title AMQP Subscribe Facet linenumbers true <Def Name="AMQPSubscribe"> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <PublishMessage>"[%:Event.Argument.AMQPData.Value:%]"</PublishMessage> <String Name="SubscriptionData" update="auto" KnownBy="AMQPSubscriberAction"/> <String Name="SubscriptionPayload" Format="$ObjectFormat(xml)" /> <String Name="ClientType"/> <String Name="UserName"/> <RoutingKey>[%:Event.Argument.RoutingKey.Value:%]</RoutingKey> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="QueueName" default=""/> <Priority>[%:Event.Argument.Priority.Value:%]</Priority> <String Name="ExchangeType"/> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> <Action Name="AMQPSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"><ContentType>[%:Event.Argument.ContentType.Value:%]</ContentType> <Event as="ActionArgument" name="Argument"/> <Invoke get="amqp://Subscribe" scope="process" waitFor="Argument" name="InvokeAMQPSubscription" </Value> ClientType="[%:Event.Argument.ClientType.Value:%]" </Message> UserName="[%:Event.Argument.UserName.Value:%]" Password="[%:Event.Argument.Password.Value:%]" </Invoke> VirtualHost="[%:Event.Argument.VirtualHost.Value:%]" HostName="[%:Event.Argument.HostName.Value:%]" <Output PortNumber="[%:Event.Argument.PortNumber.Value:%]"As="ActionResult" Name="Result"> ExchangeName="[%:Event.Argument.ExchangeName.Value:%]" <Value> QueueName="[%:Event.Argument.QueueName.Value:%]" ExchangeType="[%:Event.Argument.ExchangeType.Value:%]" AMQPSubscriptionConfig="<AMQPData>[%:Event.Argument.AMQPData.AMQPSubscriptionConfigValue:%]"</>AMQPData> <Invoke name="SaveData" waitFor="Argument"> <FacetScript> <executeQuery> <QueryString></Value> <Query> <Find format="version,known"> </Output> <AMQPSubscriberModel> </Task> <subscribeId eq="[%:Event.Argument.subscribeId:%]"/> </AMQPSubscriberModel>Workflow> </Find>Action> </ThingFacet>
Code Block language xml firstline 1 title Publish Initialization Query linenumbers true <Query> <DeleteAll> <if condition="$Response.Message.Value/Find/Status eq 'Success'"> <AMQPWriteModel> <writeId ne=""/> </AMQPWriteModel> <then> </DeleteAll> <Create> <AMQPWriteModel> <SetResponseData> <ClientType> publisher <key> </ClientType> <HostName> Message.Value.Find.Result.AMQPSubscriberModel.SubscriptionPayload.Value localhost </key>HostName> <PortNumber> <value> 5672 </PortNumber> <ExchangeName> <Include> AtomitonExchange </ExchangeName> <ExchangeType> Invoke.InvokeAMQPSubscription.Message.Value topic </Include>ExchangeType> <RoutingKey> sampleKey.error </value> </RoutingKey> </SetResponseData><AMQPData> HelloWorld <Update> </AMQPData> <UserName> <from> $Null() Result</UserName> <Password> </from> $Null() </Password> <Include> <VirtualHost> $Response.Message.Value.Find $Null() </Include>VirtualHost> <Priority> </Update> $Null() </then>Priority> <ContentType> </if> </Query>$Null() </QueryString> ContentType> </executeQuery> AMQPWriteModel> </FacetScript> </Invoke> <Output As="ActionResult" Name="ResultCreate> </Query>
Code Block language xml firstline 1 title Publish Update Query linenumbers true <Query> <Find format="version,known"> <AMQPWriteModel> <Value> <writeId ne=""/> <SubscriptionData>Saved</SubscriptionData> </AMQPWriteModel> </Value>Find> <SetResponseData> </Output><key>Message.Value.Find.Result.AMQPWriteModel.AMQPData.Value</key> <value>HelloWorld</value> </Task>SetResponseData> <Update> <<from>Result</Workflow>from> </Action> </ThingFacet>
Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing.
Code Block language xml <Include>$Response.Message.Value.Find</Include> </Update> </Query>
AMQP Subscribe
Code Block language xml firstline 1 title AMQP Subscribe Initialize QueryFacet linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> <Create> <AMQPSubscriberModel> <SubscriptionData>$Null()</SubscriptionData> <SubscriptionPayload>$Null()</SubscriptionPayload> <ClientType>subscriber</ClientType> <HostName>localhost</HostName> <PortNumber>5672</PortNumber> <UserName>$Null()</UserName> <Password>$Null()</Password> <VirtualHost>$Null()</VirtualHost> <ExchangeName>AtomitonExchange</ExchangeName> <QueueName>AtomitonQueue</QueueName> <ExchangeType>direct</ExchangeType> <AMQPSubscriptionConfig> <RoutingKey>sampleKey</RoutingKey> </AMQPSubscriptionConfig> </AMQPSubscriberModel> <Def Name="AMQPSubscribe"> <String Name="RoutingKey" Cardinality="0..m"/> </Def> <ThingFacet Name="AMQPSubscriberFacet"> <String Name="SubscriptionPayload" update="auto" KnownBy="AMQPSubscriberAction" Format="$ObjectFormat(xml)" /> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="VirtualHost"/> <String Name="HostName"/> <Integer Name="PortNumber"/> <String Name="ExchangeName"/> <String Name="QueueName" default=""/> <String Name="ExchangeType"/> <AMQPSubscribe Name="AMQPSubscriptionConfig"/> <Action Name="AMQPSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> <Invoke post="amqp://?ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]& Password=[%:Event.Argument.Password.Value:%]& VirtualHost=[%:Event.Argument.VirtualHost.Value:%]& HostName=[%:Event.Argument.HostName.Value:%]& PortNumber=[%:Event.Argument.PortNumber.Value:%]& ExchangeName=[%:Event.Argument.ExchangeName.Value:%]& QueueName=[%:Event.Argument.QueueName.Value:%]& ExchangeType=[%:Event.Argument.ExchangeType.Value:%]" scope="process" waitFor="Argument" name="InvokeAMQPSubscription"> <Message> <Value> <AMQPSubscriptionConfig>[%:Event.Argument.AMQPSubscriptionConfig:%]</AMQPSubscriptionConfig> </Value> </Message> </Invoke> <Output As="ActionResult" Name="Result"> <Value> <SubscriptionPayload> <Include> Invoke.InvokeAMQPSubscription.Message.Value </Include> </SubscriptionPayload> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
Note: The output is stored in the variable SubscriptionPayload although the Known By is on the variable SubscriptionData, this has been done to preserve the Xml structure and avoid TP processing.
Code Block language xml firstline 1 title Subscribe Initialize Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> <Create> <AMQPSubscriberModel> <SubscriptionPayload> $Null() </SubscriptionPayload> <ClientType> subscriber </ClientType> <HostName> localhost </HostName> <PortNumber> 5672 </PortNumber> <UserName> $Null() </UserName> <Password> $Null() </Password> <VirtualHost> $Null() </VirtualHost> <ExchangeName> AtomitonExchange </ExchangeName> <QueueName> AtomitonQueue </QueueName> <ExchangeType> topic </ExchangeType> <AMQPSubscriptionConfig> <RoutingKey> sampleKey.* </RoutingKey> </AMQPSubscriptionConfig> </AMQPSubscriberModel> </Create> </Query>
Code Block language xml firstline 1 title Delete Subscription Query linenumbers true <Query> <DeleteAll> <AMQPSubscriberModel> <subscribeId ne=""/> </AMQPSubscriberModel> </DeleteAll> </Query>
AMQP Subscription Message Output Format: The output message of amqp subscription is an xml structure that has fields RoutingKey, Message, contentType and DeliveryTag.
Code Block language xml firstline 1 title XML output format linenumbers true <Value><Known> <RoutingKey>sampleKey<<RoutingKey>sampleKey.error</RoutingKey> <Message>HelloTQL< <Message>HelloWorld</Message> <contentType>text/plain</contentType> <DeliveryTag>1</DeliveryTag> </Value>Known>