Cluster overview

Cluster Terminology

This topic introduces some of the concepts unique to the cluster creation and communication feature of A-Stack. 

TerminologyDescriptionSynonym Terms

Network

A collection of A-Stack running instances that communicates (notifications) and keep persistent data store in sync.Cluster, Clustering
PeerA-Stack running instance that is participating in forming a Network
NodeA-Stack running instancePeer if participating in forming a network.
GroupLogical Name defined to make communication type between a collection of Nodes
AdvertiseSend Notifications to peer or group
ReplicateReplication of data (TQL Storage) between nodes or a group of nodes

Cluster Creation

The cluster feature embedded in the A-Stack are built using NetworkFacet Capability - SffNetworkFacet. A-Stack running instances participate in a cluster. SffNetworkFacet takes network definition and provides communications and data exchange between network peers. There are two types of data exchanges between the defined nodes -

  • Notifications and 
  • Persistent TQL Data.

Transport

There are two types of transports that can be configured in cluster communication.

  • Websocket:  Websocket is specified as part of URL parameter. 
  • AMQP - AMQP is an open Internet (or "wire") Protocol standard for message-queuing communications. AMQP setup is described in detail below.

SffNetworkFacet Capability

This section explains keywords associated with NetworkFacet Capability.

SffNetworkFacet Capability
<NewFacetInstance fid="cluster" name="cluster" type="SffNetworkFacet">
   <OnActivate>
        <Process>
          <Network PeerConnectTime="PT60S"  PeerReconnectTime="PT30S">
            <C1 fid="cluster" documentation="shared cluster deployment parameters"/> <!-- Helper Tag -->
            <Group name="G1"/> <!-- Explicitly defined group -->
            <Namespace name="N1">
               <Node name="P0" url="ws://host:port/fid-[:C1.fid:]" group="G1" advertiseTo="G1" documentation="parent replicator"/>
               <Node name="P1" url="ws://host:port/fid-[:C1.fid:]" group="G1" advertiseTo="G1" documentation="parent replicator"/>
            </Namespace>
            <Node name="E2" url="ws://host:port/fid-[:C1.fid:]" group="G2" replicateTo="G1" documentation="child controller"/>                               
            <Node name="E3" url="ws://host:port/fid-[:C1.fid:]" group="G2" replicateTo="G1" documentation="child controller"/>         
          </Network>       
        </Process>
    </OnActivate>
    <OnOpen ModifyPipeline="WsServerExtensionArgs"/>
</NewFacetInstance>       

SffNetworkFacet Keyword Notes

KeywordDescriptionModifierRequired
NetworkMain Tag used to specify two or more node/peer definition that form a network of A-Stacks for communication

PeerConnectTime:

Default 1 second. This is the delay A-Stack uses to make first peer connection or first reconnect attempt after peer connection has been closed for whatever reason.

PeerReConnectTime:

Default 30 seconds. This is the delay engine uses between subsequent peer reconnect attempts

Yes
GroupExplicitly defined Group.

Name:

Specify name of the group

No

Any Tag Name (Example: AStack; Node; Peer with attributes required to qualify as a node/peer).

Guideline: Always use the consistent name when defining a node. Simply call it Node.

Peers can be defined within a namespace (example N1) to avoid name collisions and be aggregated into groups.

Name:

Name of the Node.

Url:

Complete URL of the node

Group:

Group to which the node belongs. Group can be the name of the group that is explicitly defined or defined inline.

ReplicateTo:

AdvertiseTo

Yes


Associating Network with TQL

Any number of TQL facets can associate with a single network facet and any number of network facets can associate with a single TQL facet. That is, a single TQL facet may participate in multiple clusters and single cluster may contain multiple TQL facets

SffNetworkFacet Capability
<NewFacetInstance fid="cluster" name="cluster" type="SffNetworkFacet">
   <OnActivate>
        <Process>
          <Network>
            <C1 fid="cluster" documentation="shared cluster deployment parameters"/>
            <Group name="G1"/>
            <Namespace name="N1">
               <TqlEngine name="P0" url="ws://host:port/fid-[:C1.fid:]" group="G1" advertiseTo="G1" documentation="parent replicator"/>
               <TqlEngine name="P1" url="ws://host:port/fid-[:C1.fid:]" group="G1" advertiseTo="G1" documentation="parent replicator"/>
            </Namespace>
               <TqlEngine name="E2" url="ws://host:port/fid-[:C1.fid:]" group="G2" replicateTo="G1" documentation="child controller"/>                               
               <TqlEngine name="E3" url="ws://host:port/fid-[:C1.fid:]" group="G2" replicateTo="G1" documentation="child controller"/>         
          </Network>       
        </Process>
    </OnActivate>
    <OnOpen ModifyPipeline="WsServerExtensionArgs"/>
</NewFacetInstance>       

AMQP Transport for Cluster Communication

AMQP is a publish/subscribe protocol therefore instead of point-to-point connections between cluster peers a group of peers as a whole connects to the exchange. Recall that peer relationships are defined in terms of groups so every time a peer sends something to a group, the group itself forwards the message to the exchange. The opposite is also true. Group also subscribes to the exchange so it can receive messages published by other peers to that group.


Example of AMQP Topology for 4 peers connected to a single group.


                                 


  1. Please refer to AMQP Protocol Handler for details configurtion parameters. AMQP Protocol Handler parmeters are used in defining Borker Exchange, bindings and Routing Key.
  2. Group also subscribes to the exchange so it can receive messages published by other peers to that group.
  3. Group communication over AMQP must have separate Send and Receive configurations which corresponds to two separate connections for publish and subscribe. Engine recognizes pub/sub style of group communication by looking whether these configurations are present.
  4. New PeerQName parameter is now available to aid in defining peer-unique names like queue name. Each distinct peer must have its own unique queue on the exchange. This parameter is resolved to self QName of the cluster peer. That is, If you’re running on port 8080 it will be “N1.P0” and on port 8081 it will be “N1.P1” etc
  5. Routing key should be unique per cluster per group. In the above example I defined it as cluster.[:QName:], which would be resolved as cluster.G1. In the presence of multiple clusters you should probably use something like [:fid:].[:QName:] instead. Same goes for peer queue names (i.e. QueueName="[:fid:].[:PeerQName:]"  or so).


For the Network Definiton below: It results in following variable values.          

NameValue
QName
AMQPCluster.Region1_Cluster1_Server1
GroupName
Region1_Cluster
RoutingKeycluster.Region1_Cluster1 (Note that RoutingKey is unique per Exchange / Cluster definiton)



SffNetworkFacet with AMQP Transport
<NewPackage>
<RuntimeParams>
    <RegionName>Region1</RegionName>
    <RegionSectionName>Cluster1</RegionSectionName>
    <ClusterName>[:RuntimeParams.RegionName:]_[:RuntimeParams.RegionSectionName:]</ClusterName>
</RuntimeParams>
<NewFacetInstance Name="wsNotificationCluster" Type="SffNetworkFacet" Context="new" fid="wsNotificationCluster">
       <OnActivate>
            <ImportFacet>[:RuntimeParams.MacroFacetID:]</ImportFacet>
            <SetLocalData Key="peers">
                <Value>
                    <Server Id="[:RuntimeParams.ClusterName:]_Server1" Host="bkhan" Port="8082"/>
                    <Server Id="[:RuntimeParams.ClusterName:]_Server2" Host="bkhan" Port="8083"/>
                    <Server Id="[:RuntimeParams.ClusterName:]_Server3" Host="bkhan" Port="8084"/>
                    <Server Id="[:RuntimeParams.ClusterName:]_Server4" Host="bkhan" Port="8085"/>
                </Value>
            </SetLocalData>
            <For Each="server" From="$LocalData.peers" In="Server">
                <AddProcessData Key="ClusterPeers.Peer">
                    <Value>
                        <Name>[:$LocalData.server.Id:]</Name>
                        <URL>ws://[:$LocalData.server.Host:]:[:$LocalData.server.Port:]/fid-wsNotificationCluster</URL>
                        <AdvertiseTo>[:RuntimeParams.ClusterName:]</AdvertiseTo>
                        <Group>[:RuntimeParams.ClusterName:]</Group>
                    </Value>
                </AddProcessData>
            </For>
            <Log Level="INFO" Message="TQLNotificationCluster : Federation AdvertiseTo servers [:$ProcessData.DeviceEngines:]"/>
            <Process>
                <Network>
                    <OnPeerError>
                        <Log Level="error" Message="TQLNotificationCluster : Peer [:$ContextData.$Peer.Key:] error: [:$Error:]"/>
                    </OnPeerError>
                    <Namespace Name="AMQPCluster">
                        <Include>$ProcessData.ClusterPeers</Include>
                    </Namespace>
                     <Group name="[:RuntimeParams.ClusterName:]" UserName="tql" Password="tql12345" 
                           VirtualHost="/" Durability="true"
                           Host="mqtt.atomiton.com" Port="5672" 
                           ExchangeName="AtomitonFanoutExchange" 
                           ExchangeType="fanout" QueueName="peer.[:PeerQName:]">
                           <Send post="amqp://?ClientType=publisher&
                                            UserName=[:UserName:]&
                                            Password=[:Password:]&
                                            VirtualHost=[:VirtualHost:]&
                                            HostName=[:Host:]&
                                            PortNumber=[:Port:]&
                                            ExchangeName=[:ExchangeName:]&
                                            ExchangeType=[:ExchangeType:]&
                                            QueueName=[:QueueName:]&
                                            Durability=[:Durability:]">
                                  <RoutingKey>cluster.[:QName:]</RoutingKey>
                                  <Template>
                                      <Message>
                                          <Value>
                                           <PublishMessage>[:$Request.Message.Value:]</PublishMessage>
                                          </Value>
                                       </Message>
                                    </Template>
                            </Send>
                            <Receive get="amqp://?ClientType=subscriber&
                                              UserName=[:UserName:]&
                                              Password=[:Password:]&
                                              VirtualHost=[:VirtualHost:]&
                                              HostName=[:Host:]&
                                              PortNumber=[:Port:]&
                                              ExchangeName=[:ExchangeName:]&
                                              ExchangeType=[:ExchangeType:]&
                                              QueueName=[:QueueName:]&
                                              Durability=[:Durability:]"
                                         as="ServerPipeline" disable="CMD_SEND">
                                  <Message>
                                    <Value>
                                      <AMQPSubscriptionConfig>
                                        <RoutingKey value="cluster.[:QName:]"/>
                                      </AMQPSubscriptionConfig>
                                    </Value>
                                  </Message>
                             </Receive>
                    </Group>
                </Network>
            </Process>
            <SffReport Name="wsNotificationCluster_OnActivate"/>
        </OnActivate>
        <OnOpen ModifyPipeline="WsServerExtensionArgs" Context="keep"/>
        <Policy name="MyMetering" type="SffSimplePolicy" kind="metering">
            <inc type="integer" target="RequestCount"/>
            <add type="integer" target="TotalBytes" value="[:$RawData.TotalBytesRead:]"/>
            <set type="integer" target="AvgRequestSize" value="[:$ThisData/(number(TotalBytes div RequestCount)):]"/>
        </Policy>
        <OnError>
            <SetLocalData Key="guard" Value=""/>
            <ReportError>
                <Error>[:[:$LocalData.guard:]$Error:]</Error>
                <occurredIn>TQLNotificationCluster</occurredIn>
            </ReportError>
            <SffReport Name="wsNotificationCluster_OnError"/>
        </OnError>
    </NewFacetInstance> 
</NewPackage>      

Broker Exchange and Bindings

                       

AMQP Vs WS Performance

Using AMQP as a cluster transport does have a cost associated with it due to network involving communication with AMQP borker. Sample Test setup will help us understand the performance cost comparison between the two transports.

Test Setup

TransportBroker Instance TypeBoroker VerisonCluster SizeCluster RelationshipDatabase
AMQPAWS EC2 m1.mediumRabbitMQ 3.2.44AdvertiseToRemote Postgress RDS
WS--4AdvertiseToRemote Postgress RDS


Test Results


Cluster Topologies

Choosing the correct data storage strategy depends on the type of application (simple to complex), deployment and interoperability to other platforms. Here are some basic recommended guidelines

Shared Storage

  • Useful if there are multiple A-Stack (s) that are load balanced for access by IoT Application (UI or other)
  • This can be used if A-Stack (s) are responsible for managing Device Cloud Providers (Example, Sensory Network etc)

Standalone A-Stack

  • Useful when A-Stack(s) directly connect to sensors and actuators.
  • Direct connection to sensors and actuators can be physical (USB/Serial) or Low-Frequency Wifi (Xbee, Bluetooth, BLE)


Distributed / Edge Computing

  • Combination of Shared and Standalone mode can be applied leading to Distributed computing mode of operation.
  • Useful if there combination of Edge Devices (compute needs) as well as Cloud Providers.

Dynamic Cluster

Initial dynamic (or rather quasi-static) cluster support.

o   Cluster network configuration now can handle pattern-based peer definitions. For example


Dynamic Cluster
<Network>
  <C1 fid="cluster"/>
  <Group name="G1"/>
  <Namespace name="N1">
    <TqlEngine name="P0" url="ws://MQId-Win7E64:8080/fid-[:C1.fid:]" group="G1" publishTo="G1"/>
    <TqlEngine name="P[:X:]" url="ws://MQId-Win7E64:808[:X:]/fid-[:C1.fid:]" group="G1" publishTo="G1"/>
  </Namespace>
</Network>       


This works as following:

  • First [static] peer (i.e. N1.P0) definition is matched as usual. This is you cluster leader. You can have any number > 0 of leaders. You *must have at least one leader* explicitly defined otherwise peers will not be able to discover each other. (You should have more than one if you want to achieve fault tolerance).
  • Second peer definition is pattern-based (note highlighted parameters). Engine will try to match the pattern to its own environment in order to recognize itself as a peer. For example, if you start an engine instance on host “MQId-Win7E64” bound to port 8081 then it will match parameter “X” as “1” (i.e. 8081 matches as 808[:X:] iff X substitutes as 1). Therefore it will recognize itself as peer name=”N1.P1” (i.e. name=”P[:X:]” à name=”P1”). Once it recognizes itself it will connect to leaders and send “AddPeer” command (see below). In turn, leaders will propagate AddPeer command to their own known peers and send their known peers definitions to the new peer. This allows the new peer to discover all other peers and vice versa.

Obviously, this way you can deploy total of up to 10 nodes (i.e. on ports 8080..8089) named as P0..P9. Defining url url="ws://MQId-Win7E64:80[:X:]/fid-[:C1.fid:]" as will allow to match ports 8000..8099 and will generate names P00..P99 (i.e. up to 100 peers total) etc.

o   Any number of parameters can be defined e.g. name=”P[:Y:][:X:]”url="ws://10.0.1.[:Y:]:808[:X:]/fid-[:C1.fid:]" where “Y” is the last part of IPv4 host address etc. Any matched parameters from the URL can be used in the name.

o   Any number of different patterns can be defined. They will be tried in order of appearance. First match will win.

o   Please make sure that generated names are unique as *there cannot be two peers with the same name*.

o   Please also take care not to use different patterns which match the same URLs. You’ll get a warning if duplicate match is detected.

o   For on-the-same-box deployment this is best used with configured port ranges (e.g. sff.server.port=”[8080..8089]). This way your configuration can be completely static and shared across all peers. Any new peer will simply pick up next port from the range and join the cluster. Everything is nice and predictable (i.e. no messing with sff.server.port=0 ephemerals)

o   As before, it is still possible to update peer local network configuration by simply sending it new peer info, but obviously, instead of doing manual updates to all the peers you can now send

<AddPeer type=”TqlEngine” name="N1.P0" url="ws://MQId-Win7E64:8080/fid-cluster" group="G1" publishTo="G1"/>

command to any peer (one of the leaders is the best choice) to onboard a new static peer manually. (Note that unless you wrap it into a namespace, qualified names must be used unless you have a simple flat namespace. Also note that node type is given which would be a tag name in case of local config update, e.g. compare <TqlEngine name=”…” …/> vs. <AddPeer type=”TqlEngine” name=”…” …/>)

Obviously you can also send

                <RemovePeer name="N1.P0"/>

to manually remove a peer from the cluster. [Qualified] name must be given to identify the peer to remove.

Cluster Limitations

  • Transport Protocol Limitation: Only web socket protocol is supported for peer communications.
  • No Auto / Mutual Discovery: There is no mutual discovery yet so each peer must be provided with the same network definition. For example, the exact same definition above can be deployed on three different engine instances, bound to ports 8080, 8082 and 8083 correspondently. Each instance will have one instance of SffNetworkFacet deployed with same fid-cluster. Each instance will figure out its own name/position in the network topology based on host, port and fid
  • No inter-group communication: There is no inter-group communications yet, only along relationship links
  • No detection of Virtual Peer: It is not yet smart enough to figure out a virtual peer deployed on the same engine so if you configure two related peers and deploy both on the same box, it will try to open a WS to itself instead of using internal communications
  • No detecton of Duplex Links: It is not yet smart enough to figure out duplex links so each peer will open client WS to its target even if server WS is already open.
  • Peer relationships are supported:
      • AdvertizeTo sends data change notifications to the target peer. This is useful while working with a shared DB. This way subscribers will get notified about DB change even if they are connected to a different peer
      • ReplicateTo sends data change requests to the target peer. This effectively replicates changes made to the source DB on the target. Only visible (by target) models/attributes will be updated so facet security is enforced. Multiple sources are allowed as well as local modifications on the target (but local modifications will currently NOT be propagated back to source). In case of multiple source-originated modifications, only the latest one will be applied. Timestamps and versions are used for collision disambiguation. In other words, version numbers work like Lamport clocks across the network. Local modifications follows the usual rules
  • In other words, this cluster works like a tree structure where children can advertise/replicate local changes to the parent. Although possible and legal, it is NOT recommended to have the same instances on different sibling children to avoid any problems with consistency at least until we test it some more. If you need to move instance from one child to another then you need to explicitly delete instance on the first child (which should propagate to parent) and then re-create instance on another child (which is also propagated). This will allow versions on the parent to start from 1 again and should not result in any problems with further updates as versions will remain synchronized between child and parent.

Deploy, Manage and Monitor Cluster

In this section we discuss how to deploy, manage and monitor cluster. A-Stack support to deploy, manage and monitor cluster can described using following high level picture

High Level Architecture of Cluster Management

                        

Managing Multiple Clusters


                        



A-Stack offers following components to help deploy, manage and monitor array of clusters


ComponentDescription
Configurator DaemonA-Stack Runtime with configurator models to help signal cluster management on a given Cluster host
TQLConsole - ThingSpace ConfiguratorUser Interface based deployment of a remote cluster
TQL Command Line Interface (Cluster Option)Command line interface to create, start, stop, list a remote cluster
TQL Command Line Interface (MonitorDashboard Option)Command line interface to provision alarms, notification options 
Cluster Monitoring DashboardRead-onlty view of the cluster, alarms, and alerts

Atomiton IT Infrastructure and Cluster Moniotring Dashboard

Cluster Monitoring Dashboard is utilitzed within Atomiton IT infrastrcuture as well. Below the current Cluster Configuration of Atomiton IT infrastructure.