Welcome to the VerneMQ documentation! This is a reference guide for most of the available features and options of VerneMQ. The Getting Started guide might be a good entry point.
For a more general overview of VerneMQ and MQTT, you might want to start with the introduction.
For downloading VerneMQ see Downloads.
As well as being available as packages that can be installed directly into the operating systems, VerneMQ is also available as a Docker image. Below is an example of how to set up a couple of VerneMQ
Sometimes you need to configure port forwarding (on a Mac, for example):
This starts a new node that listens on 1883 for MQTT connections and on 8080 for MQTT over WebSocket connections. However, at this moment the broker won't be able to authenticate the connecting clients. To allow anonymous clients, use the DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on environment variable.
Warning: Setting allow_anonymous=on completely disables authentication in the broker, and plugin authentication hooks are never called! See more information about the authentication hooks here.
This allows a newly started container to automatically join a VerneMQ cluster. Assuming you started your first node like the example above you could autojoin the cluster (which currently consists of a single container 'vernemq1') like the following:
(Note: you can find the IP of a Docker container using docker inspect <CONTAINER_NAME> | grep "IPAddress".)
To check whether the above containers have successfully clustered, you can issue the vmq-admin command:
A quick and simple guide to get started with VerneMQ
VerneMQ is a high-performance, distributed MQTT message broker. It scales horizontally and vertically on commodity hardware to support a high number of concurrent publishers and consumers while maintaining low latency and fault tolerance. To use it, all you need to do is install the VerneMQ package.
Choose your OS and follow the instructions:
It is also possible to run VerneMQ using our Docker image:
If you built VerneMQ from sources, you can add the /bin directory of your VerneMQ release to PATH. For example, if you compiled VerneMQ in the /home/vernemq directory, then add the binary directory (/home/vernemq/_build/default/rel/vernemq/bin) to your PATH, so that VerneMQ commands can be used in the same manner as with a packaged installation.
To start a VerneMQ broker, use the vernemq start command in your Shell:
A successful start will return no output. If there is a problem starting the broker, an error message is printed to STDERR.
To run VerneMQ with an attached interactive Erlang console:
A VerneMQ broker is typically started in console mode for debugging or troubleshooting purposes. Note that if you start VerneMQ in this manner, it is running as a foreground process that will exit when the console is closed.
You can close the console by issuing this command at the Erlang prompt:
Once your broker has started, you can initially check that it is running with the vernemq ping command:
The command will respond with pong if the broker is running, or with Node <NodeName> not responding to pings if it's not.
As you may have noticed, VerneMQ will warn you at startup when your system's open files limit (ulimit -n) is too low. You're advised to increase the OS default open files limit when running VerneMQ. Read more about why and how in the Open Files Limit documentation.
VerneMQ can be installed on CentOS-based systems using the binary package we provide.
Once you have downloaded the binary package, execute the following command to install VerneMQ:
or:
Once you've installed VerneMQ, start it on your node:
You can verify that VerneMQ is successfully installed by running:
If VerneMQ has been installed successfully, vernemq is returned.
Now that you've installed VerneMQ, check out How to configure VerneMQ.
VerneMQ comes with a simple file-based password authentication mechanism which is enabled by default. If you don't need this it can be disabled by setting:
By default VerneMQ doesn't accept any client that hasn't been configured using vmq-passwd. If you want to change this and accept any client connection, you can set:
Warning: Setting allow_anonymous=on completely disables authentication in the broker, and plugin authentication hooks are never called! See more information about the authentication hooks here.
In a production setup we recommend using the provided password-based authentication mechanism or implementing your own authentication plugins.
VerneMQ periodically checks the specified password file.
The check interval defaults to 10 seconds and can also be defined in vernemq.conf.
Setting password_reload_interval = 0 disables automatic reloading.
Both configuration parameters can also be changed at runtime using the vmq-admin script.
vmq-passwd is a tool for managing password files for the VerneMQ broker. Usernames must not contain ":". Passwords are stored in a format similar to crypt(3).
How to use vmq-passwd
Options
-c
Creates a new password file. If the file already exists, it will be overwritten.
-D
Deletes the specified user from the password file.
-U
This option can be used to upgrade/convert a password file with plain text passwords into one using hashed passwords. It will modify the specified file. It does not detect whether passwords are already hashed, so using it on a password file that already contains hashed passwords will generate new hashes based on the old hashes and render the password file unusable. Note that with this option neither usernames nor passwords may contain ":".
passwordfile
The password file to modify.
username
The username to add/update/delete.
Examples
Add a user to a new password file: (you can choose an arbitrary name for the password file, it only has to match the configuration in the VerneMQ configuration file).
Delete a user from a password file
Acknowledgements
The original version of vmq-passwd was developed by Roger Light (roger@atchoo.org).
vmq-passwd includes:
software developed by the [OpenSSL Project](http://www.openssl.org/) for use in the OpenSSL Toolkit.
cryptographic software written by Eric Young (eay@cryptsoft.com)
software written by Tim Hudson (tjh@cryptsoft.com)
VerneMQ comes with a simple ACL based authorization mechanism which is enabled by default. If you don't need this it can be disabled by setting:
VerneMQ periodically checks the specified ACL file.
The check interval defaults to 10 seconds and can also be defined in vernemq.conf.
Setting acl_reload_interval = 0 disables automatic reloading.
Both configuration parameters can also be changed at runtime using the vmq-admin script.
Topic access is added with lines of the format:
Only one space should be put between the topic and the preceding keyword. Extra spaces will be interpreted as part of the topic! Also note that the ACL parser doesn't accept empty lines between entries.
The access type is controlled using read or write. If not provided, then read and write access is granted for the topic. The topic can use the MQTT subscription wildcards + or #.
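As a rough illustration of how the + and # wildcards select topics, here is a minimal Python sketch. It is not VerneMQ's implementation; the function name topic_matches is hypothetical, and special cases such as topics starting with $ are ignored.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level itself and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        # "+" matches exactly one level, anything else must be equal.
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for flt, top in [("sensors/+/temp", "sensors/kitchen/temp"),
                     ("sensors/#", "sensors/kitchen/temp"),
                     ("sensors/+", "sensors/kitchen/temp")]:
        print(flt, top, topic_matches(flt, top))
```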
The first set of topics is applied to all anonymous clients (assuming allow_anonymous = on). User-specific ACLs are added after a user line as follows (this is the username, not the client id):
It is also possible to define ACLs based on pattern substitution within the topic. The form is the same as for the topic keyword, but using pattern as the keyword.
The patterns available for substitution are:
%c to match the client id of the client
%u to match the username of the client
The substitution pattern must be the only text for that level of hierarchy. Pattern ACLs apply to all users even if the user keyword has previously been given.
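The substitution rule can be sketched in Python as follows. This is only an illustration of the rule that %c and %u must be the sole text of a topic level; the function name expand_pattern and the sample topic are hypothetical, not part of VerneMQ.

```python
def expand_pattern(pattern: str, client_id: str, username: str) -> str:
    """Replace %c / %u levels in an ACL pattern with the client's values."""
    expanded = []
    for level in pattern.split("/"):
        if level == "%c":
            expanded.append(client_id)
        elif level == "%u":
            expanded.append(username)
        else:
            # A level that merely contains "%c" or "%u" next to other text
            # is left untouched, because the pattern must fill the level.
            expanded.append(level)
    return "/".join(expanded)


if __name__ == "__main__":
    print(expand_pattern("devices/%c/status", "sensor-1", "john"))
```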
Example:
VerneMQ currently doesn't cancel active subscriptions in case the ACL file revokes access for a topic.
Anonymous users are allowed to
publish & subscribe to topic bar.
publish to topic foo.
subscribe to topic all.
User john is allowed to
publish & subscribe to topic foo.
subscribe to topic baz.
publish to topic all.
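To make the file format concrete, the following Python sketch parses an ACL text and checks permissions. The ACL text is an assumption: it is reconstructed from the permissions listed above and may differ from the original example file. The helper names parse_acl and is_allowed are hypothetical, the sketch compares topics literally (no wildcards or patterns), and it is not VerneMQ's parser.

```python
# Assumed ACL content, reconstructed from the permissions listed above.
ACL_TEXT = "\n".join([
    "topic bar",
    "topic write foo",
    "topic read all",
    "user john",
    "topic foo",
    "topic read baz",
    "topic write all",
])


def parse_acl(text):
    """Return {username or None: [(access, topic), ...]}; None means anonymous."""
    rules = {None: []}
    current = None
    for line in text.split("\n"):
        keyword, _, rest = line.partition(" ")
        if keyword == "user":
            current = rest
            rules.setdefault(current, [])
        elif keyword == "topic":
            access, sep, topic = rest.partition(" ")
            if sep and access in ("read", "write"):
                rules[current].append((access, topic))
            else:
                # No access type given: read and write are both granted.
                rules[current].append(("readwrite", rest))
    return rules


def is_allowed(rules, username, action, topic):
    """action is 'read' (subscribe) or 'write' (publish)."""
    return any(t == topic and access in (action, "readwrite")
               for access, t in rules.get(username, []))


if __name__ == "__main__":
    acl = parse_acl(ACL_TEXT)
    print(is_allowed(acl, None, "write", "foo"))
    print(is_allowed(acl, "john", "write", "baz"))
```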
Set the maximum size for client ids. MQTT v3.1 specifies a limit of 23 characters.
This option defaults to 23.
This option allows persistent clients (those with clean_session set to false) to be removed if they do not reconnect within a certain time frame.
This is a non-standard option. As far as the MQTT specification is concerned, persistent clients are persisted forever.
The expiration period should be an integer followed by one of h, d, w, m, y for hour, day, week, month, and year; or never:
This option defaults to never.
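The expiration format can be illustrated with a small Python parser. This is a sketch only: the function name parse_expiry is hypothetical, and the lengths used for a month (30 days) and a year (365 days) are assumptions made for the example, not values taken from VerneMQ.

```python
import re

# Seconds per unit; the month and year lengths are assumptions.
UNIT_SECONDS = {
    "h": 3600,
    "d": 24 * 3600,
    "w": 7 * 24 * 3600,
    "m": 30 * 24 * 3600,
    "y": 365 * 24 * 3600,
}


def parse_expiry(value: str):
    """Return the period in seconds, or None for 'never'."""
    if value == "never":
        return None
    match = re.fullmatch(r"(\d+)([hdwmy])", value)
    if match is None:
        raise ValueError(f"invalid expiration period: {value!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_SECONDS[unit]


if __name__ == "__main__":
    for sample in ("2h", "1w", "never"):
        print(sample, parse_expiry(sample))
```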
Limit the maximum publish payload size in bytes that VerneMQ allows. Messages that exceed this size won't be accepted.
Defaults to 0, which means that all valid messages are accepted. The MQTT specification imposes a maximum payload size of 268435455 bytes.
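The figure 268435455 is the largest value the MQTT Remaining Length field can carry: the field uses at most four bytes with 7 value bits each, so the maximum is 128^4 - 1. The Python sketch below encodes a length in that variable-length format; the function name encode_remaining_length is hypothetical.

```python
MAX_REMAINING_LENGTH = 128 ** 4 - 1  # 268435455


def encode_remaining_length(n: int) -> bytes:
    """Encode n as an MQTT variable-length integer (1 to 4 bytes)."""
    if not 0 <= n <= MAX_REMAINING_LENGTH:
        raise ValueError("length out of range for MQTT")
    out = bytearray()
    while True:
        n, digit = divmod(n, 128)
        if n > 0:
            digit |= 0x80  # continuation bit: more bytes follow
        out.append(digit)
        if n == 0:
            return bytes(out)


if __name__ == "__main__":
    print(MAX_REMAINING_LENGTH)
    print(encode_remaining_length(MAX_REMAINING_LENGTH).hex())
```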
VerneMQ supports the WebSocket protocol out of the box. To be able to open a WebSocket connection to VerneMQ, you have to configure a WebSocket listener or Secure WebSocket listener in the vernemq.conf file first:
Keep in mind that you'll use MQTT-over-WebSocket, so you will need a JavaScript library that implements the MQTT client behaviour. We have used the Eclipse Paho client as well as MQTT.js.
You won't be able to open WebSocket connections on a base URL; always add the /mqtt path.
VerneMQ can be easily clustered. Clients can then connect to any cluster node and receive messages from any other cluster node. However, the MQTT specification gives certain guarantees that are hard to fulfill in a distributed environment, especially when network partitions occur. We'll discuss the way VerneMQ deals with network partitions in its own subsection.
Set the Cookie! All cluster nodes need to be configured to use the same Cookie value. It can be set in vernemq.conf with the distributed_cookie setting. Set the Cookie to a private value for security reasons!
For a successful VerneMQ cluster setup, it is important to choose proper VerneMQ node names. In vernemq.conf change the nodename = VerneMQ@127.0.0.1 to something appropriate. Make sure that the node names are unique within the cluster. Read the section on VerneMQ Inter-node Communication if firewalls are involved.
Before you go ahead and experience the full power of clustering VerneMQ, be aware of its stateful character. An MQTT broker is a stateful application and a VerneMQ cluster is a stateful cluster.
What does this mean in detail? It means that clustered VerneMQ nodes will share information about connected clients and sessions but also meta-information about the cluster itself.
For instance, if you stop a cluster node, the VerneMQ cluster will not just forget about it. It will know that there's a node missing and it will keep looking for it. It will know there's a netsplit situation and it will heal the partition when the node comes back up. But if the missing node never comes back, there's an eternal netsplit (still resolvable by making the missing node explicitly leave).
This doesn't mean that a VerneMQ cluster cannot dynamically grow and shrink. But it means you have to tell the cluster what you intend to do, by using join and leave commands.
If you want a cluster node to leave the cluster, well... use the vmq-admin cluster leave command. If you want a node to join a cluster, well... use the vmq-admin cluster join command.
Makes sense? Go ahead and create your first VerneMQ cluster!
A cluster leave will actually do a lot more work, and gives you some options to choose from. The node leaving the cluster will go to great lengths trying to migrate its existing queues to other nodes. As queues (online or offline) are live processes in a VerneMQ node, it will only exit after it has migrated them.
Let's look at the steps in detail:
vmq-admin cluster leave node=<NodeThatShouldGo>
This first step will only stop the MQTT Listeners of the node to ensure that no new connections are accepted. It will not interrupt the existing connections, and behind the scenes the node will not leave the cluster yet. Existing clients are still able to publish and receive messages at this point.
The idea is to give a grace period in the hope that existing clients might re-connect (to another node). Once you have decided that this period is over (whether that is after 5 minutes or 1 day is up to you), you proceed with step 2: disconnecting the rest of the clients.
vmq-admin cluster leave node=<NodeThatShouldGo> -k
The -k flag will delete the MQTT Listeners of the leaving node, taking down all live connections. If this is what you want from the beginning, you can do this right away as a first step.
Now, queue migration is triggered by clients re-connecting to other nodes. They will claim their queue and it will get migrated. Still, there might be some offline queues remaining on the leaving node, because they were pre-existing or because some clients do not re-connect and do not reclaim their queues.
VerneMQ will throw an exception if there are remaining offline queues after a configurable timeout. The default is 60 seconds, but you can set it as an option to the cluster leave command. As soon as the exception shows in the console or console.log, you can retry the cluster leave command, this time setting a migration timeout (-t) and an interval in seconds (-i) that indicates how often information on the migration progress should be printed to console.log:
vmq-admin cluster leave node=<NodeThatShouldGo> -k -i 5 -t 120
After this timeout VerneMQ will forcefully migrate the remaining offline queues to other cluster nodes in a round robin manner. After doing that, it will stop the leaving VerneMQ node.
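The round robin distribution mentioned above can be pictured with a short Python sketch. It only illustrates the idea of cycling through the remaining nodes; the function name, queue ids, and node names are hypothetical and this is not how VerneMQ is implemented internally.

```python
from itertools import cycle


def assign_round_robin(queues, nodes):
    """Map each offline queue to a target node, cycling through the nodes."""
    if not nodes:
        raise ValueError("at least one remaining cluster node is required")
    targets = cycle(nodes)
    return {queue: next(targets) for queue in queues}


if __name__ == "__main__":
    # Hypothetical queue ids and node names.
    plan = assign_round_robin(
        ["client-a", "client-b", "client-c"],
        ["VerneMQ@10.0.0.2", "VerneMQ@10.0.0.3"],
    )
    for queue, node in plan.items():
        print(queue, "->", node)
```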
Note 1: While doing a cluster leave, it's a good idea to tail -f the VerneMQ console.log to see queue migration progress.
Note 2: A node that has left the cluster is considered dead. If you want to reuse that node as a single node broker, you have to (backup & rename &) delete the whole VerneMQ data directory and start with a new directory. (It will be created automatically by VerneMQ at boot.)
Otherwise that node will start looking for its old cluster peers when you restart it.
So, case A was the happy case. You left the cluster with your node in a controlled manner, and everything worked, including a complete queue (and message) transfer to other nodes.
Let's look at the second possibility where the node is already down. Your cluster is still counting on it though and possibly blocking new subscription for that reason, so you want to make the node leave.
To do this, use the same command(s) as in the first case. There is one important consequence to note: by making a stopped node leave, you basically throw away persistent queue content, as VerneMQ won't be able to migrate or deliver it.
Let's repeat that to make sure:
Case B: Currently the persisted QoS 1 & QoS 2 messages aren't replicated to the other nodes by the default message store backend. Currently you will lose the offline messages stored on the leaving node.
This section elaborates how a VerneMQ cluster deals with network partitions (aka. netsplit or split brain situation). A netsplit is mostly the result of a failure of one or more network devices resulting in a cluster where nodes can no longer reach each other.
VerneMQ is able to detect a network partition, and by default it will stop serving CONNECT, PUBLISH, SUBSCRIBE, and UNSUBSCRIBE requests. A properly implemented client will always resend unacked commands, and messages are therefore not lost (QoS 0 publishes will be lost). However, much can happen in the time window between the network partition and the moment VerneMQ detects it. Moreover, this time frame will be different on every participating cluster node. In this guide we refer to this time frame as the Window of Uncertainty.
The behaviour during a netsplit is completely configurable via allow_register_during_netsplit, allow_publish_during_netsplit, allow_subscribe_during_netsplit, and allow_unsubscribe_during_netsplit. These options supersede the trade_consistency option. In order to reach the same behaviour as trade_consistency = on, all the mentioned netsplit options have to be set to on.
VerneMQ follows an eventually consistent model for storing and replicating the subscription data. This also includes retained messages.
Due to the eventually consistent data model it is possible that during the Window of Uncertainty a publish won't take into account a subscription made on a remote node (in another partition). Obviously, VerneMQ can't deliver the message in this case. The same holds for delivering retained messages to remote subscribers.
Last will messages that are triggered during the Window of Uncertainty will be delivered to the reachable subscribers. Currently, last will messages triggered during a netsplit but after the Window of Uncertainty will be lost.
Normally, client registration is synchronized using an elected leader node for the given client id. Such a synchronization removes the race condition between multiple clients trying to connect with the same client id on different nodes. However, during the Window of Uncertainty it is currently possible that VerneMQ fails to disconnect a client connected to a different node. Although this scenario sounds artificially crafted, it is possible to end up with duplicate clients connected to the cluster.
Bridges are a non-standard way, although something of a de-facto standard among MQTT broker implementations, to connect two different MQTT brokers to each other. This allows, for example, a topic tree of a remote broker to become part of the topic tree on the local broker. VerneMQ supports plain TCP connections as well as SSL connections.
In VerneMQ the bridge is distributed as a plugin and is not enabled by default. After configuring the bridge as described below, make sure to enable the plugin by setting:
When the plugin is enabled a simple status interface is available:
Set up a bridge to a remote broker:
Different connection parameters can be set:
Define the topics the bridge should incorporate in its local topic tree (by subscribing to the remote), or the topics it should export to the remote broker (by publishing to the remote). We share a similar configuration syntax to that used by the Mosquitto broker:
topic defines a topic pattern that is shared between the two brokers. Any topics matching the pattern (which may include wildcards) are shared. The second parameter defines the direction that the messages will be shared in, so it is possible to import messages from a remote broker using in, export messages to a remote broker using out, or share messages in both directions. If this parameter is not defined, VerneMQ defaults to out. The QoS level defines the publish/subscribe QoS level used for this topic and defaults to 0. (Source: mosquitto.conf)
The local-prefix and remote-prefix can be used to prefix incoming or outgoing publish messages.
Currently the # wildcard is treated as a comment by the configuration parser; please use * instead.
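The defaults described above (direction out, QoS 0) can be illustrated with a small Python parser for the value of a bridge topic entry. This is a sketch under assumptions: the positional order "pattern direction qos local-prefix remote-prefix" follows the mosquitto.conf convention the text cites, and the names BridgeTopic and parse_bridge_topic are hypothetical, not VerneMQ code.

```python
from typing import NamedTuple


class BridgeTopic(NamedTuple):
    pattern: str
    direction: str = "out"   # documented default
    qos: int = 0             # documented default
    local_prefix: str = ""
    remote_prefix: str = ""


def parse_bridge_topic(value: str) -> BridgeTopic:
    """Parse 'pattern [direction [qos [local-prefix remote-prefix]]]'."""
    parts = value.split()
    if not parts or len(parts) > 5:
        raise ValueError(f"unexpected bridge topic value: {value!r}")
    fields = {"pattern": parts[0]}
    if len(parts) > 1:
        if parts[1] not in ("in", "out", "both"):
            raise ValueError(f"invalid direction: {parts[1]!r}")
        fields["direction"] = parts[1]
    if len(parts) > 2:
        qos = int(parts[2])
        if qos not in (0, 1, 2):
            raise ValueError(f"invalid QoS level: {qos}")
        fields["qos"] = qos
    if len(parts) > 3:
        fields["local_prefix"] = parts[3]
    if len(parts) > 4:
        fields["remote_prefix"] = parts[4]
    return BridgeTopic(**fields)


if __name__ == "__main__":
    print(parse_bridge_topic("*"))
    print(parse_bridge_topic("sensors/+ in 1"))
```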
A simple example:
SSL bridges support the same configuration parameters as TCP bridges, but need further instructions for handling the SSL specifics:
As soon as the partition is healed, and connectivity reestablished, the VerneMQ nodes replicate the latest changes made to the subscription data. This includes all the changes 'accidentally' made during the Window of Uncertainty. Using VerneMQ ensures that convergence regarding subscription data and retained messages is eventually reached.
See for more information on working with plugins.
VerneMQ uses the Erlang distribution mechanism for most inter-node communication. VerneMQ identifies other machines in the cluster using Erlang identifiers (e.g. VerneMQ@10.9.8.7). Erlang resolves these node identifiers to a TCP port on a given machine via the Erlang Port Mapper daemon (epmd) running on each cluster node.
By default, epmd binds to TCP port 4369 and listens on the wildcard interface. For inter-node communication, Erlang uses an unpredictable port by default; it binds to port 0, which means the first available port.
For ease of firewall configuration, VerneMQ can be configured to instruct the Erlang interpreter to use a limited range of ports. For example, to restrict the range of ports that Erlang will use for inter-Erlang node communication to 6000-7999, add the following lines to vernemq.conf on each VerneMQ node:
The settings above are only used for distributing subscription updates and maintenance messages. For distributing the 'real' MQTT messages, the proper vmq listener must be configured in vernemq.conf.
It isn't necessary to configure the same port on every machine, as the nodes will probe each other for this information.
Attributions:
This section, "VerneMQ Inter-node Communication", is a derivative of Security and Firewalls by Riak, used under Creative Commons Attribution 3.0 Unported License.
On every VerneMQ node you'll find the vmq-admin command line tool in the release's bin directory. It has different sub-commands that let you check for status, start and stop listeners, re-configure values and perform a couple of other administrative tasks.
vmq-admin works by RPC'ing into the local VerneMQ node by default. For most commands you can add a --node option and set values on other cluster nodes, even if the local VerneMQ node is down.
To check for the global cluster state in case the local VerneMQ node is down, you'll have to go to another node though.
vmq-admin is a live re-configuration utility. Please note that all dynamically configured values will be reset by vernemq.conf upon broker restart.
Don't use this to wildly re-configure a production system without keeping track of what you are doing. vmq-admin gives you the flexibility to test things and react live, but please persist any static configuration you need in the vernemq.conf file.
You can configure as many listeners as you wish in the vernemq.conf file. In addition to this, the vmq-admin listener command lets you configure, start, stop and delete listeners on the fly. Those can be MQTT, WebSocket or Cluster listeners; in the command line output they will be tagged mqtt, ws or vmq accordingly.
To get info on a listener sub-command, invoke it with the --help option. Example: vmq-admin listener start --help
Listeners configured with the vmq-admin listener command will not survive a broker restart. Live changes to listeners configured in vernemq.conf are possible, but the vernemq.conf listeners will just be restarted with a broker restart.
This will start an MQTT listener on port 1884 and IP address 192.168.1.50. If you want to start a WebSocket listener, just tell VerneMQ by adding the --websocket flag. There are more options, mainly for configuring SSL (use vmq-admin listener start --help).
You can isolate client connections accepted by a certain listener from other clients by setting a mountpoint.
To start an MQTT listener using defaults, just set the port and IP address as a minimum.
You can add the -k or --kill_sessions switch to that command. This will disconnect all client connections set up by that listener. In combination with a mountpoint, this can be useful for terminating clients for a specific application, or to force re-connects to another cluster node (to prepare for a cluster leave for your node).
Everything you need to know to work with the VerneMQ HTTP administration interface
The VerneMQ HTTP API is enabled by default and installs an HTTP handler on http://localhost:8888/api/v1. To read more about configuring the HTTP listener, see HTTP Listener Configuration. You can configure an HTTP listener or an HTTPS listener to serve the HTTP API v1.
The VerneMQ HTTP API uses basic authentication where an API key is passed as the username and the password is left empty. So the first step to use the HTTP API is to create an API key:
The key is persisted and available on all cluster nodes.
To list existing keys do:
To add an API key of your own choosing, do:
To delete an API key do:
The VerneMQ HTTP API is a wrapper over the vmq-admin CLI tool, and anything that can be done using vmq-admin can be done using the HTTP API. Note that the HTTP API is therefore subject to any changes made to the vmq-admin tools and their flags & options structure. All requests are performed using an HTTP GET, and if no errors occurred an HTTP 200 OK code is returned with a possibly non-empty JSON payload.
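Because the API key is sent as the basic-auth username with an empty password, the resulting request header can be sketched in Python as follows. The function name basic_auth_header and the sample key are hypothetical; only the standard Basic authentication encoding is assumed.

```python
import base64


def basic_auth_header(api_key: str) -> dict:
    """Build a Basic auth header with the API key as username and no password."""
    credentials = f"{api_key}:".encode("utf-8")  # "username:" with empty password
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


if __name__ == "__main__":
    # "my-api-key" is a placeholder, not a real key.
    print(basic_auth_header("my-api-key"))
```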
The API uses basic auth where the API key is passed as the username. An example using curl would look like this:
The mapping between vmq-admin and the HTTP API is straightforward, and if one is already familiar with how the vmq-admin tool works, working with the API should be easy. The mapping works such that the command part of a vmq-admin invocation is turned into a path, and the options and flags are turned into the query string.
A mandatory parameter like the client-id in the vmq-admin session disconnect client-id=myclient command should be translated as: ?client-id=myclient.
An optional flag like --cleanup in the vmq-admin session disconnect client-id=myclient --cleanup command should be translated as: &--cleanup
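The mapping rules above can be sketched as a small Python helper. It is an illustration only: the function name vmq_admin_to_url is hypothetical, it handles just key=value parameters and bare flags such as --cleanup, and it performs no URL-encoding.

```python
def vmq_admin_to_url(command: str, base: str = "http://localhost:8888/api/v1") -> str:
    """Turn a vmq-admin command line into the corresponding API URL."""
    tokens = command.split()
    if tokens and tokens[0] == "vmq-admin":
        tokens = tokens[1:]
    # Command words become path segments; parameters and flags
    # become the query string.
    path = [t for t in tokens if "=" not in t and not t.startswith("-")]
    query = [t for t in tokens if "=" in t or t.startswith("-")]
    url = base + "/" + "/".join(path)
    if query:
        url += "?" + "&".join(query)
    return url


if __name__ == "__main__":
    print(vmq_admin_to_url("vmq-admin session disconnect client-id=myclient --cleanup"))
```

Flags that take a separate value (for example -t 120) are outside the scope of this sketch.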
Let's look at the cluster join command as an example, which looks like this:
This turns into a GET request:
To test, run it with curl:
And the returned response would look like:
Below are some other examples.
The VerneMQ status page
VerneMQ comes with a built-in status page which is enabled by default and is available on http://localhost:8888/status, see HTTP listeners.
The status page is a simple overview of the cluster and the individual nodes in the cluster as seen below:
In this section the subscription flow is described. VerneMQ provides several hooks to intercept the subscription flow. The most important ones are the auth_on_subscribe and auth_on_subscribe_m5 hooks, which act as an application level firewall granting or rejecting subscribe requests.
The auth_on_subscribe and auth_on_subscribe_m5 hooks allow your plugin to grant or reject subscribe requests sent by a client. They also make it possible to rewrite the subscribe topic and QoS. The auth_on_subscribe hook is specified in the Erlang behaviour auth_on_subscribe_hook and the auth_on_subscribe_m5 hook in the auth_on_subscribe_m5_hook behaviour available in the vernemq_dev repo.
The on_subscribe and on_subscribe_m5 hooks allow your plugin to get informed about an authorized subscribe request. The on_subscribe hook is specified in the Erlang behaviour on_subscribe_hook and the on_subscribe_m5 hook in the on_subscribe_m5_hook behaviour available in the vernemq_dev repo.
The on_unsubscribe and on_unsubscribe_m5 hooks allow your plugin to get informed about an unsubscribe request. They also allow you to rewrite the unsubscribe topic if required. The on_unsubscribe hook is specified in the Erlang behaviour on_unsubscribe_hook and the on_unsubscribe_m5 hook in the on_unsubscribe_m5_hook behaviour available in the vernemq_dev repo.
VerneMQ is implemented in Erlang OTP and therefore runs on top of the Erlang VM. For this reason plugins have to be developed in a programming language that runs on the Erlang VM. The most popular choice is obviously the Erlang programming language itself, but Elixir or Lisp flavoured Erlang LFE could be used too.
Be aware that in VerneMQ a plugin does NOT run in a sandboxed environment and misbehaviour could seriously harm the system (e.g. performance degradation, reduced availability as well as consistency, and message loss). Get in touch with us in case you require a review of your plugin.
This guide explains the different flows that expose different hooks to be used for custom plugins. It also describes the code structure a plugin must comply with in order to be successfully loaded and started by the VerneMQ plugin mechanism.
All the hooks that are currently exposed fall into one of three categories.
Hooks that allow you to change the protocol flow. An example could be to authenticate a client using the auth_on_register hook.
Hooks that inform you about a certain action, that could be used for example to implement a custom logging or audit plugin.
Hooks that are called given a certain condition.
Notice that some hooks come in two variants, for example the auth_on_register and the auth_on_register_m5 hooks. The _m5 postfix refers to the fact that this hook is only invoked in an MQTT 5.0 session context, whereas the other is invoked in an MQTT 3.1/3.1.1 session context.
Before going into the details, let's give a quick intro to the VerneMQ plugin system.
The VerneMQ plugin system allows you to load, unload, start and stop plugins during runtime, and you can even upgrade a plugin during runtime. To make this work it is required that the plugin is an OTP application and strictly follows the rules of implementing the Erlang OTP application behaviour. It is recommended to use the rebar3 toolchain to compile the plugin. VerneMQ comes with built-in support for the directory structure used by rebar3.
Every plugin has to describe the hooks it is implementing as part of its application environment file. The vmq_acl plugin, for instance, comes with the application environment file below:
Lines 6 to 10 instruct the plugin system to ensure that those dependent applications are loaded and started. If you're using third party dependencies, make sure that they are available in compiled form and part of the plugin load path. Lines 16 to 20 allow the plugin system to compile the plugin rules. Yes, you've heard correctly. The rules are compiled into Erlang VM code to make sure the lookup and execution of plugin code is as fast as possible. Some hooks exist which are used internally, such as change_config/1; we'll describe those at some other point.
The environment value for vmq_plugin_hooks is a list of hooks. A hook is specified by {Module, Function, Arity, Options}.
To streamline plugin development we provide a different Erlang behaviour for every hook a plugin implements. Those behaviours are part of the vernemq_dev library application, which you should add as a dependency to your plugin. vernemq_dev also comes with a header file that contains all the type definitions used by the hooks.
It is possible to have multiple plugins serving the same hook. Depending on the hook, the plugin chain is used differently. The most elaborate chains can be found for the hooks that deal with authentication and authorization flows. We also call them conditional chains, as a plugin can give control away to the next plugin in the chain. The image shows a sample plugin chain for the auth_on_register hook.
Most hooks don't require conditions and are mainly used as event handlers. In this case all plugins in a chain are called. An example of such a hook would be the on_register hook.
A rather specific case is the need to call only one plugin instead of iterating through the whole chain. VerneMQ uses such hooks for its pluggable message storage system.
Unless you're implementing your custom message storage backend, you probably won't need this style of hook.
The position in the plugin call chain is currently implicitly given by the order the plugins have been started.
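The difference between a conditional chain and an event-style chain can be modelled with a short Python sketch. This is a conceptual model only, not the VerneMQ plugin API: the NEXT marker, the function names, and the sample plugins are all hypothetical.

```python
NEXT = "next"  # hypothetical marker meaning "pass control to the next plugin"


def run_conditional_chain(plugins, *args):
    """Call plugins in start order until one returns a decision."""
    for plugin in plugins:
        result = plugin(*args)
        if result != NEXT:
            return result
    return None  # no plugin decided; handling this is left to the caller


def run_all_chain(plugins, *args):
    """Event-style chain: every plugin is called, results are ignored."""
    for plugin in plugins:
        plugin(*args)


def deny_bob(username):
    # Sample plugin: rejects one user, defers on everyone else.
    return "error" if username == "bob" else NEXT


def allow_everyone(username):
    # Sample plugin: accepts whoever reaches it.
    return "ok"


if __name__ == "__main__":
    chain = [deny_bob, allow_everyone]
    print(run_conditional_chain(chain, "bob"))
    print(run_conditional_chain(chain, "alice"))
```

The list order stands in for the order in which the plugins were started.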
The plugin mechanism uses the application environment file to infer the applications that it has to load and start prior to starting the plugin itself. It internally uses the application:ensure_all_started/1 function call to start the plugin. If your setup is more complex, you could override this behaviour by implementing a custom start/0 function inside a module that's named after your plugin.
The plugin mechanism uses application:stop/1
to stop and unload the plugin. This won't stop the dependent application started at startup. If you rely on third party applications that aren't started as part of the VerneMQ release, e.g. a database driver, you can implement a custom stop/0
function inside a module that's named after your plugin and properly stop the driver there.
The vmq_types.hrl
exposes all the type specs used by the hooks. The following types are used by the plugin system:
Developing VerneMQ plugins in Erlang is the most powerful way to extend the functionality of a VerneMQ broker but is a barrier for developers not familiar with Erlang. For this reason we've implemented a VerneMQ extension that allows you to develop plugins using the Lua scripting language. This extension is called vmq_diversity and is shipped as part of VerneMQ.
vmq_diversity uses the Luerl Project, which is an implementation of Lua 5.2 in pure Erlang instead of the official Lua interpreter.
Moreover, vmq_diversity provides simple Lua libraries to communicate with MySQL, PostgreSQL, MongoDB, and Redis within your Lua VerneMQ plugins. An additional JSON encoding/decoding library as well as a generic HTTP client library provide your Lua scripts a great way to talk to external services.
To enable vmq_diversity
make sure to set:
A script to load when VerneMQ starts can be specified like this:
It is also possible to dynamically load a Lua script using vmq-admin
:
To reload a script after a change:
If the vmq_diversity
plugin is enabled, the ./share/lua
folder is scanned for Lua scripts to automatically load during startup. The automatic load folder can be configured in the vernemq.conf
file by changing the vmq_diversity.script
setting.
A VerneMQ plugin typically consists of one or more implemented VerneMQ hooks. We tried to keep the differences between the traditional Erlang based and Lua based plugins as small as possible. Please check out the Plugin Development Guide for more information about the different flows and a description of the different hooks.
Let's start with a first very basic example that implements a basic authentication and authorization scheme.
This subsection describes the data providers currently available to a Lua script. Every data provider is backed by a connection pool that has to be configured by your script.
ensure_pool
Ensures that the connection pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
user
: MySQL account name for login
password
: MySQL account password for login (in clear text).
host
: Host name for the MySQL server (default is localhost)
port
: Port that the MySQL server is listening on (default is 3306).
database
: MySQL database name.
encoding
: Encoding (default is latin1)
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
execute
Executes the provided SQL statement using a connection from the connection pool.
pool_id
: Name of the connection pool to use for this statement.
stmt
: A valid MySQL statement.
args...
: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true
or false
or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
ensure_pool
Ensures that the connection pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
user
: Postgres account name for login
password
: Postgres account password for login (in clear text).
host
: Host name for the Postgres server (default is localhost)
port
: Port that the Postgres server is listening on (default is 5432).
database
: Postgres database name.
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
execute
Executes the provided SQL statement using a connection from the connection pool.
pool_id
: Name of the connection pool to use for this statement.
stmt
: A valid PostgreSQL statement.
args...
: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true
or false
or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
ensure_pool
Ensures that the connection pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
login
: MongoDB login name
password
: MongoDB password for login.
host
: Host name for the MongoDB server (default is localhost)
port
: Port that the MongoDB server is listening on (default is 27017).
database
: MongoDB database name.
w_mode
: Set mode for writes either to "unsafe" or "safe".
r_mode
: Set mode for reads either to "master" or "slave_ok".
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
insert
Insert the provided document (or list of documents) into the collection.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
doc_or_docs
: A single Lua table or a Lua array containing multiple Lua tables.
The provided document can set the document id using the _id
key. If the id isn't provided one gets autogenerated. The call returns the inserted document(s) or throws a badarg error if it cannot insert the document(s).
update
Updates all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
doc
: A single Lua table containing the update properties.
The call returns true
or throws a badarg error if it cannot update the document(s).
delete
Deletes all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
The call returns true
or throws a badarg error if it cannot delete the document(s).
find
Finds all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
args
: A Lua table that currently supports an optional projector=LuaTable
element.
The call returns a MongoDB cursor or throws a badarg error if it cannot set up the iterator.
next
Fetches next available document given a cursor object obtained via find
.
The call returns the next available document or false
if all documents have been fetched.
take
Fetches the next nr_of_docs
documents given a cursor object obtained via find
.
The call returns a Lua array containing the documents or false
if all documents have been fetched.
close
Closes and cleans up a cursor object obtained via find
.
The call returns true
.
find_one
Finds the first document in the collection that matches the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
args
: A Lua table that currently supports an optional projector=LuaTable
element.
The call returns the matched document or false
in case no document was found.
ensure_pool
Ensures that the connection pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
password
: Redis password for login.
host
: Host name for the Redis server (default is localhost)
port
: Port that the Redis server is listening on (default is 6379).
database
: Redis database (default is 0).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
cmd
Executes the given Redis command.
pool_id
: Name of the connection pool
command
: Redis command string.
args...
: Extra args.
This call returns a Lua table, true
, false
, or nil
. In case it cannot parse the command a badarg error is thrown.
ensure_pool
Ensures that the pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
host
: Host name for the Memcached server (default is localhost)
port
: Port that the Memcached server is listening on (default is 11211).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
flush_all(pool_id)
Flushes all data from the Memcached server. Use with care.
Returns true
.
get(pool_id, key)
Get data for key key
.
Returns the data for the key and otherwise false
.
set(pool_id, key, value, expiration)
Unconditionally set a value for a key.
key
: Key.
value
: Value.
expiration
time until key/value pair is deleted in seconds. This
parameter is optional with default value 0
(no expiration).
Returns value
.
add(pool_id, key, value, expiration)
Add a key/value pair if the key doesn't already exist.
key
: Key.
value
: Value.
expiration
time until key/value pair is deleted in seconds. This
parameter is optional with default value 0
(no expiration).
Returns value
if key
didn't already exist, false
otherwise.
replace(pool_id, key, value, expiration)
Replace a key/value pair if the key already exists.
key
: Key.
value
: Value.
expiration
time until key/value pair is deleted in seconds. This
parameter is optional with default value 0
(no expiration).
Returns value
if key
already exists, false
otherwise.
delete(pool_id, key)
Delete key
and the associated value.
Returns true
if the key/value pair was deleted, false
otherwise.
ensure_pool
Ensures that the connection pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 10).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true
.
get, put, post, delete
Executes an HTTP request with the given url and args.
url
: A valid HTTP URL.
body
: optional body to be included in the request.
headers
: optional Lua table containing extra headers to be included in the request.
This call returns false
in case of an error or a Lua table of the form:
body
Fetches the response body given a client ref obtained via the response Lua table.
This call returns false
in case of an error or the response body.
Before VerneMQ 1.5.0 it was imperative to call http.body(client_ref)
after doing an HTTP request, as otherwise the underlying connection would not be released back into the connection pool, leading to connection pool exhaustion. This problem would manifest itself as connection timeout errors. This is no longer necessary; the connection is now released back to the connection pool automatically.
encode
Encodes a Lua value to a JSON string.
This call returns false if it cannot encode the given value.
decode
Decodes a JSON string to a Lua value.
This call returns false if it cannot decode the JSON string.
Uses the VerneMQ logging infrastructure to log the given log_string
.
In the following we describe what a typical VerneMQ deployment can look like and some of the concerns one has to take into account when designing such a system.
From a high level, a typical VerneMQ deployment could look like the following:
In this scenario MQTT clients connect from the internet and are authenticated and authorized against the Authentication Management Service and publish and receive messages, either with each other or with the Backend-Services which might be responsible for sending control messages to the clients or storing and forwarding messages to other systems or databases for later processing.
To build and deploy a system such as the above, a lot of decisions have to be made. These can concern how to do authentication and authorization, where to do TLS termination, how the load balancer should be configured (if one is needed at all), what the MQTT architecture and topic trees should look like, and how and to what level the system can/should scale. To simplify the following discussion we'll set a few requirements:
Clients connecting from the internet are using TLS client certificates
The messaging pattern is largely fan-in: The clients continuously publish a lot of messages to a set of topics which have to be handled by the Backend-Services.
The client sessions are persistent, which means the broker will store QoS 1 & 2 messages routed to the clients while the clients are offline.
In the following we'll cover some of these options and concerns.
Often a load balancer is deployed between MQTT clients and the VerneMQ cluster. One of the main purposes of the load balancer is to ensure that client connections are distributed between the VerneMQ nodes so each node has the same amount of connections. Usually a load balancer provides different load balancing strategies for deciding how to select the node where it should route an incoming connection. Examples of these are random, source hashing (based on source IP) or even protocol-aware balancing based on for example the MQTT client-id. The last two are examples of sticky balancing or session affine strategies where a client will always be routed to the same cluster node as long as the source IP or client-id remains the same.
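To make the source hashing strategy concrete, here is a minimal sketch of how a balancer could map a source IP to a node. It is an illustration only: the function name and node names are hypothetical, and real load balancers typically use their own (often consistent) hashing schemes.

```python
import hashlib
from typing import Sequence


def pick_node(source_ip: str, nodes: Sequence[str]) -> str:
    """Map a client source IP to one of the cluster nodes (session affine)."""
    if not nodes:
        raise ValueError("at least one node is required")
    # A stable hash, so the same IP yields the same node across restarts.
    digest = hashlib.sha256(source_ip.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(nodes)
    return nodes[index]


nodes = ["vernemq1", "vernemq2", "vernemq3"]  # hypothetical node names
first = pick_node("203.0.113.7", nodes)
second = pick_node("203.0.113.7", nodes)  # same IP, same node
```

Note that with this simple modulo scheme, adding or removing a node remaps most clients, which is one reason production balancers often prefer consistent hashing.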
When using a load balancer the client is no longer directly connected to the VerneMQ nodes, and the peer port and IP address VerneMQ sees are therefore not those of the client but of the load balancer. The peer information is often important for logging reasons or if a plugin checks it against a white/black list.
To solve this problem VerneMQ supports the PROXY Protocol v1 and v2 which is designed to transport connection information across proxies. See here how to enable the proxy protocol for an MQTT listener. In case TLS is terminated at the load balancer and client certificates are used PROXY Protocol (v2) will also take care of forwarding TLS client certificate details.
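For illustration, the human-readable PROXY Protocol v1 header is a single text line that the load balancer sends before any MQTT bytes. The sketch below parses such a line in Python; it shows the wire format as described in the HAProxy specification and is not VerneMQ's parser. The `PeerInfo` type and function name are hypothetical.

```python
from typing import NamedTuple, Optional


class PeerInfo(NamedTuple):
    family: str
    src_addr: str
    dst_addr: str
    src_port: int
    dst_port: int


def parse_proxy_v1(line: bytes) -> Optional[PeerInfo]:
    """Parse 'PROXY TCP4 <src> <dst> <sport> <dport>\\r\\n'.

    Returns None for 'PROXY UNKNOWN', where no peer information is available.
    """
    if not line.endswith(b"\r\n"):
        raise ValueError("header must end with CRLF")
    parts = line[:-2].decode("ascii").split(" ")
    if len(parts) < 2 or parts[0] != "PROXY":
        raise ValueError("not a PROXY v1 header")
    if parts[1] == "UNKNOWN":
        return None
    if parts[1] not in ("TCP4", "TCP6") or len(parts) != 6:
        raise ValueError("malformed PROXY v1 header")
    return PeerInfo(parts[1], parts[2], parts[3], int(parts[4]), int(parts[5]))


info = parse_proxy_v1(b"PROXY TCP4 192.168.0.1 192.168.0.11 56324 1883\r\n")
```

The `src_addr` and `src_port` fields carry the original client address that would otherwise be hidden behind the load balancer.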
Client certificates are often used to verify and authenticate the clients. VerneMQ makes it possible to make the client certificate common name (CN) available to the authentication plugin system by overriding the MQTT username with the CN before authentication is performed. This works both if TLS is terminated at a load balancer and if TLS is terminated directly in VerneMQ. If TLS is terminated at the load balancer, the PROXY Protocol is used to forward the certificate details, and the listener can be configured as follows to achieve this effect:
If TLS is terminated directly in VerneMQ, the PROXY Protocol isn't needed as the TLS client certificate is directly available in VerneMQ, and the CN can be used instead of the username by setting:
See the details in the MQTT listener section.
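The effect of the CN override can be pictured with a small Python sketch. It assumes a certificate in the dictionary form returned by Python's `ssl.SSLSocket.getpeercert()`; the helper names and the `use_cn` flag are hypothetical and only mirror the idea, not VerneMQ's listener code.

```python
from typing import Any, Dict, Optional


def common_name(peer_cert: Dict[str, Any]) -> Optional[str]:
    """Return the subject commonName from a getpeercert()-style dict."""
    for rdn in peer_cert.get("subject", ()):
        for key, value in rdn:
            if key == "commonName":
                return value
    return None


def effective_username(mqtt_username: Optional[str],
                       peer_cert: Dict[str, Any],
                       use_cn: bool) -> Optional[str]:
    """Username handed to the authentication hooks in this sketch."""
    if use_cn:
        # The CN replaces whatever username the client sent in CONNECT.
        return common_name(peer_cert)
    return mqtt_username


cert = {"subject": ((("countryName", "CH"),), (("commonName", "device-42"),))}
name = effective_username("ignored", cert, use_cn=True)
```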
The actual authentication can then be handled by an authentication and authorization plugin like vmq_diversity which supports PostgreSQL, CockroachDB, MongoDB, Redis and MySQL as backends for storing credentials and ACL rules.
Another important aspect of running VerneMQ is having proper monitoring and alerting in place. All the usual things should be monitored at the OS level, such as memory and CPU usage, and alerts should be put in place so actions can be taken if a disk is filling up or a VerneMQ node is starting to use too much CPU. VerneMQ exports a large number of metrics and depending on the use case these can be used as important indicators that the system is running
When designing a system like the one described here, there are a number of things to consider in order to get the best performance out of the available resources.
As mentioned earlier clients in this scenario are using persistent sessions. In VerneMQ a persistent session exists only on the VerneMQ node where the client connected. This implies that if the client using a persistent session later reconnects to another node, then the session, including any offline messages, will be moved to the new node. This has a certain overhead and can be avoided if the load balancer in front of VerneMQ is using a session affine load balancing strategy such as IP source hashing to assign the client connecting to a node. Of course this strategy isn't perfect if clients often change their IP addresses, but for most cases it is a huge improvement over a random load balancing strategy.
In many systems the MQTT clients provide a lot of data by periodically broadcasting data to the MQTT cluster. The amount of published messages can very easily become hard to manage for a single MQTT client. Furthermore, with normal MQTT subscriptions all subscribers would receive the same messages, so adding more subscribers to a topic doesn't help in handling the amount of messages. To solve this VerneMQ implements a concept called shared subscriptions which makes it possible to distribute MQTT messages published to a topic over several MQTT clients. In this specific scenario this would mean the Backend-Services would consist of a set of clients subscribing to cluster nodes using shared subscriptions.
To avoid expensive inter-node communication, VerneMQ shared subscriptions support a policy called local_only
which means that messages will be delivered to shared subscribers on the local node only and not forwarded to shared subscribers on other nodes in the cluster. With this policy messages for the Backend-Services can be delivered in the fastest and most expedient manner with the lowest overhead. See the shared subscriptions documentation for more information.
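The difference between distributing over all group members and a local-only policy can be sketched with a toy model. This is not the broker's algorithm: the round-robin selection is chosen only to keep the sketch deterministic, and the `Subscriber` type, node names and any policy name other than local_only are hypothetical.

```python
import itertools
from typing import Dict, List, NamedTuple, Sequence


class Subscriber(NamedTuple):
    client_id: str
    node: str  # cluster node the subscriber is connected to


def eligible(members: Sequence[Subscriber], local_node: str, policy: str) -> List[Subscriber]:
    """Group members that may receive a message arriving on local_node."""
    if policy == "local_only":
        return [m for m in members if m.node == local_node]
    return list(members)


def distribute(messages: Sequence[str], members: Sequence[Subscriber],
               local_node: str, policy: str) -> Dict[str, List[str]]:
    """Hand each message to exactly one eligible group member."""
    targets = eligible(members, local_node, policy)
    deliveries: Dict[str, List[str]] = {m.client_id: [] for m in targets}
    if not targets:
        return deliveries
    for message, member in zip(messages, itertools.cycle(targets)):
        deliveries[member.client_id].append(message)
    return deliveries


members = [
    Subscriber("backend-1", "node-a"),
    Subscriber("backend-2", "node-a"),
    Subscriber("backend-3", "node-b"),
]
local = distribute(["m1", "m2", "m3", "m4"], members, "node-a", "local_only")
```

In the local-only case `backend-3` never receives anything published on `node-a`, which is exactly what saves the forwarding between nodes.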
Controlling TCP buffer sizes is important in ensuring optimal memory usage. The rule is that the more bandwidth or the lower latency required, the larger the TCP buffer sizes should be. Many IoT devices communicate with a very low bandwidth and as such the server side TCP buffer sizes for these do not need to be very large. On the other hand, in this scenario the consumers handling the fan-ins in the Backend-Services will have many messages to process (thousands or tens of thousands of messages per second) and they can benefit from larger TCP buffer sizes. Read more about tuning TCP buffer sizes here.
An important guideline in protecting a VerneMQ cluster from overload is to allow only what is necessary. This means having and enforcing sensible authentication and authorization rules as well as configuring conservatively so resources cannot be exhausted due to human error or MQTT clients that have turned malicious. For example in VerneMQ it is possible to specify how many offline messages a persistent session can maximally hold via the max_offline_messages
setting. It should then be set to the lowest acceptable value which works for all clients, and/or a plugin can be used to override such settings on a per-client basis. The load balancer can also play an important role in protecting the system in that it can control the connect rates as well as impose bandwidth restrictions on clients.
Somehow a system like this has to be deployed. How to do this will not be covered here, but it is certainly possible to deploy VerneMQ using tools like Ansible, Chef or Puppet or use container solutions such as Kubernetes. For more information on how to deploy VerneMQ on Kubernetes check out our guide: VerneMQ on Kubernetes.
How to implement VerneMQ plugins using an HTTP interface
The VerneMQ Webhooks plugin provides an easy and flexible way to build powerful plugins for VerneMQ using web hooks. With VerneMQ Webhooks you are free to select the implementation language to match your technical requirements or the language in which you feel comfortable and productive. You can use any modern language such as Python, Go, C#/.Net and indeed any language in which you can build something that can handle HTTP requests.
The idea of VerneMQ Webhooks is very simple: you can register an HTTP endpoint with a VerneMQ plugin hook and whenever the hook (such as auth_on_register
) is called, the VerneMQ Webhooks plugin dispatches an HTTP POST request to the registered endpoint. The HTTP POST request contains an HTTP header like vernemq-hook: auth_on_register
and a JSON encoded payload. The endpoint then responds with code 200 on success and with a JSON encoded payload informing the VerneMQ Webhooks plugin which action to take (if any).
To enable webhooks make sure to set:
And then each webhook can be configured like this:
It is also possible to dynamically register webhooks at run-time:
See which endpoints are registered:
And finally deregistering an endpoint:
We recommend placing the endpoint implementation locally on each VerneMQ node such that each request can go over localhost without being subject to network issues. Also note that currently VerneMQ Webhooks does not encrypt requests in any way or use HTTPS, so care should be taken if the endpoints are made reachable over the network.
Each registered hook uses by default a connection pool containing maximally 100 connections. This can be changed by setting vmq_webhooks.pool_max_connections
to a different value. Similarly the vmq_webhooks.pool_timeout
configuration (value is in milliseconds) can be set to control how long an unused connection should stay in the connection pool before being closed and removed. The default value is 60000 (60 seconds).
These options are available in VerneMQ 1.4.0.
VerneMQ webhooks support caching of the auth_on_register
, auth_on_publish
and auth_on_subscribe
hooks.
This can be used to speed up authentication and authorization tremendously. All data passed to these hooks is used to look up whether the call is in the cache, except in the case of the auth_on_publish
where the payload is omitted.
To enable caching for an endpoint simply return the cache-control: max-age=AgeInSeconds
in the response headers to one of the mentioned hooks. If the call was successful (authentication granted), the request will be cached together with any modifiers, except for the payload
modifier in the auth_on_publish
hook.
Whenever a non-expired entry is looked up in the cache the endpoint will not be called and the modifiers of the cached entry will be returned, if any.
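The caching rules above can be summarised in a small model. The sketch below is a hypothetical Python illustration of the described behaviour (payload excluded from the lookup key and from the cached modifiers for auth_on_publish, expiry driven by max-age); the class and function names are not part of VerneMQ.

```python
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def max_age_from_headers(headers: Dict[str, str]) -> Optional[int]:
    """Extract max-age (in seconds) from a cache-control response header."""
    for name, value in headers.items():
        if name.lower() != "cache-control":
            continue
        for directive in value.split(","):
            key, _, seconds = directive.strip().partition("=")
            if key.lower() == "max-age" and seconds.isdigit():
                return int(seconds)
    return None


class HookCache:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: Dict[Tuple[str, str], Tuple[float, Dict[str, Any]]] = {}

    @staticmethod
    def _key(hook: str, args: Dict[str, Any]) -> Tuple[str, str]:
        relevant = dict(args)
        if hook == "auth_on_publish":
            relevant.pop("payload", None)  # payload is not part of the key
        return hook, json.dumps(relevant, sort_keys=True)

    def store(self, hook: str, args: Dict[str, Any],
              modifiers: Dict[str, Any], max_age: int) -> None:
        kept = dict(modifiers)
        if hook == "auth_on_publish":
            kept.pop("payload", None)  # the payload modifier is not cached
        self._entries[self._key(hook, args)] = (self._clock() + max_age, kept)

    def lookup(self, hook: str, args: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """Return (hit, modifiers); expired entries count as misses."""
        entry = self._entries.get(self._key(hook, args))
        if entry is None or self._clock() >= entry[0]:
            return False, None
        return True, entry[1]
```

An endpoint opts in simply by adding a response header such as `cache-control: max-age=30` to a successful answer.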
It is possible to inspect the cache using:
Cache entries are currently not actively disposed after expiry and will remain in memory.
All webhooks are called with method POST
. All hooks need to be answered with the HTTP code 200
to be considered successful. Any hook called that does not return the 200
code will be logged as an error as will any hook with an unparseable payload.
All hooks are called with the header vernemq-hook
which contains the name of the hook in question.
For detailed information about the hooks and when they are called, see the sections Session Lifecycle, Subscribe Flow and Publish Flow.
Note, when overriding a mountpoint or a client-id both have to be returned by the webhook implementation for it to have an effect.
Header: vernemq-hook: auth_on_register
Webhook example payload:
A minimal response indicating the authentication was successful looks like:
It is also possible to override various client specific settings by returning an array of modifiers:
Note, the retry_interval
is in milliseconds. It is possible to override many more settings, see the Session Lifecycle for more information.
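As a hedged illustration, an endpoint could assemble such a response as follows. The JSON layout (`result`, `modifiers`) and the `mountpoint` and `client_id` modifier keys reflect our reading of the webhook format and should be treated as assumptions; the helper itself is hypothetical.

```python
import json
from typing import Any, Dict, Optional


def register_ok(retry_interval_seconds: Optional[float] = None,
                mountpoint: Optional[str] = None,
                client_id: Optional[str] = None) -> str:
    """Build a successful auth_on_register response body."""
    # Overriding only one of mountpoint / client-id has no effect,
    # so this sketch refuses to build such a response.
    if (mountpoint is None) != (client_id is None):
        raise ValueError("mountpoint and client_id must be overridden together")
    modifiers: Dict[str, Any] = {}
    if retry_interval_seconds is not None:
        # The modifier is expressed in milliseconds.
        modifiers["retry_interval"] = int(retry_interval_seconds * 1000)
    if mountpoint is not None:
        modifiers["mountpoint"] = mountpoint
        modifiers["client_id"] = client_id
    body: Dict[str, Any] = {"result": "ok"}
    if modifiers:
        body["modifiers"] = modifiers
    return json.dumps(body, sort_keys=True)


response = register_ok(retry_interval_seconds=20)
```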
Other possible responses:
Header: vernemq-hook: auth_on_subscribe
Webhook example payload:
A minimal response indicating the subscription authorization was successful looks like:
Another example where the topics to subscribe have been rewritten looks like:
Note, you can also pass a qos
with value 128
which means it was either not possible or the client was not allowed to subscribe to that specific topic.
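A sketch of how an endpoint might use this: the hypothetical helper below grants topics under an allowed prefix and marks every other topic with qos 128. The shape of the request and response (a `topics` list of objects with `topic` and `qos`) is our understanding of the webhook format and is an assumption here.

```python
from typing import Any, Dict, List

NOT_ALLOWED_QOS = 128  # marks a subscription that was not granted


def authorize_subscribe(requested: List[Dict[str, Any]], allowed_prefix: str) -> Dict[str, Any]:
    """Grant topics under allowed_prefix, reject all others with qos 128."""
    topics = []
    for entry in requested:
        granted = entry["topic"].startswith(allowed_prefix)
        topics.append({
            "topic": entry["topic"],
            "qos": entry["qos"] if granted else NOT_ALLOWED_QOS,
        })
    return {"result": "ok", "topics": topics}


answer = authorize_subscribe(
    [{"topic": "devices/42/cmd", "qos": 1}, {"topic": "admin/#", "qos": 2}],
    allowed_prefix="devices/",
)
```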
Other possible responses:
Header: vernemq-hook: auth_on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
A minimal response indicating the publish was authorized looks like:
A more complex example where the publish topic, qos, payload and retain flag are rewritten looks like:
Other possible responses:
Header: vernemq-hook: on_register
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe
Webhook example payload:
Example response:
Other possible responses:
Header: vernemq-hook: on_deliver
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
Example response:
Other possible responses:
Header: vernemq-hook: on_offline_message
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_wakeup
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_offline
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_gone
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: auth_on_register_m5
Webhook example payload:
A minimal response indicating the authentication was successful looks like:
It is also possible to override various client specific settings by returning an array of modifiers:
Note, the retry_interval
is in milliseconds. It is possible to override many more settings, see the Session Lifecycle for more information.
Other possible responses:
Header: vernemq-hook: on_auth_m5
Webhook example payload:
Note, as the authentication data is binary data it is base64 encoded.
A minimal response indicating the authentication was successful looks like:
If authentication were to continue for another round a reason code with value 24 (Continue Authentication) should be returned instead. See also the relevant section in the MQTT 5.0 specification.
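The decision between finishing and continuing an enhanced authentication exchange could be sketched like this. The reason codes 0 (Success) and 24 (Continue authentication) come from the MQTT 5.0 specification; the response layout with `modifiers` and `reason_code`, the helper name and the idea of comparing against an expected final token are assumptions made purely for illustration.

```python
import base64
from typing import Any, Dict

SUCCESS = 0
CONTINUE_AUTHENTICATION = 24


def on_auth(auth_data_b64: str, expected_final: bytes) -> Dict[str, Any]:
    """Decode the base64 authentication data and pick the reason code."""
    data = base64.b64decode(auth_data_b64, validate=True)
    if data == expected_final:
        reason = SUCCESS
    else:
        # Another round of the exchange is needed.
        reason = CONTINUE_AUTHENTICATION
    return {"result": "ok", "modifiers": {"reason_code": reason}}


final = on_auth(base64.b64encode(b"step-2").decode("ascii"), b"step-2")
```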
Header: vernemq-hook: auth_on_subscribe_m5
Webhook example payload:
A minimal response indicating the subscription authorization was successful looks like:
Another example where the topics to subscribe have been rewritten looks like:
Note, the forbidden/topic
has been rejected with the qos
value of 135 (Not authorized).
Other responses
Header: vernemq-hook: auth_on_publish_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
A minimal response indicating the publish was authorized looks like:
A response where the publish topic has been rewritten:
Other possible responses:
Header: vernemq-hook: on_register_m5
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish_m5
Note, in the example below the payload is base64 encoded.
Webhook example payload:
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe_m5
Webhook example payload:
Note, the qos value of 128
(Unspecified error) means the subscription was rejected.
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe_m5
Webhook example payload:
Example response:
Other possible responses:
Header: vernemq-hook: on_deliver_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
Example response:
Other possible responses:
Below is a very simple example of an endpoint implemented in Python. It uses the web
and json
modules and implements handlers for three different hooks: auth_on_register
, auth_on_publish
and auth_on_subscribe
.
The auth_on_register
hook restricts access to only the user with username joe
and password secret
. The auth_on_subscribe
and auth_on_publish
hooks allow any subscription or publish to continue as is. These last two hooks are needed as the default policy is deny
.
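A minimal sketch of an endpoint with this behaviour is given here using only Python's standard library (`http.server`) rather than the web module. The response bodies (`{"result": "ok"}`, `{"result": {"error": "not_allowed"}}`, `{"result": "next"}`) follow our understanding of the webhook format, and the port and function names are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict


def decide(hook: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Pure decision logic, kept separate from HTTP handling."""
    if hook == "auth_on_register":
        if payload.get("username") == "joe" and payload.get("password") == "secret":
            return {"result": "ok"}
        return {"result": {"error": "not_allowed"}}
    if hook in ("auth_on_publish", "auth_on_subscribe"):
        # Needed because the default policy is deny.
        return {"result": "ok"}
    # Assumption: leave any other hook to the next plugin in the chain.
    return {"result": "next"}


class HookHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        hook = self.headers.get("vernemq-hook", "")
        body = json.dumps(decide(hook, payload)).encode("utf-8")
        self.send_response(200)  # hooks must be answered with code 200
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt: str, *args: Any) -> None:
        pass  # keep the sketch quiet


def serve(port: int = 8080) -> None:
    """Serve on localhost, as recommended for webhook endpoints."""
    HTTPServer(("127.0.0.1", port), HookHandler).serve_forever()
```

Calling `serve()` starts the endpoint; each hook would then be registered against `http://127.0.0.1:8080` (or whichever port is chosen).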
You can loadtest VerneMQ with our vmq_mzbench tool. It is based on Machinezone's very powerful MZBench system and lets you narrow down what hardware specs are needed to meet your performance goals. You can state your requirements for latency percentiles (and much more) in a formal way, and let vmq_mzbench automatically fail, if it can't meet the requirements.
If you have an AWS account, vmq_mzbench can automagically provision worker nodes for you. You can also run it locally, of course.
Please follow the MZBench installation guide
Actually, you don't even have to install vmq_mzbench, if you don't want to. Your scenario file will automatically fetch vmq_mzbench for any test you do. vmq_mzbench runs every test independently, so it has a provisioning step for any test, even if you only run it on a local worker.
To install vmq_mzbench on your computer, go through the following steps:
To provision your tests from this local repository, you'll have to tell the scenario scripts to use rsync. Add this to the scenario file:
If you'd just like the script itself to fetch vmq_mzbench, then you can direct it to GitHub:
MZBench recently switched from an Erlang-styled Scenario DSL to a more python-like DSL dubbed BDL (Benchmark Definition Language). Have a look at the BDL examples on Github.
You can familiarize yourself quickly with MZBench's guide on writing loadtest scenarios.
There's not much to learn, just make sure you understand how pools and loops work. Then you can add the vmq_mzbench statement functions to the mix and define actual loadtest scenarios.
Currently vmq_mzbench exposes the following statement functions for use in MQTT scenario files:
random_client_id(State, Meta, I)
: Create a random client Id of length I
fixed_client_id(State, Meta, Name, Id)
: Create a deterministic client Id with schema Name ++ "-" ++ Id
worker_id(State, Meta)
: Get the internal, sequential worker Id
client(State, Meta)
: Get the client Id you set yourself during connection setup with the option {t, client, "client"}
connect(State, Meta, ConnectOpts)
: Connect to the broker with the options given in ConnectOpts
disconnect(State, Meta)
: Disconnect normally
subscribe(State, Meta, Topic, QoS)
: Subscribe to Topic with Quality of Service QoS
unsubscribe(State, Meta, Topic)
: Unsubscribe from Topic
publish(State, Meta, Topic, Payload, QoS)
: Publish a message with binary Payload to Topic with QoS
publish(State, Meta, Topic, Payload, QoS, RetainFlag)
: Publish a message with binary Payload to Topic with QoS and RetainFlag
It's easy to add more statement functions to the MQTT worker if needed, get in touch with us.
This guide describes how to deploy a VerneMQ cluster on Kubernetes.
Kubernetes (K8s) is possibly the most mature technology for deploying Docker containers at scale. While running a single Docker container is supposed to be easy, running a Kubernetes cluster definitely isn't. That's why we recommend working with a certified Kubernetes partner such as Amazon AWS EKS, Google Cloud GKE, Microsoft Azure AKS, or DigitalOcean.
If your applications already live in Docker containers and are deployed on Kubernetes it can be beneficial to also run VerneMQ on Kubernetes. This guide covers how to successfully deploy a VerneMQ cluster on Kubernetes. Multiple options exist to deploy a VerneMQ cluster at this point. This guide describes how to use the official Helm chart as well as the still experimental Kubernetes Operator.
For the sake of clarity, this guide defines the following terms:
Kubernetes Node: A single virtual or physical machine in a Kubernetes cluster.
Kubernetes Cluster: A group of nodes firewalled from the internet, that are the primary compute resources managed by Kubernetes.
Edge router: A router that enforces the firewall policy for your cluster. This could be a gateway managed by a cloud provider or a physical piece of hardware.
Cluster network: A set of links, logical or physical, that facilitate communication within a cluster according to the Kubernetes networking model.
Service: A Kubernetes Service that identifies a set of pods using label selectors. Unless mentioned otherwise, Services are assumed to have virtual IPs only routable within the cluster network.
VerneMQ Cluster: A group of VerneMQ containers that are connected via the Erlang Distribution as well as the VerneMQ clustering mechanism.
This guide assumes that you're familiar with Kubernetes.
Helm calls itself the package manager for Kubernetes. In Helm a package is called a chart. VerneMQ comes with such a Helm chart, which simplifies the initial setup tremendously. If you haven't set up Helm yet, please work through their quickstart guide.
Once Helm is properly set up, run the following command in your shell.
This will deploy a single node VerneMQ cluster. Have a look at the possible configuration here.
A Kubernetes Operator is a method of packaging, deploying and managing a Kubernetes application. The VerneMQ Operator is basically just a Pod with the task of deploying a VerneMQ cluster from a so-called Custom Resource Definition (CRD). The goal of the VerneMQ CRD is that all required configuration can be made through the CRD, with no further configuration needed. The following command installs the operator along with a two-node VerneMQ cluster into the namespace messaging.
This will result in the following Pods:
And the following cluster status:
At this point you will probably want to configure authentication and authorization. The following port forwards may be useful:
kubectl port-forward svc/vernemq-k8s --namespace messaging 1883:1883
kubectl port-forward svc/vernemq-k8s --namespace messaging 8888:8888
In a VerneMQ cluster it doesn't matter to which node an MQTT client connects, subscribes or publishes. A VerneMQ cluster looks like one big MQTT broker to the outside. While this is the main idea of VerneMQ, it comes at a cost, namely the data replication/synchronization overhead when 'persistent' clients hop from one pod to another. As a consequence, we recommend choosing carefully how you load balance your MQTT clients.
Load balancing in Kubernetes is configured via the Service object. Multiple service types exist:
The ClusterIP type is the default and only permits access from within the Kubernetes cluster. Other pods in the Kubernetes cluster can access VerneMQ via ClusterIP:Port. The underlying balancing strategy is based on the settings of kube-proxy. This type also requires that TLS is terminated either in VerneMQ directly or in a different Pod, e.g. HAProxy.
The NodePort type uses ClusterIP under the hood but allocates a port on every Kubernetes node and routes incoming traffic from NodeIP:NodePort to the ClusterIP:Port. As with ClusterIP, this type requires that TLS is terminated either in VerneMQ directly or in a different Pod, e.g. HAProxy.
The LoadBalancer type uses an external load balancer provided by the cloud provider. In fact, this Service type only provides the glue code required to interact with the load balancing services of different cloud providers. If you're running a bare-metal Kubernetes cluster you won't be able to use this Service type unless you deploy a Kubernetes-aware network load balancer yourself. Check out MetalLB, which provides a network load balancer for bare-metal Kubernetes clusters.
Every Kubernetes node runs a kube-proxy. kube-proxy maps virtual IP addresses to services and creates the required routes in the system so that pods can communicate with each other.
kube-proxy supports multiple modes of operation:
- userspace: since v1.0
- iptables: default since v1.2
- ipvs: stable since v1.11, only available if the kernel of the Kubernetes node supports it
The performance and scalability characteristics of VerneMQ depend on the proxy-mode and the related configurations. This is especially true for load-balancing specific functionality such as session affinity. E.g. only ipvs supports an efficient way to provide session affinity via the source hashing strategy.
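To make the idea of session affinity via source hashing concrete, here is a minimal sketch in Python. It only illustrates the principle (the same client address always lands on the same backend) and is not the algorithm ipvs actually implements. The function name `pick_backend` and the pod names are hypothetical.

```python
import hashlib


def pick_backend(client_ip: str, backends: list[str]) -> str:
    """Map a client source IP to one backend deterministically."""
    if not backends:
        raise ValueError("at least one backend is required")
    # Hash the source address and reduce it to an index into the backend list.
    digest = hashlib.sha256(client_ip.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(backends)
    return backends[index]


# Hypothetical pod names, for illustration only.
pods = ["vernemq-0", "vernemq-1", "vernemq-2"]
first = pick_backend("203.0.113.7", pods)
second = pick_backend("203.0.113.7", pods)
print(first == second)  # the same source IP maps to the same pod both times
```

With a plain modulo as used here, changing the number of backends remaps many clients, so a reconnecting 'persistent' client may still land on a different pod after scaling.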
Ingress controllers provide another way to do load balancing and TLS termination in a Kubernetes cluster. However, the officially supported ingress controllers nginx and GCE focus on balancing HTTP requests instead of plain TCP connections. Therefore their support for TLS termination is also limited to HTTPS.
Multiple third-party ingress controllers exist; however, most of them focus on handling HTTP requests. One of the exceptions is Voyager by AppsCode, an ingress controller based on HAProxy, which also efficiently terminates TLS.
Use an external loadbalancer provided by the cloud provider that is capable of terminating TLS and apply a load balancing strategy that provides session affinity e.g. via source hashing.
Terminate TLS outside VerneMQ.
Configure the Pod NodeAffinity correctly to ensure that only one VerneMQ pod is scheduled on any Kubernetes cluster node.
It's preferred to have a smaller number of Pods that are very powerful in terms of available CPU and RAM than the opposite.
VerneMQ can consume a large number of open file handles when thousands of clients are connected as every connection requires at least one file handle.
Most operating systems can change the open-files limit using the ulimit -n command. Example:
However, this only changes the limit for the current shell session. Changing the limit on a system-wide, permanent basis varies more between systems.
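As a hedged illustration of what the soft and hard limits look like from inside a process, the following Python sketch reads them with the standard `resource` module (Unix only). It reports the limits of the Python process itself, not of a running VerneMQ node. The helper `allows` and its `reserve` parameter are hypothetical; `reserve` stands in for files the broker opens besides client sockets.

```python
import resource


def open_file_limits() -> tuple[int, int]:
    """Return the (soft, hard) open-files limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def allows(connections: int, soft_limit: int, reserve: int = 1024) -> bool:
    """Check whether soft_limit leaves room for `connections` sockets.

    `reserve` is an assumed allowance for non-socket file handles.
    """
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return soft_limit >= connections + reserve


soft, hard = open_file_limits()
print("soft:", soft, "hard:", hard)
print("enough for 10000 connections:", allows(10_000, soft))
```

Because every connection needs at least one file handle, a check like this gives only a lower bound on the limit you need.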
On most Linux distributions, the total limit for open files is controlled by sysctl.
As seen above, it is generally set high enough for VerneMQ. If you have other things running on the system, you might want to consult the sysctl manpage for how to change that setting. However, what most needs to be changed is the per-user open files limit. This requires editing /etc/security/limits.conf, for which you'll need superuser access. If you installed VerneMQ from a binary package, add lines for the vernemq user like so, substituting your desired hard and soft limits:
On Ubuntu, if you’re always relying on the init scripts to start VerneMQ, you can create the file /etc/default/vernemq and specify a manual limit like so:
This file is automatically sourced from the init script, and the VerneMQ process started by it will properly inherit this setting. As init scripts are always run as the root user, there’s no need to specifically set limits in /etc/security/limits.conf if you’re solely relying on init scripts.
On CentOS/RedHat systems, make sure to set a proper limit for the user you’re usually logging in with to do any kind of work on the machine, including managing VerneMQ. On CentOS, sudo properly inherits the values from the executing user.
It can be helpful to enable PAM user limits so that non-root users, such as the vernemq user, may specify a higher value for maximum open files. For example, follow these steps to enable PAM user limits and set the soft and hard values for all users of the system to allow for up to 65536 open files.
Edit /etc/pam.d/common-session and append the following line:
If /etc/pam.d/common-session-noninteractive exists, append the same line as above.
Save and close the file.
Edit /etc/security/limits.conf and append the following lines to the file:
Save and close the file.
(optional) If you will be accessing the VerneMQ nodes via secure shell (ssh), you should also edit /etc/ssh/sshd_config and uncomment the following line:
and set its value to yes as shown here:
Restart the machine so that the limits take effect and verify that the new limits are set with the following command:
Edit /etc/security/limits.conf and append the following lines to the file:
Save and close the file.
Restart the machine so that the limits take effect and verify that the new limits are set with the following command:
In the above examples, the open files limit is raised for all users of the system. If you prefer, the limit can be specified for the vernemq user only by substituting the two asterisks (*) in the examples with vernemq.
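The substitution described above can be sketched as a small Python helper that renders the two nofile entries in the `<domain> <type> <item> <value>` layout that limits.conf uses. The function name `nofile_lines` is hypothetical, and the value 65536 is taken from the PAM example; adjust it to your environment.

```python
def nofile_lines(domain: str, soft: int, hard: int) -> list[str]:
    """Render soft and hard open-files entries for /etc/security/limits.conf."""
    if soft > hard:
        raise ValueError("the soft limit must not exceed the hard limit")
    return [
        f"{domain} soft nofile {soft}",
        f"{domain} hard nofile {hard}",
    ]


# "*" applies to all users; "vernemq" restricts the entries to that user.
print("\n".join(nofile_lines("vernemq", 65536, 65536)))
```

The helper only produces text; appending it to the file still requires superuser access.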
In Solaris 8, there is a default limit of 1024 file descriptors per process. In Solaris 9, the default limit was raised to 65536. To increase the per-process limit on Solaris, add the following line to /etc/system:
Reference:
To check the current limits on your Mac OS X system, run:
The last two columns are the soft and hard limits, respectively.
To adjust the maximum open file limits in OS X 10.7 (Lion) or newer, edit /etc/launchd.conf and increase the limits for both values as appropriate.
For example, to set the soft limit to 16384 files, and the hard limit to 32768 files, perform the following steps:
Verify current limits:
The response output should look something like this:
Edit (or create) /etc/launchd.conf and increase the limits. Add lines that look like the following (using values appropriate to your environment):
Save the file, and restart the system for the new limits to take effect. After restarting, verify the new limits with the launchctl limit command:
The response output should look something like this:
Attributions
This work, "Open File Limits", is a derivative of Open File Limits by Riak, used under Creative Commons Attribution 3.0 Unported License. "Open File Limits" is licensed under Creative Commons Attribution 3.0 Unported License by Erlio GmbH.
You need to know about and configure a couple of operating system and Erlang VM settings to operate VerneMQ efficiently. First, make sure you have set appropriate OS file limits according to our guide here. Second, when you run into performance problems, don't forget to check the settings in the vernemq.conf file. (Can't open more than 10k connections? Well, is the listener configured to open more than 10k?)
This is the number one topic to look at, if you need to keep an eye on RAM usage.
Context: All network I/O in Erlang uses an internal driver. This driver will allocate and handle an internal application side buffer for every TCP connection. The default size of these buffers will determine your overall RAM use in VerneMQ. The sndbuf and recbuf of the TCP socket will not count towards VerneMQ RAM, but will be used by the Linux Kernel.
VerneMQ calculates the buffer size from the OS level TCP send and receive buffers:
val(buffer) >= max(val(sndbuf),val(recbuf))
Those values correspond to net.ipv4.tcp_wmem and net.ipv4.tcp_rmem in your OS's sysctl configuration. One way to minimize RAM usage is therefore to configure those settings (Debian example):
This would result in a 32KB application buffer for every connection.
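The relation between the socket buffers and the application buffer can be turned into a rough RAM estimate. The Python sketch below takes the smallest value that satisfies val(buffer) >= max(val(sndbuf), val(recbuf)) and multiplies it by the number of connections. The function names and the figure of 100,000 connections are assumptions for illustration, and the result is a lower bound for buffer memory only, not a prediction of total VerneMQ RAM use.

```python
KB = 1024


def app_buffer_bytes(sndbuf: int, recbuf: int) -> int:
    """Smallest buffer satisfying val(buffer) >= max(val(sndbuf), val(recbuf))."""
    return max(sndbuf, recbuf)


def estimated_buffer_ram(connections: int, sndbuf: int, recbuf: int) -> int:
    """Lower bound, in bytes, for application buffers across all connections."""
    return connections * app_buffer_bytes(sndbuf, recbuf)


per_connection = app_buffer_bytes(32 * KB, 32 * KB)
total = estimated_buffer_ram(100_000, 32 * KB, 32 * KB)
print(per_connection)                # 32768
print(round(total / 1024 ** 3, 2))   # 3.05 (GiB)
```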
If your VerneMQ use case requires different TCP buffer optimisations (per group of clients, for instance), you will have to make sure that the Linux OS buffer configuration, namely net.ipv4.tcp_wmem and net.ipv4.tcp_rmem, allows for this kind of flexibility, permitting small TCP buffers and big TCP buffers at the same time.
Example 1 above would allow VerneMQ to allocate minimal TCP read and write buffers of 4KB in the Linux Kernel, a max read buffer of 32KB in the kernel, and a max write buffer of 65KB in the kernel. VerneMQ itself would set its own internal per connection buffer to 65KB in addition.
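The arithmetic in this paragraph can be checked with a short Python sketch. On Linux, net.ipv4.tcp_rmem and net.ipv4.tcp_wmem each hold three values: minimum, default and maximum. The triplets below are assumptions chosen to match the figures in the text (4KB minimum, 32KB maximum read, 65536 bytes maximum write); the default of 16384 is made up for illustration, and `parse_triplet` is a hypothetical helper.

```python
def parse_triplet(value: str) -> tuple[int, int, int]:
    """Parse a sysctl 'min default max' triplet into integers (bytes)."""
    parts = value.split()
    if len(parts) != 3:
        raise ValueError("expected three values: min default max")
    minimum, default, maximum = (int(part) for part in parts)
    return minimum, default, maximum


# Assumed values matching the figures quoted in the text.
tcp_rmem = parse_triplet("4096 16384 32768")
tcp_wmem = parse_triplet("4096 16384 65536")

# Per-connection application buffer following max(sndbuf, recbuf).
app_buffer = max(tcp_rmem[2], tcp_wmem[2])
print(app_buffer)  # 65536
```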