Your search did not match any results.
We suggest you try the following to help find what you’re looking for:
Use the principal IoT messaging protocol to asynchronously send and receive data from devices—in this case, from drones.
By Gastón Hillar
MQ Telemetry Transport (MQTT) is a lightweight publish-subscribe messaging protocol, especially suitable for small devices but also useful for any device that requires messaging over a network. In this article, I describe how to publish and receive messages with Java through the Mosquitto broker with the asynchronous API provided by the Eclipse Paho project Java client.
In this article, I develop and explain a project in which three drones use a Mosquitto broker to publish and receive text messages. Some of these messages provide only information to a channel and others specify a command and a destination by which they tell specific drones to display their altitude in feet. I use the asynchronous methods included in the Eclipse Paho Java Client to connect to the broker, publish messages, and subscribe to topics. This way, you can see how to work with MQTT in Java using a nonblocking behavior.
I will shortly explain how to include the necessary references to work with the latest version of the Eclipse Paho Java Client. However, before moving forward, it is necessary to understand the different pieces of this puzzle: the MQTT protocol, Mosquitto, the Eclipse Paho project, and Eclipse Paho Java Client.
The MQTT protocol is a machine-to-machine (M2M) connectivity protocol used extensively in the Internet of Things (IoT), and it is gaining popularity in mobile and web applications. MQTT is a protocol that works with a publish-subscribe mechanism and runs on top of the TCP/IP protocol. It is lighter than the HTTP protocol and, therefore, it is a very interesting option whenever you need to send and receive data in real time with a publish-subscribe model and you need the lowest possible footprint. However, as always happens, the reduced footprint comes at a price: MQTT does not offer great extensibility.
Mosquitto is an open source message broker that implements two versions of the MQTT protocol: 3.1 and 3.1.1. You can use Mosquitto to make any device subscribe to a specific channel, known as a topic in MQTT terminology. All subscribed devices will receive all the messages published by other devices to this topic. Mosquitto, with its publish-subscribe model, is an iot.eclipse.org project, also known as Eclipse IoT, and it is provided under the Eclipse Distribution License (EDL). It is important to know that, at the time of this writing, MQTT version 5 has reached the working draft stage.
The Eclipse Paho project offers an open source implementation of an MQTT client library that is capable of working with the same two versions of the MQTT protocol supported by Mosquitto: 3.1 and 3.1.1. The Eclipse Paho Java Client provides both a synchronous and an asynchronous API. As previously explained, I will demonstrate how to work with its asynchronous API. However, bear in mind that there is a synchronous version you can use if you don’t need the nonblocking features and you want to keep your code simpler.
It is important to understand that the MQTT connection is always between a client and the broker—in this case, between Paho Java Client and Mosquitto. You will never connect one client to another client through a direct connection. The dialogue is always between a client and the MQTT broker. In my example, each drone establishes a connection with Mosquitto, and a master drone sends messages with commands that are meant to be processed by specific drones.
It is possible for any client to subscribe to a specific topic. By doing so, it will receive all the messages published to that topic. In addition, the client can publish messages to that specific topic or to other topics. In my example, I work with just one topic, and I won’t take advantage of the wildcards that allow you to work with many topics at the same time. However, once you understand how to work with the Java client, you can use the sample code to take advantage of the additional features, based on your specific needs.
I don’t want to focus on the configuration of a Mosquitto message browser. Instead, I want to put my efforts into demonstrating how to publish and receive messages. Eclipse allows you to use a publicly accessible sandbox server for the Eclipse IoT projects at iot.eclipse.org port 1883 identified with the following URI: tcp://iot.eclipse.org:1883. I will use this sandbox server as the Mosquitto message broker, without any security. That way, you don’t have to lose time setting up a Mosquitto message broker to test the example. Of course, a real-life application would require you to set up a Mosquitto message broker. I provide many useful links to allow you to learn more about Mosquitto and MQTT at the end of this article.
You need to make sure that your software and hardware firewalls allow the application or the IDE to work with TCP in port 1883.
At the time of this writing, the latest release of Paho Java Client is 1.1.0. I will use this version to include the necessary dependencies. The easiest way to use the Java client is to start a Maven project in your favorite Java IDE and add the following lines before </project> in the pom.xml file. I’m assuming that you are working with an empty Maven project. If you have other repositories or dependencies, make sure you edit the pom.xml file to include the new entries, as shown next. You can also use the features included in your favorite IDE or its Maven plugins to add the repository and the dependency.
<repositories> <repository> <id>Eclipse Paho Repo</id> <url>https://repo.eclipse.org/content/repositories/paho-releases/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId> org.eclipse.paho.client.mqttv3 </artifactId> <version>1.1.0</version> </dependency> </dependencies>
I am going to create the following three classes. All these classes are included in the code bundle for this article.
MessageActionListener
implements the org.eclipse.paho.client.mqttv3.IMqttActionListener
interface. I use instances of this class to specify the callback that will run code whenever a message has been successfully published to a topic.Drone
represents a drone that has a name and can send and receive messages through the MQTT broker. This class not only encapsulates the data and logic related to drones as well as the messages and the commands included in messages, but it also implements the MqttCallback
and ImqttActionListener
interfaces defined in org.eclipse.paho.client.mqttv3
. There is no type in these interface names—the naming convention for interfaces in the Java library is a bit confusing because some interfaces start with I while others don’t. I use Drone
instances as callbacks for specific events.MqttSample01
creates three instances of the Drone
class, makes them connect to the MQTT broker, and sends messages and commands. This class declares the main
static method for the example application.The asynchronous API requires you to work with callbacks. In this example, I demonstrate many ways of working with the necessary callbacks. The following code shows the import statements and the code for the MessageActionListener
class that implements the IMqttActionListener
interface.
import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttToken; public class MessageActionListener implements IMqttActionListener { protected final String messageText; protected final String topic; protected final String userContext; public MessageActionListener( String topic, String messageText, String userContext) { this.topic = topic; this.messageText = messageText; this.userContext = userContext; } @Override public void onSuccess( IMqttToken asyncActionToken) { if ((asyncActionToken != null) && asyncActionToken.getUserContext() .equals(userContext)) { System.out.println( String.format( "Message '%s' published to topic '%s'", messageText, topic)); } } @Override public void onFailure( IMqttToken asyncActionToken, Throwable exception) { exception.printStackTrace(); } }
When I create an instance of the MessageActionListener
class, I need to specify the topic to which the message is going to be published, the message text, and a user context. The constructor saves the received values in immutable fields that have the same names as the received arguments.
The class implements the onSuccess
method required by the IMqttActionListener
interface. Whenever an instance of the MessageActionListener
class is used as a callback for an asynchronous action, the Java client invokes the onSuccess
method when the action has been completed successfully, and it passes an asynchronous action token (asyncActionToken
) of type IMqttToken
as an argument to specify the action that has been completed. The method makes sure that asyncActionToken
is not null and checks whether the value returned by asyncActionToken.getUserContext()
matches the userContext
saved by the constructor. If they match, the successful event is related to the event I wanted to monitor for its successful execution, and the code displays a message containing the message text that has been published and the name of the destination topic. I use an instance of this class as a callback for each message that is published, and thereby I can see all the successfully published messages.
The class also implements the onFailure
method, which is required by the IMqttActionListener
interface and simply calls the printStackTrace
method for the received exception.
What does it mean that a message was successfully delivered by the MQTT broker? It depends on the quality of service (QoS) that you select when you work with the MQTT protocol. The QoS level is the agreement between the publisher and the receiver of a message about the guarantees for delivering the message. Delivering a message involves publishing from the client to the broker and then from the broker to the subscribed client. MQTT supports three possible QoS values:
In this example, I will work with QoS level 2, because I don’t want the possibility of receiving a command twice. Messages are delivered even across network and client restarts. However, for that to occur, each message needs to be stored in a safe location until it has been successfully delivered. The Java client works with a pluggable persistence mechanism to store the messages. To keep things simple in this example, I will use memory-based persistence, which is not the best option with QoS level 2. However, you can easily explore and configure other pluggable persistence mechanisms to avoid losing messages in the event the application, the JVM, the computer, or the device running the application stops working or shuts down.
The following code shows the key aspects of the Drone
class that implements the MqttCallback
and IMqttActionListener
interfaces. The full class is available in the downloadable code.
When I create an instance of the Drone
class, it is necessary to specify the desired name
for the drone. The class defines many constants that I will use throughout the code and determine the string that defines a command key, the separator, the command that retrieves the altitude for the drone, the topic to which the Java client will subscribe, the desired QoS level, and the encoding that will be used for the messages, as shown below.
public class Drone implements MqttCallback, IMqttActionListener { public static final String COMMAND_KEY = "COMMAND"; public static final String COMMAND_SEPARATOR = ":"; public static final String GET_ALTITUDE_COMMAND_KEY = "GET_ALTITUDE"; // Replace with your own topic name public static final String TOPIC = "java-magazine-mqtt/drones/altitude"; public static final String ENCODING = "UTF-8"; // Quality of Service = Exactly once // I want to receive all messages exactly once public static final int QUALITY_OF_SERVICE = 2; protected String name; protected String clientId; protected MqttAsyncClient client; protected MemoryPersistence memoryPersistence; protected IMqttToken connectToken; protected IMqttToken subscribeToken; public Drone(String name) { this.name = name; } public String getName() { return name; }
It is very important that you replace the TOPIC
string with your own, unique topic name. The Mosquitto broker I am using in the example is public and, therefore, I need to use a unique topic to make sure I receive only the messages published by my code. I have specified "java-magazine-mqtt/drones/altitude"
for TOPIC
in this example. MQTT uses topic names that have a hierarchy and are separated by a slash (/). Another example of a topic name is "java-client/samples/drones/commands/altitude"
.
The connect
method has some code that is commented out just to remind you that in a production environment you shouldn’t send messages over an insecure connection and your MQTT broker should work with TLS/SSL and require the appropriate authentication. The code creates an instance of the MemoryPersistence
class to use as the previously explained pluggable persistence, generates a unique client ID, and creates an instance of MqttAsyncClient
named client
. This way, I create the entry point for the Java client with the asynchronous API.
public void connect() { try { MqttConnectOptions options = new MqttConnectOptions(); // options.setUserName( // "replace with your username"); // options.setPassword( // "replace with your password" // .toCharArray()); // Replace with ssl:// and work with TLS/SSL // best practices in a // production environment memoryPersistence = new MemoryPersistence(); String serverURI = "tcp://iot.eclipse.org:1883"; clientId = MqttAsyncClient.generateClientId(); client = new MqttAsyncClient( serverURI, clientId, memoryPersistence); // I want to use this instance as the callback client.setCallback(this); connectToken = client.connect( options, null, this); } catch (MqttException e) { e.printStackTrace(); } } public boolean isConnected() { return (client != null) && (client.isConnected()); } @Override public void connectionLost(Throwable cause) { // The MQTT client lost the connection cause.printStackTrace(); }
The line that calls the setCallback
method uses this
as an argument because I use the actual instance as the callback that will execute specific methods when some asynchronous events occur. The setCallback
method requires an argument of the MqttCallback
type. The Drone
class implements the MqttCallback
interface that requires the following three methods: connectionLost
, messageArrived
, and deliveryComplete
. I’ll get back to these methods later. It is very important to call the setCallback
method before establishing the connection with the MQTT broker.
The line that calls the client.connect
method specifies this
as the last argument because I will also use the actual instance as the callback that will execute specific methods when some asynchronous events related to the connection occur. The fourth argument for the connect
method requires an argument of the IMqttActionListener
type.
The Drone
class implements the IMqttActionListener
interface that requires these two methods: onSuccess
and onFailure
.
@Override public void onSuccess( IMqttToken asyncActionToken) { if (asyncActionToken.equals(connectToken)) { System.out.println( String.format( "%s successfully connected",name)); try { subscribeToken = client.subscribe( TOPIC, QUALITY_OF_SERVICE, null, this); } catch (MqttException e) { e.printStackTrace(); } } else if (asyncActionToken.equals( subscribeToken)) { System.out.println( String.format( "%s subscribed to the %s topic", name, TOPIC)); publishTextMessage( String.format( "%s is listening.", name)); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { // The method will run if an operation failed exception.printStackTrace(); }
I implemented the same interface in the MessageActionListener
class. However, in this case, I will implement the interface to run code when the success or failure is related to the connection, not with messages as happened in the MessageActionListener
class.
The code saves the IMqttToken
returned by the client.connect
method to the connectToken
protected field. This way, I am able to check whether the onSuccess
method’s execution is related to this token or not. The connection uses asynchronous execution, and the onSuccess
method for the Drone
class will be executed after the connection with the MQTT broker has been successfully established.
The onSuccess
method displays a message indicating that the specific drone has been successfully connected. Then, the code calls the client.subscribe
method with the topic to which I want to subscribe and the desired QoS level. The call to this method specifies this
as the last argument because I will also use the actual instance as the callback that will execute specific methods when some asynchronous events related to the subscription occur. The fourth argument for the subscribe
method requires an argument of the IMqttActionListener
type. So, after a successful subscription, the Java client will run the same onSuccess
method, but the code will recognize that the event is related to the subscription because the received token won’t match the connection token and instead will match the subscription token. It is not necessary to make it this way. It is possible to create an anonymous type to declare the methods that are necessary for the subscription callback for the asynchronous subscription. However, I wanted to demonstrate the usage of the tokens.
The onSuccess
method displays a message indicating that the specific drone has been successfully subscribed to the specific topic. Then, the code calls the publishTextMessage
method to publish a message to the topic indicating that the drone is listening.
public MessageActionListener publishTextMessage( String messageText) { byte[] bytesMessage; try { bytesMessage = messageText.getBytes(ENCODING); MqttMessage message; message = new MqttMessage(bytesMessage); String userContext = "ListeningMessage"; MessageActionListener actionListener = new MessageActionListener( TOPIC, messageText, userContext); client.publish(TOPIC, message, userContext,actionListener); return actionListener; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } catch (MqttException e) { e.printStackTrace(); return null; } }
The publishTextMessage
method receives a String
in the messageText
argument with the text message that the drone has to publish to the topic and returns an instance of the MessageActionListener
class. The method calls the messageTextgetBytes
method to generate a byte array that takes into account the UTF-8 encoding. The MqttMessage
class requires a byte array with the message to be sent as an argument to create an instance. Then, the code creates an instance of the MessageActionListener
class named actionListener
and calls the client.publish
method to publish the message on the topic with an actionListener
instance as the last argument that specifies the desired callback. This way, the code declared in the onSuccess
method for this class will run after the message has been successfully published to the specified topic.
The publishCommand
method takes two String
arguments: the command name and the destination name. The method uses the received values to build a command and then calls the previously explained publishTextMessage
method with this command as an argument. For example, the following command requests a drone whose name is Drone #1 to print its altitude in feet: COMMAND:GET_ALTITUDE:DRONE #1.
public MessageActionListener publishCommand( String commandName, String destinationName) { String command = String.format("%s%s%s%s%s", COMMAND_KEY, COMMAND_SEPARATOR, commandName, COMMAND_SEPARATOR, destinationName); return publishTextMessage(command); }
Whenever a message arrives, the messageArrived
method will be executed. The code in this method receives a String
with the topic and an instance of MqttMessage
.
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { // A message has arrived from the MQTT broker // The MQTT broker doesn't send back // an acknowledgment to the server until // this method returns cleanly if (!topic.equals(TOPIC)) { return; } String messageText = new String(message.getPayload(), ENCODING); System.out.println( String.format( "%s received %s: %s", name, topic, messageText)); String[] keyValue = messageText.split(COMMAND_SEPARATOR); if (keyValue.length != 3) { return; } if (keyValue[0].equals(COMMAND_KEY) && keyValue[1].equals( GET_ALTITUDE_COMMAND_KEY) && keyValue[2].equals(name)) { // Process the "get altitude" command int altitudeInFeet = ThreadLocalRandom .current().nextInt(1, 6001); System.out.println( String.format( "%s altitude: %d feet", name, altitudeInFeet)); } }
First, the code makes sure that the topic is the one that the drone is interested in. Then, the code calls the message.getPayload()
method to retrieve the byte array with the message. The code creates a String
instance with the byte array and the UTF-8 encoding as arguments to generate the appropriate String
. The code displays a message indicating that the specific drone has received a message in the topic. Finally, the code processes the String
with the received message to determine whether the message is a command. If it has the command-key prefix, the code decodes the command and if it’s the “get altitude” command, the code displays a message with a pseudorandom altitude value for the drone expressed in feet.
Whenever a message has been successfully delivered, the deliveryComplete
method is executed with a token (token
) of type IMqttDeliveryToken
as an argument to allow you to identify which message has been delivered. As previously explained, the meaning of a successfully delivered message will depend on the QoS level. In this case, I’m working with QoS level 2 and this method will be executed after the acknowledgment from the receiver arrives. Here, I didn’t add code to this method and I just declared it to implement all the methods required by the interface.
@Override public void deliveryComplete( IMqttDeliveryToken token) { // Delivery for a message has been completed // and all acknowledgments have been received }
The following code lines show the code for the MqttSample01
class that declares the main
method. The main
method creates three instances of the Drone class—drone1
, drone2
, and masterDrone
—and calls the connect
method for each of these instances. I didn’t use any kind of list to work with the drones because I wanted the code to be easier to read. Forgive me for repeating some code in this main
method.
public class MqttSample01 { public static void main(String[] args) { Drone drone1 = new Drone("[Drone #1]"); drone1.connect(); Drone drone2 = new Drone("[Drone #2]"); drone2.connect(); Drone masterDrone = new Drone("*Master Drone*"); masterDrone.connect(); try { while (true) { try { Thread.sleep(5000); int r = ThreadLocalRandom.current() .nextInt(1, 11); if ((r < 5) && drone1.isConnected()) { masterDrone.publishCommand( Drone.GET_ALTITUDE_COMMAND_KEY, drone1.getName()); } else if (drone2.isConnected()) { masterDrone.publishCommand( Drone.GET_ALTITUDE_COMMAND_KEY, drone2.getName()); } } catch (InterruptedException e) { e.printStackTrace(); } } catch(Exception e) { e.printStackTrace(); } finally { if (drone1.isConnected()) { try { drone1.client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } ...similarly for drone2 and masterDrone… } }
Then, a forever-running loop generates a pseudorandom number every five seconds and, based on that number, it makes masterDrone
publish a command to get the altitude for either drone1
or drone2
.
Figure 1 shows an example of the output messages shown in the Console window of the IDE. Notice that both Drone #1 and Drone #2 receive the same messages with the GET_ALTITUDE
command. However, only the drone that is the destination for the message processes the command and displays its pseudorandom altitude.
This example demonstrates how you can use the Eclipse Paho Java Client and a Mosquitto MQTT broker to subscribe to a topic and publish messages to a topic. There is also a Java client library that can run on Android, in case you need to work with MQTT in Android. Whenever you need to exchange messages with an asynchronous, nonblocking API, you can consider using MQTT and the Java client.
Gastón Hillar (@gastonhillar) has been working as a software architect with Java since its first release. He has 20 years of experience designing and developing software. He is the author of many books related to software development, hardware, electronics, and the Internet of Things, and he has been awarded the Intel Black Belt Software Developer Award eight times.
This article originally was published in Java Magazine March/April 2017.