MQTT Protocol Handler (mqtt://)
A-Stack supports MQTT protocol Handler, that can be used to subscribe/publish various messages to an mqtt broker.Â
MQTT Protocol Handler Flow:
MQTT Protocol Handler Installation:Â
Date | Download Location | Version |
---|---|---|
4/11/2018 | Download | 1.0-041118 |
08/14/2018 | Download | 1.0.2 |
09/10/2018 | Download | 1.0.3 |
10/12/2018 | Download | 1.0.4 |
7/17/2020 | Download | 2.0.0 |
MQTT Setup Instructions
- Unzip the downloaded zip file
- Copy the sff.bundle.ext.mqtt.jar from this location to the sff.auto.launch folder which is present in your A-Stack Prime folder.
Change Log
- Version 1.0.0
- Auto Reconnection of client in case of network disconnects and broker restarts.
- All logs will be printed in engine.log.
- Version 1.0.2
- Delayed subscription for subscription drops.
- Extra NPE check.
- Version 1.0.3
- Additional logging on error condition.
- Version 1.0.4
- Bugfix : Changed Log level of handler subscription messages to debug .
MQTT Protocol Parameters:
The table below lists all the parameters of Mqtt protocol handler and their description
Parameter Name | Parameter Description |
Topic | The topic you are publishing to or subscribing from the broker. |
Broker | The Url of your MQTT Broker. |
ClientId | The client ID should be unique for a new connection. This parameter is mandatory and responsibility of the user make it unique. |
ClientType | Whether the client is a publisher or a subscriber. This parameter is mandatory and value will be either publisher or subscriber |
QoS | This parameter is used in case of publisher and specifies the QoS value for your Message. |
PublishMessage | This parameter is used in case of publisher and is the message that you want to publish. |
ReceivedData | The Messages of your subscribed topics are stored in this variable. |
UserName | The User Name for MQTT Connection. This parameter is mandatory if your broker requires username to connect. If the Mqtt broker doesn't have a User Name and password use $Null(). |
Password | The Password for the MQTT Connection. This parameter is mandatory if your broker requires password along with username to connect. If the Mqtt broker doesn't have a User Name and password use $Null(). |
AutoReconnect | This argument determines whether a client should try to automatically reconnected to broker if connection is lost, by default this is true. |
Usage of MQTT protocol handler:
MQTT Publish:
MQTT Publish Facet<ThingFacet Name="MqttPublisherFacet"> <String KnownBy="MqttPublisherAction" Name="PublishMessage"/> <String Name="QoS"/> <String Name="Topic"/> <String Name="Broker"/> <String Name="ClientId"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <AA>[:#o#Output.Argument:]</AA> <Action Name="MqttPublisherAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main"> <Output as="ActionArgument" name="Argument"/> <Invoke name="InvokeMqttPublish" post="mqtt://?Topic=[%:[:AA:].Topic.Value:%]&Broker=[%:[:AA:].Broker.Value:%]&ClientId=[%:[:AA:].ClientId.Value:%]&ClientType=[%:[:AA:].ClientType.Value:%]&QoS=[%:[:AA:].QoS.Value:%]&UserName=[%:[:AA:].UserName.Value:%]&Password=[%:[:AA:].Password.Value:%]" scope="local" waitFor="Argument"> <Message Type="text"> <Value> <Payload> [%:[:AA:].PublishMessage.Value:%] </Payload> </Value> </Message> </Invoke> <Output As="ActionResult" Name="Result"> <Value> <PublishMessage> <Include> Invoke.InvokeMqttPublish.Message.Value </Include> </PublishMessage> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
Publish Initialization Query<Query> <DeleteAll> <MqttPublisherModel> <publishId ne="" /> </MqttPublisherModel> </DeleteAll> <Create> <MqttPublisherModel> <ClientID>PublisherTest</ClientID> <ClientType>Publisher</ClientType> <Broker>tcp://localhost:1883</Broker> <Topic>/atomiton/mqtt</Topic> <PublishMessage>HelloWorld</PublishMessage> <QoS>2</QoS> <UserName>atomiton</UserName> <Password>atomiton</Password> </MqttPublisherModel> </Create> </Query>
Publish Update Query<Query> <Find format="version,known"> <MqttPublisherModel> <publishId ne=""/> </MqttPublisherModel> </Find> <SetResponseData> <key>Message.Value.Find.Result.MqttPublisherModel.PublishMessage.Value</key> <value>HelloTQL</value> </SetResponseData> <Update> <from>Result</from> <Include>$Response.Message.Value.Find</Include> </Update> </Query>
MQTT Subscribe:
MQTT Subscribe Facet<ThingFacet Name="MqttSubscriberFacet"> <String Format="$ObjectFormat(xml)" KnownBy="MqttSubscriberAction" Name="ReceivedData" update="auto"/> <String Name="Topic"/> <String Name="Broker"/> <String Name="ClientId"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="AutoReconnect"/> <Action Name="MqttSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> <Invoke get="mqtt://?Topic=[%:Event.Argument.Topic.Value:%]&Broker=[%:Event.Argument.Broker.Value:%]& ClientId=[%:Event.Argument.ClientId.Value:%]&ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]&Password=[%:Event.Argument.Password.Value:%]& AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" name="InvokeMqttSubscription" scope="process" waitFor="Argument"/> <Output As="ActionResult" Name="Result"> <Value> <ReceivedData> <Include> Invoke.InvokeMqttSubscription.Message.Value </Include> </ReceivedData> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
MQTT Subscribe Using CreatePipeline and NewFacetInstance
Subscribe using NewFacetInstance<NewPackage> <NewFacetInstance Name="MqttSubscriber" fid="mqttFid" Type="SffMessageFacet"> <OnActivate> <ImportFacet>?MyMacroFacet</ImportFacet> <!-- If required.. --> <CreatePipeline arguments="MqttClientExtensionArgs" get="[:$Macro.Argument.actualURL:]"/> <!-- Complete URL of connection of MQTT Subscriber --> </OnActivate> <OnRequest> <SetLocalData key="guard" value=""/> <Log Level="Info" Message="Got the message in subscriber [:[:$LocalData.guard:]$Request:]"/> <SetProcessData key="RequestBodyEncapsulated"> <Value> [:[:$LocalData.guard:]$Request:] </Value> </SetProcessData> <Log message="RequestBodyEncapsulated is [:$ProcessData.RequestBodyEncapsulated:]"/> <SetProcessData key="RequestBody"> <Value> <Request> <Include>[:$LocalData.guard:]$Request</Include> </Request> </Value> </SetProcessData> <Log message="RequestBody is [:$ProcessData.RequestBody:]"/> <!-- Process the message --> <Log Level="Info" Message="Reponse mqtt[:$Response.Message.Value:]:]"/> </OnRequest> <OnError> <SetLocalData key="guard" value=""/> <Log Level="INFO" Message="Inside OnError of MQTTSubscriberHandler [:[:$LocalData.guard:]$Error:]"/> </OnError> </NewFacetInstance> </NewPackage>
Delete Subscription Query<Query> <DeleteAll> <MqttSubscriberModel> <subscribeId ne="" /> </MqttSubscriberModel> </DeleteAll> </Query>
MQTT Subscribe Facet<ThingFacet Name="MqttSubscriberFacet"> <String Format="$ObjectFormat(xml)" KnownBy="MqttSubscriberAction" Name="ReceivedData" update="auto"/> <String Name="Topic"/> <String Name="Broker"/> <String Name="ClientId"/> <String Name="ClientType"/> <String Name="UserName"/> <String Name="Password"/> <String Name="AutoReconnect"/> <Action Name="MqttSubscriberAction"> <Workflow Limit="1" Live="1" Timeout="-1"> <Task name="Main" while="true"> <Event as="ActionArgument" name="Argument"/> <Invoke get="mqtt://?Topic=[%:Event.Argument.Topic.Value:%]&Broker=[%:Event.Argument.Broker.Value:%]& ClientId=[%:Event.Argument.ClientId.Value:%]&ClientType=[%:Event.Argument.ClientType.Value:%]& UserName=[%:Event.Argument.UserName.Value:%]&Password=[%:Event.Argument.Password.Value:%]& AutoReconnect=[%:Event.Argument.AutoReconnect.Value:%]" name="InvokeMqttSubscription" scope="process" waitFor="Argument"/> <Output As="ActionResult" Name="Result"> <Value> <ReceivedData> <Include> Invoke.InvokeMqttSubscription.Message.Value </Include> </ReceivedData> </Value> </Output> </Task> </Workflow> </Action> </ThingFacet>
<ReceivedData> <Known> <Topic>/atomiton/new</Topic> <Message>99</Message> <QoS>0</QoS> </Known> </ReceivedData>