MQTT Protocol Handler (mqtt://)

A-Stack provides an MQTT protocol handler that can be used to publish messages to, and subscribe to messages from, an MQTT broker.

MQTT Protocol Handler Flow:

MQTT Protocol Handler Installation: 

Date          Download Location    Version
04/11/2018    Download             1.0-041118
08/14/2018    Download             1.0.2
09/10/2018    Download             1.0.3
10/12/2018    Download             1.0.4
07/17/2020    Download             2.0.0


MQTT Setup Instructions

  1. Unzip the downloaded zip file.
  2. Copy sff.bundle.ext.mqtt.jar from the unzipped folder to the sff.auto.launch folder inside your A-Stack Prime folder.


Change Log

  • Version 1.0.0
      1. Automatic reconnection of the client after network disconnects and broker restarts.
      2. All logs are printed in engine.log.
  • Version 1.0.2
      1. Delayed subscription to handle subscription drops.
      2. Additional null pointer exception (NPE) check.
  • Version 1.0.3
      1. Additional logging on error conditions.
  • Version 1.0.4
      1. Bugfix: Changed the log level of handler subscription messages to debug.


MQTT Protocol Parameters:

The table below lists the parameters of the MQTT protocol handler and their descriptions.

Parameter Name    Parameter Description
Topic             The topic to publish to, or subscribe to, on the broker.
Broker            The URL of your MQTT broker, for example tcp://localhost:1883.
ClientId          The client ID, which must be unique for each new connection. This parameter is mandatory, and it is the user's responsibility to make it unique.
ClientType        Whether the client is a publisher or a subscriber. This parameter is mandatory; its value is either publisher or subscriber.
QoS               Used only by a publisher. Specifies the QoS value for the message.
PublishMessage    Used only by a publisher. The message that you want to publish.
ReceivedData      The variable in which the messages from your subscribed topics are stored.
UserName          The user name for the MQTT connection. This parameter is mandatory if your broker requires a user name to connect. If the MQTT broker does not require a user name and password, use $Null().
Password          The password for the MQTT connection. This parameter is mandatory if your broker requires a password along with the user name to connect. If the MQTT broker does not require a user name and password, use $Null().
AutoReconnect     Determines whether the client should automatically try to reconnect to the broker if the connection is lost. The default is true.
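
To make the parameter list concrete, the sketch below shows how these parameters might appear in an mqtt:// URL once literal values are filled in. It reuses the Invoke form and the placeholder values from the publish example on this page (broker at tcp://localhost:1883, topic /atomiton/mqtt). Whether the handler needs any of these values to be URL-encoded is not stated here, so treat this as an illustration rather than a verified configuration.

    Example mqtt:// URL with literal values (sketch)
    <!-- Assumption: all values are placeholders copied from the publish example on this page -->
    <Invoke name="InvokeMqttPublish" post="mqtt://?Topic=/atomiton/mqtt&amp;Broker=tcp://localhost:1883&amp;ClientId=PublisherTest&amp;ClientType=Publisher&amp;QoS=2&amp;UserName=atomiton&amp;Password=atomiton" scope="local">
        <Message Type="text">
            <Value>
                <!-- PublishMessage travels as the message payload, not as a URL parameter -->
                <Payload>HelloWorld</Payload>
            </Value>
        </Message>
    </Invoke>

Note that Topic, Broker, ClientId, ClientType, QoS, UserName, and Password are passed as query parameters, while the PublishMessage value is sent in the message body.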


Usage of MQTT protocol handler:


  • MQTT Publish:


    MQTT Publish Facet
    <ThingFacet Name="MqttPublisherFacet">
        <String KnownBy="MqttPublisherAction" Name="PublishMessage"/>
        <String Name="QoS"/>
        <String Name="Topic"/>
        <String Name="Broker"/>
        <String Name="ClientId"/>
        <String Name="ClientType"/>
        <String Name="UserName"/>
        <String Name="Password"/>
        <AA>[:#o#Output.Argument:]</AA>
        <Action Name="MqttPublisherAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
                <Task name="Main">
                    <Output as="ActionArgument" name="Argument"/>
                    <Invoke name="InvokeMqttPublish" post="mqtt://?Topic=[%:[:AA:].Topic.Value:%]&amp;Broker=[%:[:AA:].Broker.Value:%]&amp;ClientId=[%:[:AA:].ClientId.Value:%]&amp;ClientType=[%:[:AA:].ClientType.Value:%]&amp;QoS=[%:[:AA:].QoS.Value:%]&amp;UserName=[%:[:AA:].UserName.Value:%]&amp;Password=[%:[:AA:].Password.Value:%]" scope="local" waitFor="Argument">
                        <Message Type="text">
                            <Value>
                                <Payload>
                                    [%:[:AA:].PublishMessage.Value:%]
                                </Payload>
                            </Value>
                        </Message>
                    </Invoke>
                    <Output As="ActionResult" Name="Result">
                        <Value>
                            <PublishMessage>
                                <Include>
                                    Invoke.InvokeMqttPublish.Message.Value
                                </Include>
                            </PublishMessage>
                        </Value>
                    </Output>
                </Task>
            </Workflow>
        </Action>
    </ThingFacet>



    Publish Initialization Query
    <Query>
      <DeleteAll>
        <MqttPublisherModel>
          <publishId ne=""/>
        </MqttPublisherModel>
      </DeleteAll>
      <Create>
        <MqttPublisherModel>
          <ClientId>PublisherTest</ClientId>
          <ClientType>Publisher</ClientType>
          <Broker>tcp://localhost:1883</Broker>
          <Topic>/atomiton/mqtt</Topic>
          <PublishMessage>HelloWorld</PublishMessage>
          <QoS>2</QoS>
          <UserName>atomiton</UserName>
          <Password>atomiton</Password>
        </MqttPublisherModel>
      </Create>
    </Query>

    Publish Update Query
    <Query>
      <Find format="version,known">
        <MqttPublisherModel>
           <publishId ne=""/>
        </MqttPublisherModel>
      </Find>
      <SetResponseData>
        <key>Message.Value.Find.Result.MqttPublisherModel.PublishMessage.Value</key>
        <value>HelloTQL</value>
      </SetResponseData>
      <Update>
        <from>Result</from>
        <Include>$Response.Message.Value.Find</Include>
      </Update>
    </Query>
  • MQTT Subscribe:


    MQTT Subscribe Facet
    <ThingFacet Name="MqttSubscriberFacet">
        <String Format="$ObjectFormat(xml)" KnownBy="MqttSubscriberAction" Name="ReceivedData" update="auto"/>
        <String Name="Topic"/>
        <String Name="Broker"/>
        <String Name="ClientId"/>
        <String Name="ClientType"/>
        <String Name="UserName"/>
        <String Name="Password"/>
        <String Name="AutoReconnect"/>
        <Action Name="MqttSubscriberAction">
            <Workflow Limit="1" Live="1" Timeout="-1">
                <Task name="Main" while="true">
                    <Event as="ActionArgument" name="Argument"/>
                    <Invoke name="InvokeMqttSubscription" get="mqtt://?Topic=[%:Event.Argument.Topic.Value:%]&amp;Broker=[%:Event.Argument.Broker.Value:%]&amp;ClientId=[%:Event.Argument.ClientId.Value:%]&amp;ClientType=[%:Event.Argument.ClientType.Value:%]&amp;UserName=[%:Event.Argument.UserName.Value:%]&amp;Password=[%:Event.Argument.Password.Value:%]&amp;AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" scope="process" waitFor="Argument"/>
                    <Output As="ActionResult" Name="Result">
                        <Value>
                            <ReceivedData>
                                <Include>
                                    Invoke.InvokeMqttSubscription.Message.Value
                                </Include>
                            </ReceivedData>
                        </Value>
                    </Output>
                </Task>
            </Workflow>
        </Action>
    </ThingFacet>
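
    This page gives an initialization query for the publisher but not for the subscriber. By analogy, a subscriber might be initialized as sketched below. The model name MqttSubscriberModel and the subscribeId field are taken from the delete query on this page; the client ID, topic, credentials, and AutoReconnect value are placeholders. The model definition is not shown here, so whether ReceivedData must also be set in the Create to trigger MqttSubscriberAction is an open assumption.

    Subscribe Initialization Query (sketch)
    <Query>
      <!-- Remove any existing subscriber instances first -->
      <DeleteAll>
        <MqttSubscriberModel>
          <subscribeId ne=""/>
        </MqttSubscriberModel>
      </DeleteAll>
      <!-- Assumption: field names match the attributes of MqttSubscriberFacet; values are placeholders -->
      <Create>
        <MqttSubscriberModel>
          <ClientId>SubscriberTest</ClientId>
          <ClientType>Subscriber</ClientType>
          <Broker>tcp://localhost:1883</Broker>
          <Topic>/atomiton/mqtt</Topic>
          <UserName>atomiton</UserName>
          <Password>atomiton</Password>
          <AutoReconnect>true</AutoReconnect>
        </MqttSubscriberModel>
      </Create>
    </Query>

    The ClientId differs from the publisher's PublisherTest because each connection to the broker needs its own unique client ID.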
    
    
  • MQTT Subscribe Using CreatePipeline and NewFacetInstance


    Subscribe using NewFacetInstance
    <NewPackage>
      <NewFacetInstance Name="MqttSubscriber" fid="mqttFid" Type="SffMessageFacet">
        <OnActivate>
          <ImportFacet>?MyMacroFacet</ImportFacet> <!-- If required.. -->
          <CreatePipeline arguments="MqttClientExtensionArgs" get="[:$Macro.Argument.actualURL:]"/> <!-- Complete URL of connection of MQTT Subscriber -->
        </OnActivate>
        <OnRequest>
          <SetLocalData key="guard" value=""/>
          <Log Level="Info" Message="Got the message in subscriber [:[:$LocalData.guard:]$Request:]"/>
          <SetProcessData key="RequestBodyEncapsulated">
            <Value>
              [:[:$LocalData.guard:]$Request:]
            </Value>
          </SetProcessData>
          <Log message="RequestBodyEncapsulated is [:$ProcessData.RequestBodyEncapsulated:]"/>
          <SetProcessData key="RequestBody">
            <Value>
              <Request>
                <Include>[:$LocalData.guard:]$Request</Include>
              </Request>
            </Value>
          </SetProcessData>
          <Log message="RequestBody is [:$ProcessData.RequestBody:]"/>
          <!-- Process the message -->
          <Log Level="Info" Message="Response mqtt [:$Response.Message.Value:]"/>
        </OnRequest>
    
        <OnError>
          <SetLocalData key="guard" value=""/>
          <Log Level="INFO" Message="Inside OnError of MQTTSubscriberHandler [:[:$LocalData.guard:]$Error:]"/>
        </OnError>
      </NewFacetInstance>
    </NewPackage>
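
    The actualURL macro argument in the CreatePipeline element above stands for the complete connection URL of the subscriber. As a rough sketch, assuming it takes the same query parameters that the subscriber facet passes in its mqtt:// URL, the element might look like this with literal values. All values are placeholders.

    CreatePipeline with a literal subscriber URL (sketch)
    <!-- Assumption: parameter names mirror the subscriber facet's mqtt:// URL; values are placeholders -->
    <CreatePipeline arguments="MqttClientExtensionArgs" get="mqtt://?Topic=/atomiton/mqtt&amp;Broker=tcp://localhost:1883&amp;ClientId=SubscriberTest&amp;ClientType=Subscriber&amp;UserName=atomiton&amp;Password=atomiton&amp;AutoReconnect=true"/>
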
    Delete Subscription Query
    <Query>
      <DeleteAll>
        <MqttSubscriberModel>
          <subscribeId ne="" />
        </MqttSubscriberModel>
      </DeleteAll>
    </Query>
    
    
MQTT Subscription Message Output Format:

The output message of an MQTT subscription is an XML structure with the fields Topic, Message, and QoS.


XML Output Format
<ReceivedData>
  <Known>
    <Topic>/atomiton/new</Topic>
    <Message>99</Message>
    <QoS>0</QoS>
  </Known>
</ReceivedData>