Atomiton Stream Processing Framework Protocol Handler (aspf://)

A-Stack supports Stream processing via a specialized protocol Handler, that can be used to subscribe/publish various messages to a Redis Store. 

ASPF Protocol Handler Flow:



AMQP Protocol Handler Installation: 

DateDownload LocationversionChange Log
 Download2.0.0
  1. The first version that supports Pub/Sub

ASPF Setup Instructions


  1. Unzip the downloaded zip file
  2. Copy the sff.bundle.ext.amqp.jar from this location to the sff.auto.launch folder which is present in your A-Stack Prime folder.


ASPF Protocol Parameters:

The table below lists all the parameters of Amqp protocol handler and their description

Parameter Name

Parameter Description

ClientType

Whether the client is a publisher or a subscriber.

ClientId

The IP address or Dns name of AMQP Broker.

PortNumber

The port number on which Amqp broker is running.

Channel

The name of the Channel to which messages are to be sent or to which a queue would bind.

HostName

Host on which the Redis server is installed

Payload

This parameter is used to post a message to a topic


Usage of ASPF protocol handler:

  • ASPF Publish:

    ASPF Publish Facet
      <ThingFacet Name="AspfPublisherFacet">
        <String KnownBy="AspfPublisherAction" Name="PublishMessage" modifiers="virtual"/>
        <String Name="ChannelName" modifiers="virtual"/>
        <String Name="ClientId" modifiers="virtual"/>
        <AA>
            [:#o#Output.Argument:]
        </AA>
        <ASPFURL>
          aspf://?ClientId=[%:[:AA:].ClientId.Value:%]&amp;
          ClientType=publisher&amp;
          HostName=[:Config.SPECacheInfo.HostName:]&amp;
          PortNumber=[:Config.SPECacheInfo.Port:]&amp;
          Channel=[%:[:AA:].ChannelName.Value:%]
        </ASPFURL>
        <Action Name="AspfPublisherAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
                <Task name="Main">
                    <Output as="ActionArgument" name="Argument"/>
                    <Invoke name="InvokeAspfPublish" post="[:ASPFURL:]"
                            scope="local" waitFor="Argument">
                        <Message Type="text">
                            <Value>
                                <Payload>
                                    [%:[:AA:].PublishMessage.Value:%]
                                </Payload>
                            </Value>
                        </Message>
                    </Invoke>
                    <Output As="ActionResult" Name="Result">
                        <Value>
                            <PublishMessage>
                                <Include>
                                    Invoke.InvokeAspfPublish.Message.Value
                                </Include>
                            </PublishMessage>
                        </Value>
                    </Output>
                </Task>
            </Workflow>
        </Action>
    </ThingFacet>
    Publish Initialization Query
    <Query>
    	<DeleteAll>
    		<AspfPublisherModel>
    			<publishId ne="" />
    		</AspfPublisherModel>
    	</DeleteAll>
    	<Create>
    		<AspfPublisherModel>
    			<ClientID>PublisherTest</ClientID>
    			<ClientType>Publisher</ClientType>
    			<PublishMessage>Hello TQL111</PublishMessage>
    			<HostName>localhost</HostName>
    			<PortNumber>6379</PortNumber>
    			<Channel>Channel2</Channel>
    		</AspfPublisherModel>
    	</Create>
    </Query>
    Publish Update Query
    <Query>
      <Find format="version,known">
        <AspfPublisherModel>
           <publishId ne=""/>
        </AspfPublisherModel>
      </Find>
      <SetResponseData>
        <key>Message.Value.Find.Result.AspfPublisherModel.PublishMessage.Value</key>
        <value>HelloTQL new 123</value>
      </SetResponseData>
      <Update>
        <from>Result</from>
        <Include>$Response.Message.Value.Find</Include>
      </Update>
    </Query>
  • ASPF Subscribe

    ASPF Subscribe Facet
    <ThingFacet Name="AspfSubscriberFacet">
        <String KnownBy="AspfSubscriberAction" Name="ReceivedData" update="auto"/>
        <String Name="ClientId"/>
        <String Name="ClientType"/>
        <String Name="HostName"/>
        <String Name="PortNumber"/>
        <String Name="Channel"/>
        <Action Name="AspfSubscriberAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
                <Task name="Main" while="true">
                    <Event as="ActionArgument" name="Argument"/>
                    <Invoke Name="ProcessMessage" waitFor="Argument">
                        <FacetScript>
                            <Log level="INFO" message="Calling Subscrier...."/>
                        </FacetScript>
                    </Invoke>
                    <Invoke get="aspf:1234//?ClientId=[%:Event.Argument.ClientId.Value:%]&ClientType=[%:Event.Argument.ClientType.Value:%]&HostName=[%:Event.Argument.HostName.Value:%]&PortNumber=[%:Event.Argument.PortNumber.Value:%]&Channel=[%:Event.Argument.Channel.Value:%]" 
                            name="InvokeAspfSubscription" scope="process" waitFor="Argument"/>
                    <Invoke Name="ProcessMessage" waitFor="Argument">
                        <FacetScript>
                            <Log level="INFO" message="subscriber invoked [%:Invoke.InvokeAspfSubscription.Message.Value.Message:%]"/>
                        </FacetScript>
                    </Invoke>
                    <Output As="ActionResult" Name="Result">
                        <Value>
                            <ReceivedData>
                                [%:Invoke.InvokeAspfSubscription.Message.Value.Message:%]
                            </ReceivedData>
                        </Value>
                    </Output>
                </Task>
            </Workflow>
        </Action>
    </ThingFacet>
    
    


    Subscribe Initialize Query
    <Query>
        <DeleteAll>
            <AspfSubscriberModel>
                <subscribeId ne="" />
            </AspfSubscriberModel>
        </DeleteAll>
        <Create>
            <AspfSubscriberModel>
                <ClientID>SubscriberTest</ClientID>
    			<ClientType>Subscriber</ClientType>
    			<HostName>localhost</HostName>
    			<PortNumber>6379</PortNumber>
    			<Channel>Channel2</Channel>
                <ReceivedData>$Null()</ReceivedData>
            </AspfSubscriberModel>
        </Create>
    </Query>