Atomiton Stream Processing Framework Protocol Handler (aspf://)
A-Stack supports stream processing through a specialized protocol handler that publishes messages to, and subscribes to messages from, a Redis store.
ASPF Protocol Handler Flow:
ASPF Protocol Handler Installation:
Date | Download Location | Version | Change Log |
---|---|---|---|
 | Download | 2.0.0 | |
ASPF Setup Instructions
- Unzip the downloaded zip file
- Copy sff.bundle.ext.amqp.jar from the unzipped location into the sff.auto.launch folder inside your A-Stack Prime folder.
ASPF Protocol Parameters:
The table below lists the parameters of the ASPF protocol handler and their descriptions.
Parameter Name | Parameter Description |
---|---|
ClientType | Whether the client is a publisher or a subscriber. |
ClientId | An identifier for the client (for example, PublisherTest or SubscriberTest in the queries on this page). |
PortNumber | The port number on which the Redis server is running. |
Channel | The name of the channel to which messages are published or to which a subscriber listens. |
HostName | The host on which the Redis server is installed. |
Payload | The message to publish to the channel. |
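As a rough illustration of how these parameters combine, the Python sketch below assembles an aspf:// URL in the same query-string shape that the facets on this page use. The helper name build_aspf_url is hypothetical and not part of A-Stack, and the sample values are taken from the initialization queries on this page. It only shows the URL layout, not how the handler itself resolves the URL.

```python
from urllib.parse import urlencode


def build_aspf_url(client_id, client_type, host_name, port_number, channel):
    """Assemble an aspf:// URL from the handler parameters.

    Hypothetical helper for illustration only; A-Stack builds this URL
    inside the facet using TQL template substitution.
    """
    params = {
        "ClientId": client_id,
        "ClientType": client_type,   # "publisher" or "subscriber"
        "HostName": host_name,       # host of the Redis server
        "PortNumber": port_number,   # port of the Redis server
        "Channel": channel,          # channel to publish or subscribe to
    }
    # urlencode keeps the insertion order of the dict and joins pairs with '&'
    return "aspf://?" + urlencode(params)


if __name__ == "__main__":
    print(build_aspf_url("PublisherTest", "publisher", "localhost", 6379, "Channel2"))
```

The Payload parameter is not part of the URL; in the publish facet it is sent in the message body of the Invoke.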
Usage of ASPF protocol handler:
ASPF Publish:
ASPF Publish Facet:
<ThingFacet Name="AspfPublisherFacet">
  <String KnownBy="AspfPublisherAction" Name="PublishMessage" modifiers="virtual"/>
  <String Name="ChannelName" modifiers="virtual"/>
  <String Name="ClientId" modifiers="virtual"/>
  <AA>
    [:#o#Output.Argument:]
  </AA>
  <ASPFURL>
    aspf://?ClientId=[%:[:AA:].ClientId.Value:%]&
    ClientType=publisher&
    HostName=[:Config.SPECacheInfo.HostName:]&
    PortNumber=[:Config.SPECacheInfo.Port:]&
    Channel=[%:[:AA:].ChannelName.Value:%]
  </ASPFURL>
  <Action Name="AspfPublisherAction">
    <Workflow Limit="1" Live="1" Timeout="-1">
      <Task name="Main">
        <Output as="ActionArgument" name="Argument"/>
        <Invoke name="InvokeAspfPublish" post="[:ASPFURL:]" scope="local" waitFor="Argument">
          <Message Type="text">
            <Value>
              <Payload>
                [%:[:AA:].PublishMessage.Value:%]
              </Payload>
            </Value>
          </Message>
        </Invoke>
        <Output As="ActionResult" Name="Result">
          <Value>
            <PublishMessage>
              <Include>
                Invoke.InvokeAspfPublish.Message.Value
              </Include>
            </PublishMessage>
          </Value>
        </Output>
      </Task>
    </Workflow>
  </Action>
</ThingFacet>
Publish Initialization Query:
<Query>
  <DeleteAll>
    <AspfPublisherModel>
      <publishId ne="" />
    </AspfPublisherModel>
  </DeleteAll>
  <Create>
    <AspfPublisherModel>
      <ClientID>PublisherTest</ClientID>
      <ClientType>Publisher</ClientType>
      <PublishMessage>Hello TQL111</PublishMessage>
      <HostName>localhost</HostName>
      <PortNumber>6379</PortNumber>
      <Channel>Channel2</Channel>
    </AspfPublisherModel>
  </Create>
</Query>
Publish Update Query:
<Query>
  <Find format="version,known">
    <AspfPublisherModel>
      <publishId ne=""/>
    </AspfPublisherModel>
  </Find>
  <SetResponseData>
    <key>Message.Value.Find.Result.AspfPublisherModel.PublishMessage.Value</key>
    <value>HelloTQL new 123</value>
  </SetResponseData>
  <Update>
    <from>Result</from>
    <Include>$Response.Message.Value.Find</Include>
  </Update>
</Query>
ASPF Subscribe:
ASPF Subscribe Facet:
<ThingFacet Name="AspfSubscriberFacet">
  <String KnownBy="AspfSubscriberAction" Name="ReceivedData" update="auto"/>
  <String Name="ClientId"/>
  <String Name="ClientType"/>
  <String Name="HostName"/>
  <String Name="PortNumber"/>
  <String Name="Channel"/>
  <Action Name="AspfSubscriberAction">
    <Workflow Limit="1" Live="1" Timeout="-1">
      <Task name="Main" while="true">
        <Event as="ActionArgument" name="Argument"/>
        <Invoke Name="ProcessMessage" waitFor="Argument">
          <FacetScript>
            <Log level="INFO" message="Calling Subscriber...."/>
          </FacetScript>
        </Invoke>
        <Invoke get="aspf:1234//?ClientId=[%:Event.Argument.ClientId.Value:%]&ClientType=[%:Event.Argument.ClientType.Value:%]&HostName=[%:Event.Argument.HostName.Value:%]&PortNumber=[%:Event.Argument.PortNumber.Value:%]&Channel=[%:Event.Argument.Channel.Value:%]" name="InvokeAspfSubscription" scope="process" waitFor="Argument"/>
        <Invoke Name="ProcessMessage" waitFor="Argument">
          <FacetScript>
            <Log level="INFO" message="subscriber invoked [%:Invoke.InvokeAspfSubscription.Message.Value.Message:%]"/>
          </FacetScript>
        </Invoke>
        <Output As="ActionResult" Name="Result">
          <Value>
            <ReceivedData>
              [%:Invoke.InvokeAspfSubscription.Message.Value.Message:%]
            </ReceivedData>
          </Value>
        </Output>
      </Task>
    </Workflow>
  </Action>
</ThingFacet>
Subscribe Initialize Query:
<Query>
  <DeleteAll>
    <AspfSubscriberModel>
      <subscribeId ne="" />
    </AspfSubscriberModel>
  </DeleteAll>
  <Create>
    <AspfSubscriberModel>
      <ClientID>SubscriberTest</ClientID>
      <ClientType>Subscriber</ClientType>
      <HostName>localhost</HostName>
      <PortNumber>6379</PortNumber>
      <Channel>Channel2</Channel>
      <ReceivedData>$Null()</ReceivedData>
    </AspfSubscriberModel>
  </Create>
</Query>