kpi

Magazín KPI

№ 5 / február 2019

Message Queuing Telemetry Transport

Internet vecí predstavuje systém vzájomne prepojených zariadení, ktoré spolu komunikujú po počítačovej sieti. Na komunikáciu medzi týmito zariadeniami slúži komunikačný protokol. V tomto článku sa dozvieme viac o komunikačnom protokole Message Queuing Telemetry Transport (MQTT), ktorý slúži na výmenu správ medzi klientmi a serverom, a je jedným z najpoužívanejších protokolov v systéme Internetu vecí. Ukážeme si tiež príklad použitia tohto protokolu na ovládanie LED žiarovky pomocou mobilnej aplikácie.

Senzor a aktuátor

Inteligentné zariadenia obsahujú senzory a aktuátory. Senzory snímajú podnety v okolí zariadenia Internetu vecí, ktoré sú ďalej spracovávané na dáta. Aktuátory na základe týchto dát konajú. Ako príklad si môžeme uviesť domácnosť, v ktorej chceme mať vždy dobre osvetlenú miestnosť tak, že ak máme nedostatok denného svetla, osvetlíme miestnosť pomocou žiarovky. V tomto koncepte potrebujeme mať senzor ktorý sníma citlivosť svetla a aktuátor, ktorý bude predstavovať žiarovku.

Protokol MQTT

Komunikačný protokol MQTT slúži na výmenu správ medzi klientmi prostredníctvom centrálneho bodu, ktorý sa nazýva MQTT broker. MQTT rozlišuje dva typy klientov, a to MQTT vydavateľa a MQTT odoberateľa. MQTT klient, ktorý je vydavateľom publikuje správu k téme, pričom MQTT odoberateľ odoberá správy z danej témy. MQTT broker zabezpečuje prijímanie správ od vydavateľa a triedenie správ do tém. MQTT broker následne správy rozposiela MQTT odoberateľom, ktorí sú prihlásení na odber správ danej témy. Témy sú od seba nezávislé, čo znamená že jedno zariadenie môže v jednej téme správy publikovať, a v inej téme správy odoberať.

MQTT má rôzne alternatívy akými sú protokoly CoAP a AMQP, pričom MQTT má mnoho výhod. V porovnaní s CoAP nám poskytuje tri úrovne kvality prenosu (QoS), pričom CoAP len dva. Protokol CoAP je navrhnutý na komunikáciu entít 1:1 a ako sme sa už dozvedeli MQTT je schopný komunikácie M:N. V porovnaní s protokolom AMQP, MQTT podporuje zariadenia s obmedzenými zdrojmi, čo AMQP nedokáže.

Architektúra MQTT (zdroj)
Architektúra MQTT (zdroj)

MQTT využíva na transportnej vrstve sieťového OSI modelu spoľahlivý protokol TCP. Najčastejšie sa využíva pot 1883, no pre spojenie TLS sa využíva port 8883.

MQTT témy

Témy v MQTT sú hierarchicky usporiadané do viacerých úrovní, čo nám slúži na uľahčenie identifikovania zariadenia. Na oddelenie úrovní hierarchie používame znak "/". Pri vytváraní tém existujú tipy, ktoré nám uľahčia prácu s protokolom MQTT. Viac sa môžeme dozvedieť v článku MQTT Essentials Part 5: MQTT Topics & Best Practices.

Ako príklad si môžeme uviesť hierarchickú štruktúru pre teplotu v detskej izbe na prvom poschodí a pre svetlo na prízemí v kuchyni a na chodbe, na prvom poschodí v spálni a na druhom poschodí na chodbe:

  • domov/prizemie/kuchyna/svetlo
  • domov/prizemie/chodba/svetlo
  • domov/1poschodie/spalna/svetlo
  • domov/1poschodie/detskaizba/teplota
  • domov/2poschodie/chodba/svetlo

Zástupný znak

Štandardne sa odoberateľ prihlasuje na odber spôsobom, kedy sa prihlási na práve jednu tému. Zástupný znak nám poskytuje možnosť prihlásiť sa jediným odberom na viacero tém súčasne. U zástupných znakov rozlišujeme dva typy substitúcií:

  1. Substitúcia jednej úrovne
  2. Substitúcia viacerých úrovní

Substitúcia jednej úrovne pomocou zástupného znaku sa vykonáva pomocou znaku „+“. Zástupný znak použijeme ak chceme odoberať všetky témy danej úrovne. Ak by sme teda chceli dostať intenzitu svetla vo všetkých izbách na prízemnom podlaží z vyššie uvažovanej hierarchickej štruktúry kde ide teda o intenzitu svetla v kuchyni a v chodbe, použijeme substitúciu jednej úrovne, teda na správnej úrovni použijeme znak „+“:

  • domov/prizemie/+/svetlo

Substitúciu viacerých úrovní pomocou zástupného znaku realizujeme pomocou znaku „#“, ktorý nám vymedzí všetky možné témy všetkých nasledujúcich dostupných úrovní smerom k cieľovej úrovni. Je dôležité taktiež podotknúť, že "#" musí byť posledným znakom v odoberanej téme. Ak by sme sa chceli prihlásiť na odber správ z celej domácnosti z vyššie uvažovanej hierarchickej štruktúry, použijeme substitúciu viacerých úrovní, a to následovne:

  • domov/#

Praktický príklad rozsvietenia LED žiarovky

V ďalšej časti tohoto článku si ukážeme demonštračný príklad komunikácie protokolu MQTT. Pripravíme si aplikáciu na platforme Android, v ktorej naprogramujeme MQTT klienta, ktorý publikuje správy k téme s názvom "domov/raspberrypi/led". Na Raspberry Pi si naprogramujeme program, ktorý bude obsahovať MQTT klienta, ktorý sa prihlási na odber správ z témy "domov/raspberrypi/led" a ukážeme si, ako správne zapojiť LED žiarovku k Raspberry Pi. Stlačením tlačidla v Android aplikácií budeme schopní túto LED žiarovku rozsvietiť alebo zhasnúť. Zdrojové kódy obidvoch aplikácii sú voľne dostupné na školskom GitLabe.

Príprava MQTT brokera (CloudMqtt)

MQTT brokera, ktorý zabezpečuje redistribúciu správ, zabezpečíme pomocou bezplatnej služby CloudMQTT, kde si vieme jednoducho vytvoriť zabezpečeného MQTT brokera pomocou troch rýchlych krokov. CloudMQTT nám poskytuje množstvo použiteľných súčastí ako logovanie a debugovanie.

Po prihlásení teda klikneme na vytvorenie novej inštancie.

Vytvorenie MQTT inštancie
Vytvorenie MQTT inštancie

Pri vytváraní tejto inštancie stačí zvoliť meno inštancie a ponechať plán „Cute Cat (Free)“, ktorý je bezplatný. V kolonke „Tags“ si vieme slovne označkovať inštanciu pre lepšiu orientáciu v jednotlivých inštanciách.

Nastavenie názvu
Nastavenie názvu

Následne si zvolíme jeden zo serverov ktorý nám bude hosťovať našu inštanciu MQTT brokera.

Výber regiónu a data centra
Výber regiónu a data centra

Posledným krokom je už len potvrdenie na vytvorenie nakonfigurovanej inštancie.

Potvrdenie vytvorenia inštancie
Potvrdenie vytvorenia inštancie

Keď už teda máme zabezpečenú inštanciu MQTT brokera, zvolíme si zobrazenie podrobnosti o novovytvorenej inštancií. Následné parametre tejto inštancie využijeme v ďalšej implementácií MQTT klientov vydavateľa a odoberateľa.

Informácie o inštancií
Informácie o inštancií

Je dôležité dbať na bezpečnosť, a preto by sa v zavedených riešeniach do praxe nemal využívať externý broker. Mali by sme teda zvážiť vytvorenie vlastného lokálneho MQTT brokera, napríklad priamo na Raspberry Pi.

Pripojenie LED žiarovky k Raspberry Pi

V tejto ukážke bol použitý vnorený systém Raspberry Pi 3 model B+, ktorý obsahuje rozhrania, pričom nás budú zaujímať len vstupno-výstupné piny. Vstupno-výstupné piny slúžia ako fyzické rozhranie medzi Raspberry Pi a vonkajším svetom a nachádzajú sa na okraji dosky. Vieme na nich napojiť senzory, aktuátory a obvody. Raspberry Pi 3 model B+ obsahuje 40 pinových hlavičiek s rôznymi funkcionalitami. Dôležité pre nás budú len piny štandardného typu GPIO, na ktorých vieme odovzdávať 3.3 voltov a piny typu GND, ktoré slúžia ako uzemňovacie. Ďalšie informácie o vstupno-výstupných pinoch nájdeme v dokumentácií Raspberry Pi GPIO.

Schéma zapojenia aktuátora vyzerá následovne, pričom ako štandardný pin použijeme pin 4, ktorý sa nachádza na pozícií 7 a uzemňovací pin, nachádzajúci sa na pozícií 6:

Schéma zapojenia aktuátora
Schéma zapojenia aktuátora

MQTT odoberateľ (Raspberry Pi) v jazyku Python

Na vnorenom systéme Raspberry Pi si pripravíme MQTT klienta odoberateľa v jazyku Python, ktorý bude odoberať MQTT správy z témy na ktorú je prihlásený. Uvažujme, že Raspberry Pi sa nachádza doma a je naň pripojená LED žiarovka. Meno témy by malo vyplývať z umiestnenia nášho aktuátora LED žiarovky. Nazvime si teda tému: „domov/raspberrypi/led“. Podľa obsahu správy sa program následne rozhodne či LED žiarovku rozsvieti alebo zhasne. Pri obsahu správy „ON“ sa teda LED žiarovka rozsvieti a pri obsahu „OFF“ zhasne.

Pred samotnou implementáciou si stiahneme knižnicu paho-mqtt nasledujúcim spôsobom:

sudo pip install paho-mqtt

Vytvoríme si program v jazyku Python, kde náš zdrojový kód bude vyzerať následovne:

import RPi.GPIO as GPIO
import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")
        client.subscribe("domov/raspberrypi/led")
    else:
        print("Bad connection")


def lightLEDon():
    print("turning LED ON")
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(4, GPIO.OUT)
    GPIO.output(4, GPIO.HIGH)

def lightLEDoff():
    print("turning LED OFF")
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(4, GPIO.OUT)
    GPIO.output(4, GPIO.LOW)
     
def on_message(client, userdata, message):
    print ("MQTT message payload: " + message.payload)
    if message.payload == "ON":
        lightLEDon()
    elif message.payload == "OFF":
        lightLEDoff()
    else:
        print("Unexpected message: " + message.payload)

client = mqtt.Client("rpi")
client.connect("m23.cloudmqtt.com", 15810)      # connect to broker
client.username_pw_set("qfzjmzmt", "1EujNfl9m5Xw")

client.on_connect = on_connect  # bind call back function
client.on_message = on_message
client.loop_forever()

V programe sa nachádzajú následovné funkcie pre správne fungovanie protokolu MQTT:

  • on_connect(client, userdata, flags, rc), ktorá je volaná po pripojení k MQTT brokerovi a prihlásení na odber danej témy;
  • on_message(client, userdata, message) je volaná pre príjem správy z odoberanej témy, kde sa následne volajú podľa obsahu MQTT správy funkcie lightLEDon() alebo lightLEDoff();
  • lightLEDon() — zabezpečí rozsvietenie LED žiarovky;
  • lightLEDoff() — zabezpečí zhasnutie LED žiarovky.

Keď teda máme pripravené hore uvedené funkcie, je možné nadviazať komunikáciu s MQTT brokerom. Ako prvé si vytvoríme objekt klienta (premenná client), nad ktorým zavoláme metódu connect(ip address, port), kde prvým argumentom je IP adresa MQTT brokera a druhým argumentom je port, pod ktorým bude naša komunikácia fungovať. Ďalšou metódu username_pw_set(login, password) nastavíme prihlasovacie meno a heslo pre nadviazanie spojenia s MQTT brokerom. Následne do členských premenných klienta priradíme funkcie on_connect a on_message, ktoré sa zavolajú pri nadviazaní spojenia a príjme správy. V implementácií programu nám už ostáva len zavolať metódu slučky a čakať na príjem odoberaných správ z danej témy.

Program spustíme príkazom v konzole:

python názov programu

MQTT vydavateľ v jazyku Java — aplikácia pre Android

Implementácia tohto Android projektu nám zaistí MQTT vydavateľa. V našej hlavnej triede budeme mať dve tlačidla „ON“ a „OFF“, kde použitie „ON“ pošle MQTT správu s obsahom „ON“, čo nám v implementácií MQTT odoberateľa zaistí rozsvietenie LED žiarovky. Po aktivovaní tlačidla „OFF“ sa nám LED žiarovka vypne.

Tlačidlá ON a OFF
Tlačidlá ON a OFF

Publikovanie bude prebiehať voči serveru „m23.cloudmqtt.com“ pod portom 15810, s prihlasovacím menom a heslom, ktoré nám poskytla služba www.cloudmqtt.com na MQTT tému s názvom „domov/raspberrypi/led“.

Aby sme mohli používať službu MQTT, budeme potrebovať knižnice na jeho nastavenie. Ako prvé si pridáme do build.gradle našej Android aplikácie tieto knižnice:

dependencies {
  implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
  implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

Ďalším krokom je pridať povolenia pre našu Android aplikáciu do AndroidManifest.xml. Aplikácia bude potrebovať jednotlivé povolenia z týchto dôvodov:

  • android.permission.INTERNET pre internetové spojenie s MQTT brokerom,
  • android.permission.ACCESS_NETWORK_STATE na rozpoznanie kedy sa internetové spojenie nadviaže alebo preruší,
  • android.permission.WAKE_LOCK potrebuje pre inicializáciu a udržanie spojenia s MQTT brokerom. Bez tohoto povolenia sa nám aplikácia zrúti s chybovým kódom RuntimeException hneď ako sa aplikácia pokúsi o inicializáciu spojenia.

Tieto riadky teda zapíšeme pred otváraciu značku application:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<uses-permission android:name="android.permission.WAKE_LOCK" />

A pre zaregistrovanie služby MQTT napíšeme pred zatváraciu značku application:

 <service android:name="org.eclipse.paho.android.service.MqttService" />

Vytvoríme si triedu MqttService, ktorá nám bude slúžiť ako MQTT klient, ktorý bude publikovať správy k danej téme. Metóda tejto triedy publish(String message) publikuje správu k téme, ktorej obsahom je argument tejto metódy. Metóda connect() zabezpečí pripojenie klienta k brokerovi. V konštruktore triedy si vytvoríme objekt triedy MqttAndroidClient, kde argumentmi konštruktora sú kontext aplikácie, IP adresa MQTT brokera a identifikátor, pod ktorým budeme v koncepcií MQTT vystupovať. Ďalším krokom pre správnu konfiguráciu klienta je vytvorenie objektu triedy MqttConnectOptions, nad ktorým vieme zavolať metódy pre nastavenie prihlasovacieho mena a hesla pre MQTT spojenie, ktoré nám poskytla služba CloudMQTT. Následne si vytvoríme objekt triedy IMqttToken, ktorý nám bude slúžiť pre priradenie objektov tried MqttAndroidClient a MqttConnectOptions.

package com.domin.mqttpublisher;

import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttService {
    private final MqttAndroidClient client;
    private MqttConnectOptions options;
    private static final String topic = "domov/raspberrypi/led";

    public MqttService(Context context) {
        client = new MqttAndroidClient(context, "tcp://m23.cloudmqtt.com:15810", "Android");
        options = new MqttConnectOptions();
        options.setUserName("qfzjmzmt");
        options.setPassword("1EujNfl9m5Xw".toCharArray());
    }

    public void publish(String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        client.publish(topic, mqttMessage);
        Log.d("mqtt", "Message published:" + mqttMessage.toString());
    }

    public void connect() throws MqttException {
        IMqttToken token = client.connect(options);
    }
}

V našej hlavnej triede si vytvoríme objekt MqttService, nad ktorým zavoláme metódu connect(), hneď po zapnutí aplikácie. Po stlačení tlačidiel buttonOn, resp. buttonOff sa zavolá pod objektom MqttService metóda na publikovanie správy publish(String message).

package com.domin.mqttpublisher;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MainActivity extends AppCompatActivity {
    static final String ON = "ON";
    static final String OFF = "OFF";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Button button_ON = findViewById(R.id.button_ON);
        Button button_OFF = findViewById(R.id.button_OFF);

        final MqttService mqttService = new MqttService(this.getApplicationContext());
        try {
            mqttService.connect();
        } catch (MqttException e) {
            Log.d("mqtt", "Cannot connect to the MQTT broker.");
        }

        button_ON.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                try {
                    mqttService.publish(ON);
                } catch (MqttException e) {
                    Log.d("mqtt", "Cannot publish message to the MQTT broker.");
                }
            }
        });

        button_OFF.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                try {
                    mqttService.publish(OFF);
                } catch (MqttException e) {
                    Log.d("mqtt", "Cannot publish message to the MQTT broker.");
                }
            }
        });
    }
}

MQTT Quality of Service

Quality of Service (QoS) označuje techniky, ktorými vieme prioritizovať a kontrolovať dátový tok. Protokol MQTT obsahuje tri úrovne QoS, a to QoS 0, QoS 1 a QoS 2, ktoré zabezpečujú potvrdzovanie odoslanej správy, čím dokážeme zabezpečiť úspešné prijatie odoslanej správy.

QoS 0, teda najnižšia úroveň, zabezpečuje odoslanie správy bez následného potvrdenia, či správa bola odoberateľom prijatá, čo znamená že odosielateľ odošle správu k danej téme centrálnemu bodu, a centrálny bod ju pošle ďalej odoberateľom a nemáme zaručené, že odoberatelia správu prijali. Stredná úroveň nám hovorí, že centrálny bod čaká na potvrdenie od odoberateľa o doručení správy, čo môže mať ale za následok, že správa bude doručená viackrát. Najvyššia úroveň nám zaručuje, že každá správa bude doručená a to práve raz. Úroveň QoS 2 je najpomalšia úroveň, no je najspoľahlivejšia.

Pri výbere správnej QoS úrovne musíme zvážiť podmienky v ktorých sa nachádzame. Ak je konektivita medzi vydavateľom a odoberateľom spoľahlivá a nepotrebujeme mať doručenú nutne každú jednu odoslanú správu, vieme sa uspokojiť s úrovňou QoS 0. Ako príklad si môžeme uviesť systém, ktorý obsahuje senzor na monitorovanie hladiny rieky, pričom hladina je senzorom snímaná každých 5 minút. Občasné vynechanie jednotlivých správ pre nás v tomto príklade nemusí znamenať zlyhanie.

Na druhej strane ak dáta ktoré posielame sú pre nás dôležité, a nemôžeme si dovoliť stratu týchto správ, QoS 1 je pre nás ideálnou možnosťou. Výberom tejto služby riskujeme príjem duplicitných správ. Uvažujme o monitorovaní hladiny s tým, že sa koná v čase potencionálnych záplav. Každá správa je teda pre nás dôležitá, no duplicitu správ môžeme považovať za zanedbateľnú.

Zaručený príjem jednej správy bez duplicitných správ nám poskytne QoS 2, pričom čas príchodu správy môže byť väčší ako pri ostatných úrovniach. Predstavme si, že zbierame zosnímané dáta pre ďalšiu analýzu. Tým pádom potrebujeme mať doručenú správu práve raz, pričom duplicitné správy by túto analýzu ohrozili.

Do nášho príkladu si taktiež môžeme doplniť nastavenie úrovne QoS. Predpokladajme že sme pripojení do nespoľahlivej počítačovej siete a príjem správy odoberateľom je pre nás dôležitým faktorom. V tomto prípade si teda nastavíme úroveň QoS 2. Do metódy publish(String message) doplníme:

mqttMessage.setQos(2);

Je dôležité zdôrazniť, že úroveň QoS musíme nastaviť pred zavolaním metódy publish nad objektom klienta. Celá metóda teda vyzerá následovne:

public void publish(String message) throws MqttException {
    MqttMessage mqttMessage = new MqttMessage(message.getBytes());
    mqttMessage.setQos(2);
    client.publish(topic, mqttMessage);
    Log.d("mqtt", "Message published:" + mqttMessage.toString());
}

Záver

Najvhodnejší scenár pre praktické použitie protokolu MQTT je ten, kde používame zdrojmi obmedzené zariadenie odkázané na batériu, sme pripojení do nespoľahlivej počítačovej siete a chceme odosielať správy viacerým klientom súčasne. MQTT nie je náročným protokolom na spotrebu batérie a pomocou úrovní QoS vieme docieliť spoľahlivosť aj v nespoľahlivom prostredí alebo naopak ušetriť čas a prenášané dáta v prípade, že spoľahlivosť nepotrebujeme.

Motiváciou pri písaní tohoto článku je neustále zvyšujúca sa popularita technológie Internetu vecí a predošlá skúsenosť s protokolom MQTT pri písaní bakalárskej práce „Mobilná aplikácia pre zber dát z IoT prostredia“, kde som tento protokol využil v komplexnejšom riešení. Pri implementovaní praktickej časti bakalárskej práce som nenarazil na komplikácie resp. obmedzenia tohoto protokolu. Dúfam, že tento článok bude čitateľom nápomocný a pomôže im so začiatkami budovania systémov Internetu vecí.