Simple Messaging with MQTT

Use the principal IoT messaging protocol to asynchronously send and receive data from devices—in this case, from drones.

By Gastón Hillar


MQ Telemetry Transport (MQTT) is a lightweight publish-subscribe messaging protocol, especially suitable for small devices but also useful for any device that requires messaging over a network. In this article, I describe how to publish and receive messages with Java through the Mosquitto broker with the asynchronous API provided by the Eclipse Paho project Java client.

In this article, I develop and explain a project in which three drones use a Mosquitto broker to publish and receive text messages. Some of these messages only provide information to a channel, while others specify a command and a destination, telling a specific drone to display its altitude in feet. I use the asynchronous methods included in the Eclipse Paho Java Client to connect to the broker, publish messages, and subscribe to topics. This way, you can see how to work with MQTT in Java without blocking.

The Pieces of the Puzzle

I will shortly explain how to include the necessary references to work with the latest version of the Eclipse Paho Java Client. However, before moving forward, it is necessary to understand the different pieces of this puzzle: the MQTT protocol, Mosquitto, the Eclipse Paho project, and Eclipse Paho Java Client.

The MQTT protocol is a machine-to-machine (M2M) connectivity protocol used extensively in the Internet of Things (IoT), and it is gaining popularity in mobile and web applications. MQTT is a protocol that works with a publish-subscribe mechanism and runs on top of the TCP/IP protocol. It is lighter than the HTTP protocol and, therefore, it is a very interesting option whenever you need to send and receive data in real time with a publish-subscribe model and you need the lowest possible footprint. However, as always happens, the reduced footprint comes at a price: MQTT does not offer great extensibility.

Mosquitto is an open source message broker that implements two versions of the MQTT protocol: 3.1 and 3.1.1. You can use Mosquitto to make any device subscribe to a specific channel, known as a topic in MQTT terminology. All subscribed devices will receive all the messages published by other devices to this topic. Mosquitto, with its publish-subscribe model, is an iot.eclipse.org project, also known as Eclipse IoT, and it is provided under the Eclipse Distribution License (EDL). It is important to know that, at the time of this writing, MQTT version 5 has reached the working draft stage.

The Eclipse Paho project offers an open source implementation of an MQTT client library that is capable of working with the same two versions of the MQTT protocol supported by Mosquitto: 3.1 and 3.1.1. The Eclipse Paho Java Client provides both a synchronous and an asynchronous API. As previously explained, I will demonstrate how to work with its asynchronous API. However, bear in mind that there is a synchronous version you can use if you don’t need the nonblocking features and you want to keep your code simpler.

It is important to understand that the MQTT connection is always between a client and the broker—in this case, between Paho Java Client and Mosquitto. You will never connect one client to another client through a direct connection. The dialogue is always between a client and the MQTT broker. In my example, each drone establishes a connection with Mosquitto, and a master drone sends messages with commands that are meant to be processed by specific drones.

It is possible for any client to subscribe to a specific topic. By doing so, it will receive all the messages published to that topic. In addition, the client can publish messages to that specific topic or to other topics. In my example, I work with just one topic, and I won’t take advantage of the wildcards that allow you to work with many topics at the same time. However, once you understand how to work with the Java client, you can use the sample code to take advantage of the additional features, based on your specific needs.
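
The example sticks to a single topic, but the wildcard rules mentioned above are simple enough to sketch. The following is a minimal, hypothetical matcher (TopicFilterSketch is my own name, not part of the Paho client). It assumes the standard MQTT rules: + stands for exactly one topic level, and # stands for all remaining levels and must be the last level of the filter. It deliberately ignores special cases such as topics that start with $.

```java
/**
 * Hypothetical sketch of MQTT topic-filter matching.
 * Not part of the Eclipse Paho Java Client.
 */
final class TopicFilterSketch {
    private TopicFilterSketch() { }

    /** Returns true if the topic name matches the subscription filter. */
    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty trailing levels such as "a/"
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multilevel wildcard: valid only as the last level;
                // it matches the parent level and everything below it
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic
                return false;
            }
            // Single-level wildcard matches any one level
            if (!filterLevels[i].equals("+")
                    && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without a trailing #, both must have the same depth
        return filterLevels.length == topicLevels.length;
    }
}
```

With this sketch, the filter "java-magazine-mqtt/drones/+" matches the topic used later in this article, while "java-magazine-mqtt/#" would also match any deeper topic below that root.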

I don’t want to focus on the configuration of a Mosquitto message broker. Instead, I want to put my efforts into demonstrating how to publish and receive messages. Eclipse provides a publicly accessible sandbox server for the Eclipse IoT projects at iot.eclipse.org, port 1883, identified with the following URI: tcp://iot.eclipse.org:1883. I will use this sandbox server as the Mosquitto message broker, without any security. That way, you don’t have to spend time setting up a Mosquitto message broker to test the example. Of course, a real-life application would require you to set up your own Mosquitto message broker. I provide many useful links to allow you to learn more about Mosquitto and MQTT at the end of this article.

You need to make sure that your software and hardware firewalls allow the application or the IDE to open outbound TCP connections on port 1883.

Using the Java Client for MQTT

At the time of this writing, the latest release of Paho Java Client is 1.1.0. I will use this version to include the necessary dependencies. The easiest way to use the Java client is to start a Maven project in your favorite Java IDE and add the following lines before </project> in the pom.xml file. I’m assuming that you are working with an empty Maven project. If you have other repositories or dependencies, make sure you edit the pom.xml file to include the new entries, as shown next. You can also use the features included in your favorite IDE or its Maven plugins to add the repository and the dependency.

<repositories>
  <repository>
    <id>Eclipse Paho Repo</id>
    <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>

I am going to create the following three classes. All these classes are included in the code bundle for this article.

  • MessageActionListener implements the org.eclipse.paho.client.mqttv3.IMqttActionListener interface. I use instances of this class to specify the callback that will run code whenever a message has been successfully published to a topic.
  • Drone represents a drone that has a name and can send and receive messages through the MQTT broker. This class encapsulates the data and logic related to drones, as well as the messages and the commands included in messages, and it also implements the MqttCallback and IMqttActionListener interfaces defined in org.eclipse.paho.client.mqttv3. There is no typo in these interface names. The naming convention for interfaces in the Java client library is a bit confusing because some interfaces start with I while others don’t. I use Drone instances as callbacks for specific events.
  • MqttSample01 creates three instances of the Drone class, makes them connect to the MQTT broker, and sends messages and commands. This class declares the main static method for the example application.

Creating a Class That Is Notified When Asynchronous Actions Are Complete

The asynchronous API requires you to work with callbacks. In this example, I demonstrate many ways of working with the necessary callbacks. The following code shows the import statements and the code for the MessageActionListener class that implements the IMqttActionListener interface.

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;

public class MessageActionListener
    implements IMqttActionListener {
    protected final String messageText;
    protected final String topic;
    protected final String userContext;

    public MessageActionListener(
        String topic,
        String messageText,
        String userContext) {
        this.topic = topic;
        this.messageText = messageText;
        this.userContext = userContext;
    }

    @Override
    public void onSuccess(
        IMqttToken asyncActionToken) {
        if ((asyncActionToken != null) &&
            asyncActionToken.getUserContext()
                .equals(userContext)) {
            System.out.println(String.format(
                "Message '%s' published to topic '%s'",
                messageText, topic));
        }
    }

    @Override
    public void onFailure(
        IMqttToken asyncActionToken,
        Throwable exception) {
        exception.printStackTrace();
    }
}


When I create an instance of the MessageActionListener class, I need to specify the topic to which the message is going to be published, the message text, and a user context. The constructor saves the received values in immutable fields that have the same names as the received arguments.

The class implements the onSuccess method required by the IMqttActionListener interface. Whenever an instance of the MessageActionListener class is used as a callback for an asynchronous action, the Java client invokes the onSuccess method when the action has been completed successfully, and it passes an asynchronous action token (asyncActionToken) of type IMqttToken as an argument to specify the action that has been completed. The method makes sure that asyncActionToken is not null and checks whether the value returned by asyncActionToken.getUserContext() matches the userContext saved by the constructor. If they match, the successful event is related to the event I wanted to monitor for its successful execution, and the code displays a message containing the message text that has been published and the name of the destination topic. I use an instance of this class as a callback for each message that is published, and thereby I can see all the successfully published messages.

The class also implements the onFailure method, which is required by the IMqttActionListener interface and simply calls the printStackTrace method for the received exception.

Specifying the Quality of Service

What does it mean that a message was successfully delivered by the MQTT broker? It depends on the quality of service (QoS) that you select when you work with the MQTT protocol. The QoS level is the agreement between the publisher and the receiver of a message about the guarantees for delivering the message. Delivering a message involves publishing from the client to the broker and then from the broker to the subscribed client. MQTT supports three possible QoS values:

  • Level 0 means at most once: This level provides the same guarantee as the TCP protocol. The message is not acknowledged by the receiver. The sender neither stores nor re-delivers any messages. As you might expect, this level has the lowest overhead.
  • Level 1 means at least once: This level provides a guarantee that the message will be delivered at least once to the receiver. The main drawback is that this QoS level might generate duplicates, because the message can be delivered more than once. The sender stores the message until it receives an acknowledgment. In the event the acknowledgment isn’t received within a specific time, the sender will publish again.
  • Level 2 means exactly once: This level provides a guarantee that the message is delivered only once to the receiver, so there is no chance of duplicates. However, as you might expect, it has the highest overhead because it requires two request-and-acknowledgment exchanges between the sender and receiver (PUBLISH/PUBREC, then PUBREL/PUBCOMP). Only when the entire flow is completed is the message considered to be successfully delivered.
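
The three levels above can be captured in a small enum, which avoids scattering the raw integers 0, 1, and 2 through the code. This is only a sketch: QosLevel and its methods are hypothetical names of mine, and the Paho client itself takes the level as a plain int.

```java
/**
 * Hypothetical enum that names the three MQTT QoS levels.
 * The Eclipse Paho Java Client expects the level as an int.
 */
enum QosLevel {
    AT_MOST_ONCE(0, false),   // no acknowledgment, no redelivery
    AT_LEAST_ONCE(1, true),   // redelivered until acknowledged
    EXACTLY_ONCE(2, false);   // full handshake, no duplicates

    private final int level;
    private final boolean mayDuplicate;

    QosLevel(int level, boolean mayDuplicate) {
        this.level = level;
        this.mayDuplicate = mayDuplicate;
    }

    /** The integer value to pass to an MQTT client API. */
    int level() {
        return level;
    }

    /** True if the receiver may see the same message more than once. */
    boolean mayDuplicate() {
        return mayDuplicate;
    }

    /** Looks up the constant for a raw level of 0, 1, or 2. */
    static QosLevel fromLevel(int level) {
        for (QosLevel qos : values()) {
            if (qos.level == level) {
                return qos;
            }
        }
        throw new IllegalArgumentException(
            "Unknown QoS level: " + level);
    }
}
```

For example, QosLevel.EXACTLY_ONCE.level() yields the value 2 that this article uses for its QUALITY_OF_SERVICE constant.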

In this example, I will work with QoS level 2, because I don’t want the possibility of receiving a command twice. Messages are delivered even across network and client restarts. However, for that to occur, each message needs to be stored in a safe location until it has been successfully delivered. The Java client works with a pluggable persistence mechanism to store the messages. To keep things simple in this example, I will use memory-based persistence, which is not the best option with QoS level 2. However, you can easily explore and configure other pluggable persistence mechanisms to avoid losing messages in the event the application, the JVM, the computer, or the device running the application stops working or shuts down.

Creating a Class to Represent a Drone That Processes Messages

The following code shows the key aspects of the Drone class that implements the MqttCallback and IMqttActionListener interfaces. The full class is available in the downloadable code.

When I create an instance of the Drone class, it is necessary to specify the desired name for the drone. The class defines many constants that I will use throughout the code. They determine the string that defines a command key, the separator, the command that retrieves the altitude for the drone, the topic to which the Java client will subscribe, the desired QoS level, and the encoding that will be used for the messages, as shown below.

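import java.io.UnsupportedEncodingException;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Drone implements MqttCallback, 
    IMqttActionListener {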
    public static final String COMMAND_KEY = "COMMAND";
    public static final String COMMAND_SEPARATOR = ":";
    public static final String 
        GET_ALTITUDE_COMMAND_KEY = "GET_ALTITUDE";
    // Replace with your own topic name
    public static final String TOPIC =
        "java-magazine-mqtt/drones/altitude";

    public static final String ENCODING = "UTF-8";

    // Quality of Service = Exactly once
    // I want to receive all messages exactly once
    public static final int QUALITY_OF_SERVICE = 2;
    protected String name;
    protected String clientId;
    protected MqttAsyncClient client;
    protected MemoryPersistence memoryPersistence;
    protected IMqttToken connectToken;
    protected IMqttToken subscribeToken;

    public Drone(String name) { this.name = name; }

    public String getName() { return name; }

It is very important that you replace the TOPIC string with your own, unique topic name. The Mosquitto broker I am using in the example is public and, therefore, I need to use a unique topic to make sure I receive only the messages published by my code. I have specified "java-magazine-mqtt/drones/altitude" for TOPIC in this example. MQTT uses topic names that have a hierarchy and are separated by a slash (/). Another example of a topic name is "java-client/samples/drones/commands/altitude".

The connect method has some code that is commented out just to remind you that in a production environment you shouldn’t send messages over an insecure connection and your MQTT broker should work with TLS/SSL and require the appropriate authentication. The code creates an instance of the MemoryPersistence class to use as the previously explained pluggable persistence, generates a unique client ID, and creates an instance of MqttAsyncClient named client. This way, I create the entry point for the Java client with the asynchronous API.

public void connect() {
    try {
        MqttConnectOptions options = 
            new MqttConnectOptions();
        // options.setUserName(
        //    "replace with your username");
        // options.setPassword(
        //    "replace with your password"
        //    .toCharArray());
        // Replace with ssl:// and work with TLS/SSL
        // best practices in a 
        // production environment
        memoryPersistence = 
            new MemoryPersistence();
        String serverURI = 
            "tcp://iot.eclipse.org:1883";
        clientId = MqttAsyncClient.generateClientId();
        client = new MqttAsyncClient(
                       serverURI, clientId, 
                       memoryPersistence);
        // I want to use this instance as the callback
        client.setCallback(this);
        connectToken = client.connect(
            options, null, this);
    } catch (MqttException e) {
        e.printStackTrace();
    }
}

public boolean isConnected() {
    return (client != null) && 
           (client.isConnected());
}

@Override
public void connectionLost(Throwable cause) {
    // The MQTT client lost the connection
    cause.printStackTrace();
}

The line that calls the setCallback method uses this as an argument because I use the actual instance as the callback that will execute specific methods when some asynchronous events occur. The setCallback method requires an argument of the MqttCallback type. The Drone class implements the MqttCallback interface that requires the following three methods: connectionLost, messageArrived, and deliveryComplete. I’ll get back to these methods later. It is very important to call the setCallback method before establishing the connection with the MQTT broker.

The line that calls the client.connect method specifies this as the last argument because I will also use the actual instance as the callback that will execute specific methods when some asynchronous events related to the connection occur. The third argument for the connect method requires an argument of the IMqttActionListener type.

The Drone class implements the IMqttActionListener interface that requires these two methods: onSuccess and onFailure.

@Override
public void onSuccess(
              IMqttToken asyncActionToken) {
        if (asyncActionToken.equals(connectToken)) {
            System.out.println( String.format(
                "%s successfully connected",name));
            try {
                subscribeToken = client.subscribe(
                    TOPIC, QUALITY_OF_SERVICE, 
                    null, this);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } 
        else if (asyncActionToken.equals( 
                       subscribeToken)) 
        {
            System.out.println( String.format(
                "%s subscribed to the %s topic",
                name, TOPIC));
            publishTextMessage( String.format(
                "%s is listening.", name));
        }
    }

@Override
public void onFailure(IMqttToken asyncActionToken, 
                      Throwable exception) 
{
    // The method will run if an operation failed
    exception.printStackTrace();
}

I implemented the same interface in the MessageActionListener class. However, in this case, I implement the interface to run code when the success or failure is related to the connection or the subscription, not to published messages as in the MessageActionListener class.

The code saves the IMqttToken returned by the client.connect method to the connectToken protected field. This way, I am able to check whether the onSuccess method’s execution is related to this token or not. The connection uses asynchronous execution, and the onSuccess method for the Drone class will be executed after the connection with the MQTT broker has been successfully established.

The onSuccess method displays a message indicating that the specific drone has been successfully connected. Then, the code calls the client.subscribe method with the topic to which I want to subscribe and the desired QoS level. The call to this method specifies this as the last argument because I will also use the actual instance as the callback that will execute specific methods when some asynchronous events related to the subscription occur. The fourth argument for the subscribe method requires an argument of the IMqttActionListener type. So, after a successful subscription, the Java client will run the same onSuccess method, but the code will recognize that the event is related to the subscription because the received token won’t match the connection token and instead will match the subscription token. It is not necessary to do it this way. It is also possible to use an anonymous class to declare the methods that are necessary for the subscription callback. However, I wanted to demonstrate the usage of the tokens.
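
The token-matching idea can be isolated from the MQTT library. The following is a minimal sketch that uses hypothetical stand-in types (Token, SuccessListener, and Node are my own names, not Paho classes) and reports success immediately instead of asynchronously. It only illustrates how one callback object can tell two actions apart by comparing the token it receives with the tokens it saved.

```java
import java.util.ArrayList;
import java.util.List;

final class TokenDispatchSketch {
    private TokenDispatchSketch() { }

    /** Hypothetical stand-in for a token that identifies one action. */
    static final class Token { }

    /** Hypothetical stand-in for a success callback. */
    interface SuccessListener {
        void onSuccess(Token token);
    }

    /** One object is the callback for both connect and subscribe. */
    static final class Node implements SuccessListener {
        final List<String> events = new ArrayList<>();
        private Token connectToken;
        private Token subscribeToken;

        void connect() {
            connectToken = new Token();
            // A real client would complete this later on another
            // thread; this sketch reports success right away
            onSuccess(connectToken);
        }

        @Override
        public void onSuccess(Token token) {
            if (token == connectToken) {
                events.add("connected");
                // Only subscribe once the connection succeeded
                subscribeToken = new Token();
                onSuccess(subscribeToken);
            } else if (token == subscribeToken) {
                events.add("subscribed");
            }
        }
    }
}
```

Calling connect() on a Node records "connected" and then "subscribed", which is the same ordering that the Drone class relies on.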

The onSuccess method displays a message indicating that the specific drone has been successfully subscribed to the specific topic. Then, the code calls the publishTextMessage method to publish a message to the topic indicating that the drone is listening.

public MessageActionListener publishTextMessage(
        String messageText) {
    byte[] bytesMessage;
    try {
        bytesMessage =
            messageText.getBytes(ENCODING);
        MqttMessage message;
        message = new MqttMessage(bytesMessage);
        // MqttMessage defaults to QoS 1, so request
        // the QoS level used throughout this example
        message.setQos(QUALITY_OF_SERVICE);
        String userContext = "ListeningMessage";
        MessageActionListener actionListener =
            new MessageActionListener(
                TOPIC, messageText, userContext);
        client.publish(TOPIC, message,
            userContext, actionListener);
        return actionListener;
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return null;
    } catch (MqttException e) {
        e.printStackTrace();
        return null;
    }
}

The publishTextMessage method receives a String in the messageText argument with the text message that the drone has to publish to the topic and returns an instance of the MessageActionListener class. The method calls the messageText.getBytes method to generate a byte array encoded as UTF-8. The MqttMessage constructor requires a byte array with the message to be sent. Then, the code creates an instance of the MessageActionListener class named actionListener and calls the client.publish method to publish the message on the topic, with actionListener as the last argument to specify the desired callback. This way, the code declared in the onSuccess method of the MessageActionListener class will run after the message has been successfully published to the specified topic.
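
As a side note on the encoding step, here is a small sketch of an alternative that is not what the article’s code does: passing the StandardCharsets.UTF_8 constant instead of the "UTF-8" string, which removes the need to catch UnsupportedEncodingException. PayloadEncodingSketch is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for converting message text to and from bytes. */
final class PayloadEncodingSketch {
    private PayloadEncodingSketch() { }

    /** Encodes the text as UTF-8; no checked exception is declared. */
    static byte[] encode(String messageText) {
        return messageText.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 payload back into text. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

The byte array returned by encode is what you would hand to the MqttMessage constructor, and decode is the counterpart for the payload received in messageArrived.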

The publishCommand method takes two String arguments: the command name and the destination name. The method uses the received values to build a command and then calls the previously explained publishTextMessage method with this command as an argument. For example, the following command requests a drone whose name is Drone #1 to print its altitude in feet: COMMAND:GET_ALTITUDE:Drone #1. The destination must match the drone’s name exactly, including case.

public MessageActionListener publishCommand(
           String commandName, String destinationName) 
{
     String command = String.format("%s%s%s%s%s",
         COMMAND_KEY, COMMAND_SEPARATOR,
         commandName, COMMAND_SEPARATOR,
         destinationName);
     return publishTextMessage(command);
}

Whenever a message arrives, the messageArrived method will be executed. The code in this method receives a String with the topic and an instance of MqttMessage.

@Override
public void messageArrived(String topic, 
               MqttMessage message) throws Exception 
{
    // A message has arrived from the MQTT broker
    // The MQTT client doesn't send back 
    // an acknowledgment to the broker until 
    // this method returns cleanly
    if (!topic.equals(TOPIC)) {
        return;
    }

    String messageText = 
        new String(message.getPayload(), ENCODING);
    System.out.println( String.format(
        "%s received %s: %s", name, topic,
        messageText));
    String[] keyValue = 
        messageText.split(COMMAND_SEPARATOR);
    if (keyValue.length != 3) {
        return;
    }
    if (keyValue[0].equals(COMMAND_KEY) &&
        keyValue[1].equals(
            GET_ALTITUDE_COMMAND_KEY) &&
            keyValue[2].equals(name)) 
        {
            // Process the "get altitude" command
            int altitudeInFeet = ThreadLocalRandom
                .current().nextInt(1, 6001);
            System.out.println( String.format(
                "%s altitude: %d feet",
                name, altitudeInFeet));
        }
}

First, the code makes sure that the topic is the one that the drone is interested in. Then, the code calls the message.getPayload() method to retrieve the byte array with the message. The code creates a String instance with the byte array and the UTF-8 encoding as arguments to generate the appropriate String. The code displays a message indicating that the specific drone has received a message in the topic. Finally, the code processes the String with the received message to determine whether the message is a command. If it has the command-key prefix, the code decodes the command and if it’s the “get altitude” command, the code displays a message with a pseudorandom altitude value for the drone expressed in feet.
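
The command format can be exercised without a broker. The following sketch mirrors the constants and the checks described above in a stand-alone class. DroneCommandSketch and its method names are hypothetical, and the logic is a simplified restatement of publishCommand and messageArrived, not the code from the article’s bundle.

```java
/** Hypothetical stand-alone version of the command format. */
final class DroneCommandSketch {
    static final String COMMAND_KEY = "COMMAND";
    static final String COMMAND_SEPARATOR = ":";
    static final String GET_ALTITUDE_COMMAND_KEY = "GET_ALTITUDE";

    private DroneCommandSketch() { }

    /** Builds KEY:command:destination, as publishCommand does. */
    static String build(String commandName,
                        String destinationName) {
        return String.join(COMMAND_SEPARATOR,
            COMMAND_KEY, commandName, destinationName);
    }

    /** True if the text asks the named drone for its altitude. */
    static boolean isAltitudeRequestFor(String messageText,
                                        String droneName) {
        String[] parts = messageText.split(COMMAND_SEPARATOR);
        // Plain informational messages don't have three parts
        return parts.length == 3
            && parts[0].equals(COMMAND_KEY)
            && parts[1].equals(GET_ALTITUDE_COMMAND_KEY)
            && parts[2].equals(droneName);
    }
}
```

One limitation worth noting: because the message is split on the separator, a drone name that itself contains a colon would produce more than three parts and would never be recognized as a destination.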

Whenever a message has been successfully delivered, the deliveryComplete method is executed with a token (token) of type IMqttDeliveryToken as an argument to allow you to identify which message has been delivered. As previously explained, the meaning of a successfully delivered message will depend on the QoS level. In this case, I’m working with QoS level 2 and this method will be executed after the acknowledgment from the receiver arrives. Here, I didn’t add code to this method and I just declared it to implement all the methods required by the interface.

@Override
public void deliveryComplete(
                IMqttDeliveryToken token) {
    // Delivery for a message has been completed
    // and all acknowledgments have been received
}

Publishing Messages

The following code lines show the code for the MqttSample01 class that declares the main method. The main method creates three instances of the Drone class—drone1, drone2, and masterDrone—and calls the connect method for each of these instances. I didn’t use any kind of list to work with the drones because I wanted the code to be easier to read. Forgive me for repeating some code in this main method.

import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.paho.client.mqttv3.MqttException;

public class MqttSample01 {
    public static void main(String[] args) {
        Drone drone1 = new Drone("[Drone #1]");
        drone1.connect();
        Drone drone2 = new Drone("[Drone #2]");
        drone2.connect();
        Drone masterDrone = new Drone("*Master Drone*");
        masterDrone.connect();

        try {
            while (true) {
                try {
                    Thread.sleep(5000);
                    // Pseudorandom integer from 1 to 10
                    int r =
                        ThreadLocalRandom.current()
                            .nextInt(1, 11);
                    if ((r < 5) && drone1.isConnected()) {
                        masterDrone.publishCommand(
                            Drone.GET_ALTITUDE_COMMAND_KEY,
                            drone1.getName());
                    } else if (drone2.isConnected()) {
                        masterDrone.publishCommand(
                            Drone.GET_ALTITUDE_COMMAND_KEY,
                            drone2.getName());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (drone1.isConnected()) {
                try {
                    drone1.client.disconnect();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
            // ...similarly for drone2 and masterDrone...
        }
    }
}

Then, a forever-running loop generates a pseudorandom number every five seconds and, based on that number, it makes masterDrone publish a command to get the altitude for either drone1 or drone2.
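
The split between the two drones is not even. Because nextInt(1, 11) returns an integer from 1 to 10 and the test is r < 5, drone1 is picked for four of the ten values and drone2 for the other six, assuming both are connected. The following sketch makes that arithmetic explicit. TargetSelectionSketch is a hypothetical name, and the method returns 1 or 2 instead of a Drone instance.

```java
/** Hypothetical restatement of the target choice made in main. */
final class TargetSelectionSketch {
    private TargetSelectionSketch() { }

    /** Mirrors the r < 5 test: 1 means drone1, 2 means drone2. */
    static int pickTarget(int r) {
        return (r < 5) ? 1 : 2;
    }

    /** Counts how many of the ten values of r select drone1. */
    static int outcomesForDrone1() {
        int count = 0;
        for (int r = 1; r <= 10; r++) {
            if (pickTarget(r) == 1) {
                count++;
            }
        }
        return count;
    }
}
```

If you want an even split, one option is to change the test to r <= 5.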

Figure 1 shows an example of the output messages shown in the Console window of the IDE. Notice that both Drone #1 and Drone #2 receive the same messages with the GET_ALTITUDE command. However, only the drone that is the destination for the message processes the command and displays its pseudorandom altitude.

Figure 1. Console messages

Conclusion

This example demonstrates how you can use the Eclipse Paho Java Client and a Mosquitto MQTT broker to subscribe to a topic and publish messages to a topic. There is also a Java client library that can run on Android, in case you need to work with MQTT in Android. Whenever you need to exchange messages with an asynchronous, nonblocking API, you can consider using MQTT and the Java client.


Gastón Hillar (@gastonhillar) has been working as a software architect with Java since its first release. He has 20 years of experience designing and developing software. He is the author of many books related to software development, hardware, electronics, and the Internet of Things, and he has been awarded the Intel Black Belt Software Developer Award eight times.

This article originally was published in Java Magazine March/April 2017.


Learn More