Welcome to the VerneMQ documentation! This is a reference guide for most of the available features and options of VerneMQ. The Getting Started guide might be a good entry point.
For a more general overview on VerneMQ and MQTT, you might want to start with the introduction.
For downloading VerneMQ see Downloads.
Everything you must know to properly configure VerneMQ
Every VerneMQ node has to be configured. Depending on the installation method and chosen platform, the configuration file vernemq.conf resides at different locations. If VerneMQ was installed through a Linux package, the default location for the configuration file is /etc/vernemq/vernemq.conf.
A single setting is handled on one line.
Lines are structured Key = Value
Any line starting with # is a comment and will be ignored.
You certainly want to try out VerneMQ right away. For that you could disable authentication like so:
Set allow_anonymous = on
By default the vmq_acl authorization plugin is enabled and configured to allow publishing and subscribing to any topic, see here for more information.
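As a minimal illustration of the file format rules above, a vernemq.conf fragment enabling anonymous access could look like the following (for testing only, not for production):
# /etc/vernemq/vernemq.conf
# lines are Key = Value; lines starting with # are comments
allow_anonymous = on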
A quick and simple guide to get started with VerneMQ
VerneMQ is a high-performance, distributed MQTT message broker. It scales horizontally and vertically on commodity hardware to support a high number of concurrent publishers and consumers while maintaining low latency and fault tolerance. To use it, all you need to do is install the VerneMQ package.
Choose your OS and follow the instructions:
It is also possible to run VerneMQ using our Docker image:
To start a VerneMQ broker, use the vernemq start command in your shell:
A successful start will return no output. If there is a problem starting the broker, an error message is printed to STDERR.
To run VerneMQ with an attached interactive Erlang console:
A VerneMQ broker is typically started in console mode for debugging or troubleshooting purposes. Note that if you start VerneMQ in this manner, it is running as a foreground process that will exit when the console is closed.
You can close the console by issuing this command at the Erlang prompt:
Once your broker has started, you can initially check that it is running with the vernemq ping command:
The command will respond with pong if the broker is running, or with Node <NodeName> not responding to pings if it's not.
As you may have noticed, VerneMQ will warn you at startup when your system's open files limit (ulimit -n) is too low. You're advised to increase the OS default open files limit when running VerneMQ. Read more about why and how in the corresponding guide.
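For example, on a typical Linux system you can check and raise the limit for the current shell before starting the broker (persisting the change, e.g. via /etc/security/limits.conf or the systemd unit, depends on your distribution; the value below is just an example):
ulimit -n            # show the current open files limit
ulimit -n 65536      # raise the limit for this shell session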
Set the time in seconds after a QoS 1 or QoS 2 message has been sent that VerneMQ will wait before retrying when no response is received. This option defaults to 20 seconds.
This option defines the maximum number of QoS 1 or 2 messages that can be in the process of being transmitted simultaneously. Defaults to 20 messages; use 0 for no limit. The inflight window serves as a protection for sessions on the incoming side.
The maximum number of messages to hold in the queue, above those messages that are currently in flight. Defaults to 1000 messages; set to -1 for no limit. This option protects a client session from overload by dropping messages (of any QoS). This parameter was named max_queued_messages in 0.10.*. Note that 0 will totally block message delivery from any queue!
This option specifies the maximum number of QoS 1 and 2 messages to hold in the offline queue. Defaults to 1000 messages; use -1 for no limit, or 0 if no messages should be stored.
In contrast to the session-based inflight window, max_online_messages and max_offline_messages serve as a protection for queues, on the outgoing side.
retry_interval = 20
max_inflight_messages = 20
max_online_messages = 1000
max_offline_messages = 1000
vernemq start
vernemq console
q().
vernemq ping
VerneMQ can be installed on CentOS-based systems using the binary package we provide.
Once you have downloaded the binary package, execute the following command to install VerneMQ:
sudo yum install vernemq-<%= latest_version() %>-1.el7.centos.x86_64.rpm
or:
sudo rpm -Uvh vernemq-<%= latest_version() %>-1.el7.centos.x86_64.rpm
Once you've installed VerneMQ, start it on your node:
service vernemq start
You can verify that VerneMQ is successfully installed by running:
rpm -qa | grep vernemq
If VerneMQ has been installed successfully, vernemq is returned.
Now that you've installed VerneMQ, check out How to configure VerneMQ.
As well as being available as packages that can be installed directly into the operating systems, VerneMQ is also available as a Docker image. Below is an example of how to set up a couple of VerneMQ nodes with Docker:
docker run --name vernemq1 -d erlio/docker-vernemq
Sometimes you need to configure port forwarding (on a Mac for example):
docker run -p 1883:1883 --name vernemq1 -d erlio/docker-vernemq
This starts a new node that listens on 1883 for MQTT connections and on 8080 for MQTT over websocket connections. However, at this moment the broker won't be able to authenticate the connecting clients. To allow anonymous clients use the DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on environment variable.
docker run -e "DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on" --name vernemq1 -d erlio/docker-vernemq
The DOCKER_VERNEMQ_DISCOVERY_NODE environment variable allows a newly started container to automatically join a VerneMQ cluster. Assuming you started your first node like the example above, you could autojoin the cluster (which currently consists of a single container 'vernemq1') like the following:
docker run -e "DOCKER_VERNEMQ_DISCOVERY_NODE=<IP-OF-VERNEMQ1>" --name vernemq2 -d erlio/docker-vernemq
(Note: you can find the IP of a Docker container using docker inspect <CONTAINER_NAME> | grep \"IPAddress\".)
To check if the above containers have successfully clustered you can issue the vmq-admin
command:
docker exec vernemq1 vmq-admin cluster show
+--------------------+-------+
| Node |Running|
+--------------------+-------+
|[email protected]| true |
|[email protected]| true |
+--------------------+-------+
VerneMQ can be installed on Debian or Ubuntu-based systems using the binary package we provide.
Once you have downloaded the binary package, execute the following command to install VerneMQ:
sudo dpkg -i vernemq_<%= latest_version() %>-1_amd64.deb
You can verify that VerneMQ is successfully installed by running:
dpkg -s vernemq | grep Status
If VerneMQ has been installed successfully, Status: install ok installed is returned.
Once you've installed VerneMQ, start it on your node:
service vernemq start
The whereis vernemq
command will give you a couple of directories:
whereis vernemq
vernemq: /usr/sbin/vernemq /usr/lib/vernemq /etc/vernemq /usr/share/vernemq
/usr/sbin/vernemq: the vernemq and vmq-admin commands
/usr/lib/vernemq: the vernemq package
/etc/vernemq: the vernemq.conf file
/usr/share/vernemq: the internal vernemq schema files
/var/lib/vernemq: the vernemq data dirs for LevelDB (Metadata Store and Message Store)
Now that you've installed VerneMQ, check out How to configure VerneMQ.
VerneMQ supports multiple ways to configure one or many MQTT listeners.
Listeners specify on which IP address and port VerneMQ should accept new incoming connections. Depending on the chosen transport (TCP, SSL, WebSocket) different configuration parameters have to be provided. VerneMQ allows you to write the listener configurations in a hierarchical manner, enabling very flexible setups. VerneMQ applies reasonable defaults on the top level, which can of course be overridden if needed.
# defines the default nr of allowed concurrent
# connections per listener
listener.max_connections = 10000
# defines the nr. of acceptor processes waiting
# to concurrently accept new connections
listener.nr_of_acceptors = 10
# used when clients of a particular listener should
# be isolated from clients connected to another
# listener.
listener.mountpoint = off
These are the only default parameters that are applied for all transports, and the only ones that are of interest for plain TCP and WebSocket listeners.
These global defaults can be overridden for a specific transport protocol with listener.tcp.CONFIG = VAL, or even for a specific listener with listener.tcp.LISTENER.CONFIG = VAL. The placeholder LISTENER is freely chosen and is only used as a reference for further configuring this particular listener.
Normally, an MQTT broker hosts one single topic tree. This means that all topics are accessible to all publishers and subscribers (limited by the ACLs you configured, of course). Mountpoints are a way to host multiple topic trees in a single broker. They are completely separated and clients with different topic trees cannot publish messages to each other. This could be useful if you provide MQTT services to multiple separated use cases/verticals or clients, with a single broker. Note that mountpoints are configured via different listeners. As a consequence, the MQTT clients will have to connect to a specific port to connect to a specific topic space (mountpoint).
The mountpoints can be configured on the protocol level, or configured or overridden on the level of a specific listener.
listener.ssl.mountpoint = ssl-mountpoint
listener.tcp.listener1.mountpoint = tcp-listener1
listener.tcp.listener2.mountpoint = tcp-listener2
Since VerneMQ 1.5.0 it is possible to configure which MQTT protocol versions a listener will accept.
VerneMQ supports MQTT 3.1, 3.1.1, and 5.0 (since VerneMQ 1.6.0). To allow these protocol versions, set:
listener.tcp.allowed_protocol_versions = 3,4,5
Here 3, 4, and 5 are the protocol level versions corresponding to MQTT 3.1, 3.1.1 and 5.0 respectively. The default value is 3,4, thus allowing MQTT 3.1 and 3.1.1, while MQTT 5.0 is disabled.
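Following the hierarchical listener configuration described earlier, this setting can also be scoped to a single named listener. A hypothetical example (listener name and port are illustrative) that accepts only MQTT 5.0 clients:
# a listener that only accepts MQTT 5.0 connections
listener.tcp.mqtt5only = 127.0.0.1:1885
listener.tcp.mqtt5only.allowed_protocol_versions = 5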
Listen on TCP port 1883 and for WebSocket Connections on port 8888:
listener.tcp.default = 127.0.0.1:1883
listener.ws.default = 127.0.0.1:8888
An additional listener can be added by using a different name. In the example above the name equals default and can be used for further configuring this particular listener. The following example demonstrates how an additional listener is defined, as well as how the maximum number of connections can be limited for this listener:
listener.tcp.my_other = 127.0.0.1:18884
listener.tcp.my_other.max_connections = 100
VerneMQ listeners can be configured to accept connections from a proxy server that supports the PROXY protocol. This enables VerneMQ to retrieve peer information such as source IP/Port but also PROXY Version 2 protocol TLS client certificate details if the proxy was used to terminate TLS.
To enable the PROXY protocol for TCP listeners use listener.tcp.proxy_protocol = on, or for a specific listener use listener.tcp.LISTENER.proxy_protocol = on.
Accepting SSL connections on port 8883:
listener.ssl.cafile = /etc/ssl/cacerts.pem
listener.ssl.certfile = /etc/ssl/cert.pem
listener.ssl.keyfile = /etc/ssl/key.pem
listener.ssl.default = 127.0.0.1:8883
If you want to use client certificates to authenticate your clients you have to set the following option:
listener.ssl.require_certificate = on
If you use client certificates and want to use the certificate's CN value as a username, you can set:
listener.ssl.use_identity_as_username = on
Both options require_certificate and use_identity_as_username default to off.
The same configuration options can be used for securing WebSocket connections; just use wss as the protocol identifier, e.g. listener.wss.require_certificate.
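As a sketch, a Secure WebSocket listener therefore combines the wss address with the same certificate options shown for SSL above (the file paths and port below are assumptions, not defaults):
listener.wss.cafile = /etc/ssl/cacerts.pem
listener.wss.certfile = /etc/ssl/cert.pem
listener.wss.keyfile = /etc/ssl/key.pem
listener.wss.default = 127.0.0.1:8443
listener.wss.require_certificate = on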
VerneMQ uses Google's LevelDB as a fast storage backend for messages and subscriber information. Each VerneMQ node runs its own embedded LevelDB store.
There's not much you need to know about LevelDB and VerneMQ. One really important thing to note is that LevelDB manages its own memory. This means that VerneMQ will not allocate and free memory for LevelDB. Instead you have to set a value in vernemq.conf that tells LevelDB how much memory it can use up.
Configuring LevelDB memory:
leveldb.maximum_memory.percent = 20
LevelDB means business with its allocated memory. It will eventually end up with the configured max, making it look like there's a memory leak, or even triggering OOM kills. Keep that in mind when configuring the percentage of RAM you give to LevelDB.
On every VerneMQ node you'll find the vmq-admin
command line tool in the release's bin directory. It has different sub-commands that let you check for status, start and stop listeners, re-configure values and a couple of other administrative tasks.
vmq-admin
is a live re-configuration utility. Please note that all dynamically configured values will be reset by vernemq.conf upon broker restart.
Don't use this to wildly re-configure a production system without keeping track of what you are doing. vmq-admin gives you the flexibility to test things and react live, but please persist any static configuration you need in the vernemq.conf file.
Working with shared subscriptions
A shared subscription is a mechanism for distributing messages to a set of subscribers to a shared subscription topic, such that each message is received by only one subscriber. This contrasts with normal subscriptions where each subscriber receives a copy of the published message.
A shared subscription has the form $share/sharename/topic and subscribers to this topic will receive messages published to the topic topic. The messages will be distributed according to the defined distribution policy.
When subscribing to a shared subscription using command line tools remember to quote the topic as some command line shells, like bash
, will otherwise expand the $share
part of the topic as an environment variable.
Currently three message distribution policies for shared subscriptions are supported: prefer_local, random, and local_only. Under the random policy messages will be published to a random member of the shared subscription, if any exist. Under the prefer_local policy messages will be delivered to a random node-local member of the shared subscription; if none exists, the message will be delivered to a random member of the shared subscription on a remote cluster node. Under the local_only policy messages will be delivered to a random node-local member of the shared subscription.
When a message is delivered to subscribers of a shared subscription, the message will be delivered to an online subscriber if possible, otherwise the message will be delivered to an offline subscriber.
Subscription note: when subscribing to a shared topic, make sure to escape the $. For dash or bash shells this is done as in the mosquitto_sub examples further below.
Publishing note: when publishing to a shared topic, do not include the prefix $share/group/ as part of the publish topic name.
This section elaborates how a VerneMQ cluster deals with network partitions (aka. netsplit or split brain situation). A netsplit is mostly the result of a failure of one or more network devices resulting in a cluster where nodes can no longer reach each other.
VerneMQ is able to detect a network partition, and by default it will stop serving CONNECT
, PUBLISH
, SUBSCRIBE
, and UNSUBSCRIBE
requests. A properly implemented client will always resend unacked commands, so messages are not lost (QoS 0 publishes will be lost). However, in the time window between the network partition occurring and VerneMQ detecting it, much can happen. Moreover, this time frame will be different on every participating cluster node. In this guide we refer to this time frame as the Window of Uncertainty.
The behaviour during a netsplit is completely configurable via allow_register_during_netsplit
, allow_publish_during_netsplit
, allow_subscribe_during_netsplit
, and allow_unsubscribe_during_netsplit
. These options supersede the trade_consistency option. In order to reach the same behaviour as trade_consistency = on, all the mentioned netsplit options have to be set to on.
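For example, to fully trade consistency for availability during a partition (the equivalent of the old trade_consistency = on), all four options would be enabled in vernemq.conf:
allow_register_during_netsplit = on
allow_publish_during_netsplit = on
allow_subscribe_during_netsplit = on
allow_unsubscribe_during_netsplit = on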
VerneMQ follows an eventually consistent model for storing and replicating the subscription data. This also includes retained messages.
Due to the eventually consistent data model it is possible that during the Window of Uncertainty a publish won't take into account a subscription made on a remote node (in another partition). Obviously, VerneMQ can't deliver the message in this case. The same holds for delivering retained messages to remote subscribers.
Last will messages that are triggered during the Window of Uncertainty will be delivered to the reachable subscribers. During a netsplit, but after the Window of Uncertainty, last will messages will currently be lost.
Normally, client registration is synchronized using an elected leader node for the given client id. Such a synchronization removes the race condition between multiple clients trying to connect with the same client id on different nodes. However, during the Window of Uncertainty it is currently possible that VerneMQ fails to disconnect a client connected to a different node. Although this scenario sounds artificially crafted, it is possible to end up with duplicate clients connected to the cluster.
As soon as the partition is healed and connectivity is reestablished, the VerneMQ nodes replicate the latest changes made to the subscription data. This includes all the changes 'accidentally' made during the Window of Uncertainty. VerneMQ ensures that convergence regarding subscription data and retained messages is eventually reached.
Where should VerneMQ emit the default console log messages (which are typically at info
severity):
VerneMQ defaults to log the console messages to a file, which can be specified by:
This option defaults to /var/log/vernemq/console.log
for Ubuntu, Debian, RHEL and Docker installs.
The default console logging level info can be changed by setting one of the following:
VerneMQ logs error messages by default. One can change the default behaviour by setting:
VerneMQ defaults to log the error messages to a file, which can be specified by:
This option defaults to /var/log/vernemq/error.log
for Ubuntu, Debian, RHEL and Docker installs.
VerneMQ logs crash messages by default. One can change the default behaviour by setting:
VerneMQ defaults to log the crash messages to a file, which can be specified by:
This option defaults to /var/log/vernemq/crash.log
for Ubuntu, Debian, RHEL and Docker installs.
The maximum size in bytes of individual messages in the crash log defaults to 64KB, but can be specified by:
VerneMQ rotates crash logs. By default, the crash log file is rotated at midnight or when the size exceeds 10MB. This behaviour can be changed by setting:
The default number of rotated log files is 5 and can be set with the option:
VerneMQ supports logging to SysLog, enable it by setting:
Logging to SysLog is disabled by default.
The systree functionality is enabled by default and reports the broker metrics at a fixed interval defined in the vernemq.conf
. The metrics defined are transformed to MQTT topics e.g. mqtt_publish_received
is transformed to $SYS/<nodename>/mqtt/publish/received
. <nodename>
is your node's name, as configured in the vernemq.conf
. To find it, you can grep the file for it: grep nodename vernemq.conf
The complete list of metrics can be found in the monitoring documentation.
This option defaults to 20000 milliseconds.
If the systree feature is not required it can be disabled in vernemq.conf
The feature and the interval can be changed at runtime using the vmq-admin
script.
Usage: vmq-admin set = ... [[--node | -n] | --all]
Example: vmq-admin set systree_interval=60000 -n [email protected]
Examples:
VerneMQ supports enhanced authentication flows (SASL style authentication) for MQTT 5.0 sessions. The enhanced authentication mechanism can be used for initial authentication when the client connects or to re-authenticate clients at a later point.
The on_auth_m5 hook allows the plugin to implement SASL style authentication flows by either accepting, rejecting (disconnecting the client), or continuing the flow. The on_auth_m5 hook is specified in the Erlang behaviour available in the vernemq_dev repo.
The graphite exporter reports the broker metrics at a fixed interval (defined in milliseconds) to a graphite server. The necessary configuration is done inside the vernemq.conf.
You can further tune the connection to the Graphite server:
How to setup and configure the HTTP listener.
The VerneMQ HTTP listener is used to serve various VerneMQ subsystems such as:
By default it runs on port 8888
. To disable the HTTP listener or change the port, adapt the configuration in vernemq.conf
:
shared_subscription_policy = prefer_local
mosquitto_sub -h mqtt.example.io -p 1883 -q 2 -t \$share/group/topicname
mosquitto_sub -h mqtt.example.io -p 1883 -q 2 -t \$share/group/topicname/#
mosquitto_pub -h mqtt.example.io -p 1883 -t topicname -m "This is a test message"
mosquitto_pub -h mqtt.example.io -p 1883 -t topicname/group1 -m "This is a test message"
log.console = off | file | console | both
log.console.file = /path/to/log/file
log.console.level = debug | info | warning | error
log.error = on | off
log.error.file = /path/to/log/file
log.crash = on | off
log.crash.file = /path/to/log/file
log.crash.maximum_message_size = 64KB
## Acceptable values:
## - a byte size with units, e.g. 10GB
log.crash.size = 10MB
## For acceptable values see https://github.com/basho/lager/blob/master/README.md#internal-log-rotation
log.crash.rotation = $D0
log.crash.rotation.keep = 5
log.syslog = on
graphite_enabled = on
graphite_host = carbon.hostedgraphite.com
graphite_port = 2003
graphite_interval = 20000
graphite_api_key = YOUR-GRAPHITE-API-KEY
# set the connect timeout (defaults to 5000 ms)
graphite_connect_timeout = 10000
# set a reconnect timeout (default to 15000 ms)
graphite_reconnect_timeout = 10000
# set a custom graphite prefix (defaults to '')
graphite_prefix = vernemq
systree_interval = 20000
systree_enabled = off
mosquitto_sub -t '$SYS/<node-name>/#' -u <username> -P <password> -d
listener.http.default = 127.0.0.1:8888
Many aspects of VerneMQ can be extended using plugins. The standard VerneMQ package comes with several official plugins. You can show the enabled & running plugins via:
vmq-admin plugin show
The command above displays all the enabled plugins together with the hooks they implement:
+-----------+-----------+-----------------+-----------------------------+
| Plugin | Type | Hook(s) | M:F/A |
+-----------+-----------+-----------------+-----------------------------+
|vmq_passwd |application|auth_on_register |vmq_passwd:auth_on_register/5|
| vmq_acl |application| auth_on_publish | vmq_acl:auth_on_publish/6 |
| | |auth_on_subscribe| vmq_acl:auth_on_subscribe/3 |
+-----------+-----------+-----------------+-----------------------------+
vmq-admin plugin enable --name=vmq_acl
This enables the ACL plugin. Because the vmq_acl plugin is already started, the above command won't succeed. In case the plugin sits in an external directory you must also provide --path=PathToPlugin.
vmq-admin plugin disable --name=vmq_acl
To make a plugin start when VerneMQ starts, it needs to be configured in the main vernemq.conf file.
The general syntax to enable a plugin is to add a line like plugins.pluginname = on
, using the vmq_passwd
plugin as an example:
plugins.vmq_passwd = on
And if the plugin is external the path can be specified like this:
plugins.myplugin = on
plugins.myplugin.path = /path/to/plugin
Plugin specific settings can be configured via myplugin.somesetting = value
, like:
vmq_passwd.password_file = ./etc/vmq.passwd
See the vernemq.conf
file for details.
There are a couple of hidden options you can set in the vernemq.conf
file. Hidden means that you have to add and set the value explicitly. Hidden options still have default values. Changing them should be considered advanced, possibly with the exception of setting a max_message_rate
.
Specify how the queue should deliver messages when multiple sessions are allowed. In case of fanout all the attached sessions will receive the message; in case of balance an attached session is chosen randomly.
queue_deliver_mode = balance
Specify how queues should process messages, either the fifo
or lifo
way. Default is fifo
.
queue_type = fifo
Specifies the maximum incoming publish rate per session per second. Depending on the underlying network buffers this rate isn't strictly enforced. Defaults to 0, which means no rate limit applies. Setting the value to 2, for instance, limits any publisher to 2 messages per second.
max_message_rate = 2
Due to the eventually consistent nature of the subscriber store it is possible that during queue migration messages still arrive on the old cluster node. This parameter allows compensating for this by keeping the queue around for some time (in seconds) after it was migrated to the other cluster node.
max_drain_time = 20
Specifies the number of messages that are delivered to the remote node per drain step. A large value will provide a faster migration of a queue, but increases the waste of bandwidth in case the migration fails.
max_msgs_per_drain_step = 1000
Allows selecting a new default reg_view. A reg_view is a pre-defined way to route messages. Multiple views can be loaded and used, but one has to be selected as the default. The default routing is vmq_reg_trie, i.e. routing via the built-in trie data structure.
vmq_reg_view = "vmq_reg_trie"
A list of views that are started during startup. It's only used in plugins that want to choose dynamically between routing reg_views.
reg_views = "[vmq_reg_trie]"
An integer specifying how many bytes are buffered in case the remote node is not available. Defaults to 10000.
outgoing_clustering_buffer_size = 15000
Set the maximum size for client ids; MQTT v3.1 specifies a limit of 23 characters.
max_client_id_size = 23
This option defaults to 23.
This option allows persistent clients (those with clean_session set to false) to be removed if they do not reconnect within a certain time frame.
This is a non-standard option. As far as the MQTT specification is concerned, persistent clients are persisted forever.
The expiration period should be an integer followed by one of h
, d
, w
, m
, y
for hour, day, week, month, and year; or never
:
persistent_client_expiration = 1w
This option defaults to never.
Limit the maximum publish payload size in bytes that VerneMQ allows. Messages that exceed this size won't be accepted.
max_message_size = 0
Defaults to 0, which means that all valid messages are accepted. The MQTT specification imposes a maximum payload size of 268435455 bytes.
VerneMQ supports the WebSocket protocol out of the box. To be able to open a WebSocket connection to VerneMQ, you have to configure a WebSocket listener or Secure WebSocket listener in the vernemq.conf
file first:
listener.ws.default = 127.0.0.1:9001
listener.wss.default = 127.0.0.1:9002
Keep in mind that you'll use MQTT-over-WebSocket, so you will need a JavaScript library that implements the MQTT client behaviour. We have used the Eclipse Paho client as well as MQTT.js.
You won't be able to open WebSocket connections on a base URL; always add the /mqtt path.
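For example, with the ws and wss listeners configured above, a client library would be pointed at URLs like the following (the hostnames are assumptions):
ws://localhost:9001/mqtt
wss://broker.example.com:9002/mqtt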
You can dynamically re-configure most of VerneMQ's settings on a running node by using the vmq-admin set
command.
Settings dynamically configured with the vmq-admin set
command will be reset by vernemq.conf upon broker restart.
Let's change the max_client_id_size
as an example. (We might have noticed that some clients can't login because their client ID is too long, but we don't want to restart the broker for that). Note that you can also set multiple values with the same command.
vmq-admin set max_client_id_size=45
vmq-admin set max_client_id_size=45 [email protected]
vmq-admin set max_client_id_size=45 --all
You can show one or multiple values in a simple table:
vmq-admin show max_client_id_size retry_interval
+----------------------+------------------+--------------+
| node |max_client_id_size|retry_interval|
+----------------------+------------------+--------------+
|[email protected]| 28 | 20 |
+----------------------+------------------+--------------+
vmq-admin show max_client_id_size retry_interval --node [email protected]
vmq-admin show max_client_id_size retry_interval --all
+----------------------+------------------+--------------+
| node |max_client_id_size|retry_interval|
+----------------------+------------------+--------------+
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 28 | 20 |
+----------------------+------------------+--------------+
We recommend using the rebar3 toolchain to generate the basic Erlang OTP application boilerplate and start from there.
rebar3 new app name="myplugin" desc="this is my first VerneMQ plugin"
===> Writing myplugin/src/myplugin_app.erl
===> Writing myplugin/src/myplugin_sup.erl
===> Writing myplugin/src/myplugin.app.src
===> Writing myplugin/rebar.config
===> Writing myplugin/.gitignore
===> Writing myplugin/LICENSE
===> Writing myplugin/README.md
Change the rebar.config
file to include the vernemq_dev
dependency:
{erl_opts, [debug_info]}.
{deps, [{vernemq_dev,
{git, "git://github.com/vernemq/vernemq_dev.git", {branch, "master"}}}
]}.
Compile the application; this will automatically fetch vernemq_dev.
rebar3 compile
===> Verifying dependencies...
===> Fetching vmq_commons ({git,
"git://github.com/vernemq/vernemq_dev.git",
{branch,"master"}})
===> Compiling vernemq_dev
===> Compiling myplugin
Now you're ready to implement the hooks. Don't forget to add the proper vmq_plugin_hooks
entries to your src/myplugin.app.src
file.
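A hook entry has the form {Module, Function, Arity, Options} (see the plugin development section below). As a rough, hypothetical sketch, an app.src registering an auth_on_register/5 hook could look like this:
{application, myplugin,
 [{description, "this is my first VerneMQ plugin"},
  {vsn, "0.1.0"},
  {registered, []},
  {applications, [kernel, stdlib]},
  {mod, {myplugin_app, []}},
  %% tell the VerneMQ plugin system which hooks this plugin implements
  {env, [{vmq_plugin_hooks,
          [{myplugin, auth_on_register, 5, []}]}]}
 ]}.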
For a complete example, see the vernemq_demo_plugin.
Description and Configuration of the Prometheus exporter
The Prometheus exporter is enabled by default and installs an HTTP handler on http://localhost:8888/metrics
. To read more about configuring the HTTP listener, see HTTP Listener Configuration.
Add the following configuration to the scrape_configs
section inside prometheus.yml
of your Prometheus server.
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
- job_name: 'vernemq'
scrape_interval: 5s
scrape_timeout: 5s
static_configs:
- targets: ['localhost:8888']
This tells Prometheus to scrape the VerneMQ metrics endpoint every 5 seconds.
Please follow the documentation on the Prometheus website to properly configure the metrics scraping as well as how to access those metrics and configure alarms and graphs.
You can loadtest VerneMQ with our vmq_mzbench tool. It is based on Machinezone's very powerful MZBench system and lets you narrow down what hardware specs are needed to meet your performance goals. You can state your requirements for latency percentiles (and much more) in a formal way, and let vmq_mzbench automatically fail, if it can't meet the requirements.
If you have an AWS account, vmq_mzbench can automagically provision worker nodes for you. You can also run it locally, of course.
Please follow the MZBench installation guide.
Actually, you don't even have to install vmq_mzbench, if you don't want to. Your scenario file will automatically fetch vmq_mzbench for any test you do. vmq_mzbench runs every test independently, so it has a provisioning step for any test, even if you only run it on a local worker.
To install vmq_mzbench on your computer, go through the following steps:
git clone git://github.com/erlio/vmq_mzbench.git
cd vmq_mzbench
./rebar get-deps
./rebar compile
To provision your tests from this local repository, you'll have to tell the scenario scripts to use rsync. Add this to the scenario file:
{make_install, [
{rsync, "/path/to/your/installation/vmq_mzbench/"},
{exclude, "deps"}]},
If you'd just like the script itself to fetch vmq_mzbench, you can point it to GitHub:
{make_install, [
{git, "git://github.com/erlio/vmq_mzbench.git"}]},
You can familiarize yourself quickly with MZBench's guide on writing loadtest scenarios.
There's not much to learn, just make sure you understand how pools and loops work. Then you can add the vmq_mzbench statement functions to the mix and define actual loadtest scenarios.
Currently vmq_mzbench exposes the following statement functions for use in MQTT scenario files:
random_client_id(State, Meta, I)
: Create a random client Id of length I
fixed_client_id(State, Meta, Name, Id)
: Create a deterministic client Id with schema Name ++ "-" ++ Id
worker_id(State, Meta)
: Get the internal, sequential worker Id
client(State, Meta)
: Get the client Id you set yourself during connection setup with the option {t, client, "client"}
connect(State, Meta, ConnectOpts)
: Connect to the broker with the options given in ConnectOpts
disconnect(State, Meta)
: Disconnect normally
subscribe(State, Meta, Topic, QoS)
: Subscribe to Topic with Quality of Service QoS
unsubscribe(State, Meta, Topic)
: Unsubscribe from Topic
publish(State, Meta, Topic, Payload, QoS)
: Publish a message with binary Payload to Topic with QoS
publish(State, Meta, Topic, Payload, QoS, RetainFlag)
: Publish a message with binary Payload to Topic with QoS and RetainFlag
It's easy to add more statement functions to the MQTT worker if needed, get in touch with us.
The VerneMQ status page
VerneMQ comes with a built-in status page which by default is enabled and is available on http://localhost:8888/status
, see HTTP listeners.
The status page is a simple overview of the cluster and the individual nodes in the cluster as seen below:
When working with a system like VerneMQ, it would sometimes be nice, while troubleshooting, to know what a client is actually sending and receiving and what VerneMQ is doing with this information. For this purpose VerneMQ has a built-in tracing mechanism that is safe to use in production settings: there is very little overhead in running the tracer, and it has built-in protection mechanisms to stop traces that produce too much information.
To trace a client the following command is available:
See the available flags by calling vmq-admin trace client --help.
A typical trace could look like the following:
In this particular trace a trace was started for the client with client-id client
. At first no clients are connected to the node where the trace has been started, but a little later the client connects and we see the trace come alive. The strange identifier <7616.3443.1>
is called a process identifier and is the identifier of the process in which the trace happened - this isn't relevant unless one wants to correlate the trace with log entries where process identifiers are also logged. Besides the process identifier there are some lines with MQTT SEND
and MQTT RECV
which are to be understood from the perspective of the broker. In the above trace this means that first the broker receives a CONNECT
frame and replies with a CONNACK
frame. Each MQTT event is annotated with the data from the MQTT frame to give as much detail and insight as possible.
Notice the auth_on_register
call between CONNECT
and CONNACK
which is the authentication plugin hook being called to authenticate the client. In this case the hook returned ok
which means the client was successfully authenticated.
Likewise notice the auth_on_subscribe
call between the SUBSCRIBE
and SUBACK
frames, which is the plugin hook used to authorize whether this particular subscription should be allowed or not. In this case the subscription was authorized.
In this section the publish flow is described. VerneMQ provides multiple hooks throughout the flow of a message. The most important ones are the auth_on_publish
and auth_on_publish_m5
hooks, which act as an application level firewall granting or rejecting a publish message.
The auth_on_publish
and auth_on_publish_m5
hooks allow your plugin to grant or reject publish requests sent by a client. They also allow rewriting the publish topic, payload, QoS, or retain flag and, in the case of auth_on_publish_m5, the properties. The auth_on_publish
hook is specified in the Erlang behaviour and the auth_on_publish_m5
hook in the behaviour available in the repo.
Every plugin that implements the auth_on_publish
or auth_on_publish_m5
hooks is part of a conditional plugin chain. For this reason we allow the hook to return different values. In case the plugin can't validate the publish message it is best to return next
as this would allow subsequent plugins in the chain to validate the request. If no plugin is able to validate the request it gets automatically rejected.
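A minimal sketch of such a hook in Erlang, assuming the auth_on_publish_hook behaviour from vernemq_dev and topics represented as lists of binary topic words (the module name and topic prefix are hypothetical):
-module(myplugin).
-behaviour(auth_on_publish_hook).
-export([auth_on_publish/6]).

%% Allow publishes below the "public" topic prefix, and hand all
%% other publishes over to the next plugin in the conditional chain.
auth_on_publish(_UserName, _SubscriberId, _QoS, [<<"public">> | _], _Payload, _IsRetain) ->
    ok;
auth_on_publish(_UserName, _SubscriberId, _QoS, _Topic, _Payload, _IsRetain) ->
    next.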
The on_publish
and on_publish_m5
hooks allow your plugin to get informed about an authorized publish message. The hook is specified in the Erlang behaviour and the on_publish_m5
hook in the behaviour available in the repo.
The on_offline_message
hook allows your plugin to get notified about a newly queued message for a client that is currently offline. The hook is specified in the Erlang behaviour available in the repo.
The on_deliver
and on_deliver_m5
hooks allow your plugin to get informed about outgoing publish messages, but also allows you to rewrite topic and payload of the outgoing message. The hook is specified in the Erlang behaviour and the on_deliver_m5
hook in the behaviour available in the repo.
Every plugin that implements the on_deliver
or on_deliver_m5
hooks is part of a conditional plugin chain, although NO verdict is required in this case. The message gets delivered in any case. If your plugin uses this hook to rewrite the message, the plugin system stops evaluating subsequent plugins in the chain.
Description and Configuration of the built-in Monitoring mechanism
VerneMQ can be monitored in several ways. We implemented native support for Graphite, the MQTT $SYS tree, and Prometheus.
The metrics are also available via the command line tool:
Or with:
This will output the metrics together with a short description of what each metric is about. An example looks like:
Notice that the metrics listed below are no longer used (always 0) and will be removed in the future. They were replaced with mqtt_connack_sent
using the return_code
label. For MQTT 5.0 the reason_code
label is used instead.
The output on the command line is aggregated by default, but details for a label can be shown as well, for example all metrics with the not_authorized
label:
All available labels can be shown using vmq-admin metrics show --help.
You can configure as many listeners as you wish in the vernemq.conf file. In addition to this, the vmq-admin listener
command lets you configure, start, stop and delete listeners on the fly. Those can be MQTT, WebSocket or cluster listeners; in the command line output they will be tagged mqtt, ws or vmq accordingly.
Listeners configured with the vmq-admin listener
command will not survive a broker restart. Live changes to listeners configured in vernemq.conf are possible, but the vernemq.conf listeners will just be restarted with a broker restart.
This will start an MQTT listener on port 1884
and IP address 192.168.1.50
. If you want to start a WebSocket listener, just tell VerneMQ by adding the --websocket
flag. There are more options, mainly for configuring SSL (use vmq-admin listener start --help
).
You can isolate client connections accepted by a certain listener from other clients by setting a mountpoint.
To start an MQTT listener using defaults, just set the port and IP address as a minimum.
You can add the -k
or --kill_sessions
switch to that command. This will disconnect all client connections setup by that listener. In combination with a mountpoint, this can be useful for terminating clients for a specific application, or to force re-connects to another cluster node (to prepare for a cluster leave for your node).
VerneMQ provides multiple hooks throughout the lifetime of a session. The most important ones are the auth_on_register
and auth_on_register_m5
hooks which act as an application level firewall granting or rejecting new clients.
The auth_on_register
and auth_on_register_m5
hooks allow your plugin to grant or reject new client connections. Moreover, they let you exert fine-grained control over the configuration of the client session. The auth_on_register
hook is specified in the Erlang behaviour and the auth_on_register_m5
hook in the behaviour available in the repo.
Every plugin that implements the auth_on_register
or auth_on_register_m5
hooks is part of a conditional plugin chain. For this reason we allow the hook to return different values depending on how the plugin grants or rejects this client. In case the plugin doesn't know the client it is best to return next
as this would allow subsequent plugins in the chain to validate this client. If no plugin is able to validate the client it gets automatically rejected.
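As a hedged sketch (assuming the auth_on_register_hook behaviour from vernemq_dev; the blocked username is purely illustrative), such a plugin might look like:
-module(myplugin_auth).
-behaviour(auth_on_register_hook).
-export([auth_on_register/5]).

%% Reject one known bad username, and hand every other client
%% over to the next plugin in the conditional chain.
auth_on_register(_Peer, _SubscriberId, <<"blocked-user">>, _Password, _CleanSession) ->
    {error, invalid_credentials};
auth_on_register(_Peer, _SubscriberId, _UserName, _Password, _CleanSession) ->
    next.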
The on_auth_m5
hook allows your plugin to implement MQTT enhanced authentication; see the section on enhanced authentication flows.
The on_register
and on_register_m5
hooks allow your plugin to get informed about a newly authenticated client. The hook is specified in the Erlang behaviour and the behaviour available in the repo.
Once a new client has been successfully authenticated and the above described hooks have been called, the client attaches to its queue. If it is a returning client using clean_session=false
or if the client had previous sessions in the cluster, this process could take a while. (As offline messages are migrated to a new node, existing sessions are disconnected). The hook is called at the point where a queue has been successfully instantiated, possible offline messages migrated, and potential duplicate sessions have been disconnected. In other words: when the client has reached a completely initialized, normal state for accepting messages. The hook is specified in the Erlang behaviour on_client_wakeup_hook
available in the repo.
This hook is called if a client using clean_session=false
closes the connection or gets disconnected by a duplicate client. The hook is specified in the Erlang behaviour available in the repo.
This hook is called if a client using clean_session=true
closes the connection or gets disconnected by a duplicate client. The hook is specified in the Erlang behaviour available in the repo.
VerneMQ uses the Erlang distribution mechanism for most inter-node communication. VerneMQ identifies other machines in the cluster using Erlang identifiers (e.g. [email protected]
). Erlang resolves these node identifiers to a TCP port on a given machine via the Erlang Port Mapper daemon (epmd) running on each cluster node.
By default, epmd binds to TCP port 4369 and listens on the wildcard interface. For inter-node communication, Erlang uses an unpredictable port by default; it binds to port 0, which means the first available port.
For ease of firewall configuration, VerneMQ can be configured to instruct the Erlang interpreter to use a limited range of ports. For example, to restrict the range of ports that Erlang will use for inter-Erlang node communication to 6000-7999, add the following lines to vernemq.conf on each VerneMQ node:
The settings above are only used for distributing subscription updates and maintenance messages. For distributing the 'real' MQTT messages the proper vmq
listener must be configured in the vernemq.conf.
Attributions:
This section, "VerneMQ Inter-node Communication", is a derivative of Security and Firewalls by Riak, used under Creative Commons Attribution 3.0 Unported License.
The VerneMQ health checker
A simple way to gauge the health of a VerneMQ cluster is to query the /health
path on the HTTP listener.
The health check will return 200 when VerneMQ is accepting connections and is joined with the cluster (for clustered setups). 503 will be returned in case any of those two conditions are not met.
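Assuming the default HTTP listener on port 8888 as configured above, a quick manual check could be done with curl:
curl -i http://localhost:8888/health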
vmq-admin trace client client-id=<client-id>
$ vmq-admin trace client client-id=client
No sessions found for client "client"
New session with PID <7616.3443.1> found for client "client"
<7616.3443.1> MQTT RECV: CID: "client" CONNECT(c: client, v: 4, u: username, p: password, cs: 1, ka: 30)
<7616.3443.1> Calling auth_on_register({{172,17,0,1},34274},{[],<<"client">>},username,password,true)
<7616.3443.1> Hook returned "ok"
<7616.3443.1> MQTT SEND: CID: "client" CONNACK(sp: 0, rc: 0)
<7616.3443.1> MQTT RECV: CID: "client" SUBSCRIBE(m1) with topics:
q:0, t: "topic"
<7616.3443.1> Calling auth_on_subscribe(username,{[],<<"client">>}) with topics:
q:0, t: "topic"
<7616.3443.1> Hook returned "ok"
<7616.3443.1> MQTT SEND: CID: "client" SUBACK(m1, qt[0])
<7616.3443.1> Trace session for client stopped
vmq-admin listener show
+----+-------+------------+-----+----------+---------+
|type|status | ip |port |mountpoint|max_conns|
+----+-------+------------+-----+----------+---------+
|vmq |running|192.168.1.50|44053| | 30000 |
|mqtt|running|192.168.1.50|1883 | | 30000 |
+----+-------+------------+-----+----------+---------+
vmq-admin listener start address=192.168.1.50 port=1884 --mountpoint /test --nr_of_acceptors=10 --max_connections=1000
vmq-admin listener stop address=192.168.1.50 port=1884
vmq-admin listener restart address=192.168.1.50 port=1884
vmq-admin listener delete address=192.168.1.50 port=1884
allow_multiple_sessions = on
queue_deliver_mode = balance
erlang.distribution.port_range.minimum = 6000
erlang.distribution.port_range.maximum = 7999
listener.vmq.clustering = 0.0.0.0:44053
vmq-admin metrics show
vmq-admin metrics show -d
# The number of AUTH packets received.
counter.mqtt_auth_received = 0
# The number of times a MQTT queue process has been initialized from offline storage.
counter.queue_initialized_from_storage = 0
# The number of PUBLISH packets sent.
counter.mqtt_publish_sent = 10
# The number of bytes used for storing retained messages.
gauge.retain_memory = 21184
mqtt_connack_not_authorized_sent
mqtt_connack_bad_credentials_sent
mqtt_connack_server_unavailable_sent
mqtt_connack_identifier_rejected_sent
mqtt_connack_unacceptable_protocol_sent
mqtt_connack_accepted_sent
vmq-admin metrics show --return_code=not_authorized
counter.mqtt_connack_sent = 0
VerneMQ can be easily clustered. Clients can then connect to any cluster node and receive messages from any other cluster node. However, the MQTT specification gives certain guarantees that are hard to fulfill in a distributed environment, especially when network partitions occur. We discuss the way VerneMQ deals with network partitions in its own subsection.
Set the Cookie! All cluster nodes need to be configured to use the same Cookie value. It can be set in the vernemq.conf
with the distributed_cookie
setting. Set the Cookie to a private value for security reasons!
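A corresponding vernemq.conf line, with a placeholder value that you should replace by your own secret, might look like:
distributed_cookie = my-private-cluster-cookie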
vmq-admin cluster join discovery-node=<OtherClusterNode>
vmq-admin cluster leave node=<NodeThatShouldGo> (only the first step!)
A cluster leave will actually do a lot more work, and gives you some options to choose from. The node leaving the cluster will go to great lengths trying to migrate its existing queues to other nodes. As queues (online or offline) are live processes in a VerneMQ node, it will only exit after it has migrated them.
Let's look at the steps in detail:
vmq-admin cluster leave node=<NodeThatShouldGo>
This first step will only stop the MQTT Listeners of the node to ensure that no new connections are accepted. It will not interrupt the existing connections, and behind the scenes the node will not leave the cluster yet. Existing clients are still able to publish and receive messages at this point.
The idea is to give a grace period with the hope that existing clients might re-connect (to another node). If you have decided that this period is over (after 5 minutes or 1 day is up to you), you proceed with step 2: disconnecting the rest of the clients.
vmq-admin cluster leave node=<NodeThatShouldGo> -k
The -k
flag will delete the MQTT Listeners of the leaving node, taking down all live connections. If this is what you want from the beginning, you can do this right away as a first step.
Now, queue migration is triggered by clients re-connecting to other nodes. They will claim their queue and it will get migrated. Still, there might be some offline queues remaining on the leaving node, because they were pre-existing or because some clients do not re-connect and do not reclaim their queues.
VerneMQ will throw an exception if there are remaining offline queues after a configurable timeout. The default is 60 seconds, but you can set it as an option to the cluster leave command. As soon as the exception shows in console or console.log, you can actually retry the cluster leave command (including setting a migration timeout (-t
), and an interval in seconds (-i
) indicating how often information on the migration progress should be printed to the console.log):
vmq-admin cluster leave node=<NodeThatShouldGo> -k -i 5 -t 120
After this timeout VerneMQ will forcefully migrate the remaining offline queues to other cluster nodes in a round robin manner. After doing that, it will stop the leaving VerneMQ node.
So, case A was the happy case. You left the cluster with your node in a controlled manner, and everything worked, including a complete queue (and message) transfer to other nodes.
Let's look at the second possibility where the node is already down. Your cluster is still counting on it though and possibly blocking new subscriptions for that reason, so you want to make the node leave.
To do this, use the same command(s) as in the first case. There is one important consequence to note: by making a stopped node leave, you basically throw away persistent queue content, as VerneMQ won't be able to migrate or deliver it.
Let's repeat that to make sure:
Case B: Currently the persisted QoS 1 & QoS 2 messages aren't replicated to the other nodes by the default message store backend. Currently you will lose the offline messages stored on the leaving node.
vmq-admin cluster show
Inspecting and managing MQTT sessions
VerneMQ comes with powerful tools for inspecting the state of MQTT sessions. To list current MQTT sessions simply invoke vmq-admin session show
:
$ vmq-admin session show
+---------+---------+----------+---------+---------+---------+
|client_id|is_online|mountpoint|peer_host|peer_port| user |
+---------+---------+----------+---------+---------+---------+
| client2 | true | |127.0.0.1| 37098 |undefined|
| client1 | true | |127.0.0.1| 37094 |undefined|
+---------+---------+----------+---------+---------+---------+
To see detailed information about the command see vmq-admin session show --help.
The command is able to show a lot of different information about a client, for example the client id, the peer host and port, whether the client is online or offline, and much more; see vmq-admin session show --help for details. The information can also be used for filtering, which is very helpful when narrowing the output down to a single client.
A sample query which lists only the node where the client session exists and if the client is online would look like the following:
$ vmq-admin session show --node --is_online --client_id=client1
+---------+--------------+
|is_online| node |
+---------+--------------+
| true |[email protected]|
+---------+--------------+
Note, by default a maximum of 100 rows are returned from each node in the cluster. This is a mechanism to protect the cluster from overload as there can be millions of MQTT sessions and resulting rows. Use --limit=<RowLimit>
to override the default value.
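For example, to raise the per-node row limit for a single invocation:
vmq-admin session show --limit=1000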
To list the clients and their subscriptions one can do the following:
$ vmq-admin session show --topic --client_id
+---------+----------------+
|client_id| topic |
+---------+----------------+
| client2 |some/other/topic|
| client1 |some/other/topic|
| client1 | some/topic |
+---------+----------------+
And to list only the clients subscribed to the topic some/topic
:
$ vmq-admin session show --topic --client_id --topic=some/topic
+---------+----------+
|client_id| topic |
+---------+----------+
| client1 |some/topic|
+---------+----------+
To figure out when the queue for a persisted session (clean_session=false) was created and when the client last connected one can use the --queue_started_at
and --session_started_at
to list the POSIX timestamps (in microseconds):
$ vmq-admin session show --client_id=client1 --queue_started_at --session_started_at
+----------------+------------------+
|queue_started_at|session_started_at|
+----------------+------------------+
| 1549379963575 | 1549379974905 |
+----------------+------------------+
Besides the examples above it is also possible to inspect the number of online or offline messages as well as their payloads and much more. See vmq-admin session show --help
for an exhaustive list of all the available options.
VerneMQ also supports disconnecting clients and reauthorizing client subscriptions. To disconnect a client, clean up stored messages, and remove subscriptions one can invoke:
$ vmq-admin session disconnect client-id=client1 --cleanup
See vmq-admin session disconnect --help
for more options and details.
To reauthorize subscriptions for a client issue the following command:
$ vmq-admin session reauthorize username=username client-id=client1
Unchanged
This works by reapplying the logic in any installed auth_on_subscribe
or auth_on_subscribe_m5
plugin hooks to check the validity of the existing topics and removing those that are no longer allowed. In the example above the reauthorization of the client subscriptions resulted in no changes.
VerneMQ is implemented in Erlang OTP and therefore runs on top of the Erlang VM. For this reason plugins have to be developed in a programming language that runs on the Erlang VM. The most popular choice is obviously the Erlang programming language itself, but Elixir or Lisp flavoured Erlang LFE could be used too.
Be aware that in VerneMQ a plugin does NOT run in a sandboxed environment and misbehaviour could seriously harm the system (e.g. performance degradation, reduced availability as well as consistency, and message loss). Get in touch with us in case you require a review of your plugin.
This guide explains the different flows that expose different hooks to be used for custom plugins. It also describes the code structure a plugin must comply with in order to be successfully loaded and started by the VerneMQ plugin mechanism.
All the hooks that are currently exposed fall into one of three categories.
Hooks that allow you to change the protocol flow. An example could be to authenticate a client using the auth_on_register
hook.
Hooks that inform you about a certain action, that could be used for example to implement a custom logging or audit plugin.
Hooks that are called given a certain condition
Notice that some hooks come in two variants, for example the auth_on_register
and then auth_on_register_m5
hooks. The _m5
postfix refers to the fact that this hook is only invoked in an MQTT 5.0 session context, whereas the other is invoked in an MQTT 3.1/3.1.1 session context.
Before going into the details, let's give a quick intro to the VerneMQ plugin system.
The VerneMQ plugin system allows you to load, unload, start and stop plugins during runtime, and you can even upgrade a plugin during runtime. To make this work it is required that the plugin is an OTP application and strictly follows the rules of implementing the Erlang OTP application behaviour. It is recommended to use the rebar3
toolchain to compile the plugin. VerneMQ comes with built-in support for the directory structure used by rebar3
.
Every plugin has to describe the hooks it is implementing as part of its application environment file. The vmq_acl
plugin for instance comes with the application environment file below:
Lines 6 to 10 instruct the plugin system to ensure that those dependent applications are loaded and started. If you're using third party dependencies make sure that they are available in compiled form and part of the plugin load path. Lines 16 to 20 allow the plugin system to compile the plugin rules. Yes, you've heard correctly. The rules are compiled into Erlang VM code to make sure the lookup and execution of plugin code is as fast as possible. Some hooks exist which are used internally such as the change_config/1
; we'll describe those at some other point.
The environment value for vmq_plugin_hooks
is a list of hooks. A hook is specified by {Module, Function, Arity, Options}
.
To streamline the plugin development we provide a different Erlang behaviour for every hook a plugin implements. Those behaviours are part of the vernemq_dev
library application, which you should add as a dependency to your plugin. vernemq_dev
also comes with a header file that contains all the type definitions used by the hooks.
It is possible to have multiple plugins serving the same hook. Depending on the hook the plugin chain is used differently. The most elaborate chains can be found for the hooks that deal with authentication and authorization flows. We also call them conditional chains as a plugin can give control away to the next plugin in the chain. The image shows a sample plugin chain for the auth_on_register
hook.
Most hooks don't require conditions and are mainly used as event handlers. In this case all plugins in a chain are called. An example for such a hook would be the on_register
hook.
A rather specific case is the need to call only one plugin instead of iterating through the whole chain. VerneMQ uses such hooks for its pluggable message storage system.
Unless you're implementing your custom message storage backend, you probably won't need this style of hook.
The plugin mechanism uses the application environment file to infer the applications that it has to load and start prior to starting the plugin itself. It internally uses the application:ensure_all_started/1
function call to start the plugin. If your setup is more complex you could override this behaviour by implementing a custom start/0
function inside a module that's named after your plugin.
The plugin mechanism uses application:stop/1
to stop and unload the plugin. This won't stop the dependent application started at startup. If you rely on third party applications that aren't started as part of the VerneMQ release, e.g. a database driver, you can implement a custom stop/0
function inside a module that's named after your plugin and properly stop the driver there.
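As a sketch of what this can look like, the module below is named after a hypothetical plugin application my_plugin and provides both a custom start/0 and stop/0; the third-party application some_driver is purely illustrative and stands in for e.g. a database driver that is not part of the VerneMQ release.
%% Sketch of custom start/0 and stop/0 (illustrative names, see note above).
-module(my_plugin).
-export([start/0, stop/0]).

start() ->
    %% start the third party driver first, then the plugin application itself
    {ok, _} = application:ensure_all_started(some_driver),
    {ok, _} = application:ensure_all_started(my_plugin),
    ok.

stop() ->
    %% stop the plugin application and properly shut down the driver
    ok = application:stop(my_plugin),
    ok = application:stop(some_driver),
    ok.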
The vmq_types.hrl
exposes all the type specs used by the hooks. The following types are used by the plugin system:
You need to know about and configure a couple of Operating System and Erlang VM configs to operate VerneMQ efficiently. First, make sure you have set appropriate OS file limits according to the Open Files Limit guide. Second, when you run into performance problems, don't forget to check the listener configuration. (Can't open more than 10k connections? Well, is the listener configured to open more than 10k?)
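As a hedged example (the exact option set depends on your VerneMQ version, and the value below is illustrative, not a recommendation), the per-listener connection limit is controlled in vernemq.conf via the max_connections setting:
listener.max_connections = 100000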
This is the number one topic to look at, if you need to keep an eye on RAM usage.
Context: All network I/O in Erlang uses an internal driver. This driver will allocate and handle an internal application side buffer for every TCP connection. The default size of these buffers will determine your overall RAM use.
VerneMQ calculates the buffer size from the OS level TCP send and receive buffers:
val(buffer) >= max(val(sndbuf),val(recbuf))
Those values correspond to net.ipv4.tcp_wmem
and net.ipv4.tcp_rmem
in your OS's sysctl configuration. One way to minimize RAM usage is therefore to configure those settings (Debian example):
This would result in a 32KB application buffer for every connection. On a multi-purpose server where you install VerneMQ as a test, you might not want to change your OS's TCP settings, of course. In that case, you can still configure the buffer sizes manually for VerneMQ by using the advanced.config
file.
The advanced.config
file is a supplementary configuration file that sits in the same directory as the vernemq.conf
. You can set additional config values for any of the OTP applications that are part of a VerneMQ release. To just configure the TCP buffer size manually, you can create an advanced.config
file:
For very advanced & custom configurations, you can add a vm.args
file to the same directory where the vernemq.conf
file is located. Its purpose is to configure parameters for the Erlang Virtual Machine. This will override any Erlang specific parameters you might have configured via the vernemq.conf
. Normally, VerneMQ auto-generates a vm.args file for every boot in /var/lib/vernemq/generated.configs/
(Debian package example) from vernemq.conf
and other potential configuration sources.
This is what a vm.args
file might look like:
Using TLS will of course increase the CPU load during connection setup. Latencies in message delivery will be increased, and your overall message throughput per second will be lower.
TLS will require considerably more RAM. Instead of 2 Erlang processes per connection, TLS will use 3. You'll have a session process, a queue process, and a TLS handler process that can encapsulate quite a big state (> 30KB).
Erlang/OTP uses its own TLS implementation, only using OpenSSL for crypto, but not connection handling. For situations with high connection setup rate or overall high connection churn rate, the Erlang TLS implementation might be too slow. On the other hand, Erlang TLS gives you great concurrency & fault isolation for long-lived connections.
Some Erlang deployments terminate SSL/TLS with an external component or with a load balancer component. Do some testing & try to find out what works best for you.
sudo sysctl -w net.ipv4.tcp_rmem="4096 16384 32768"
sudo sysctl -w net.ipv4.tcp_wmem="4096 16384 32768"
# Nope, these values are not recommendations!
# You really need to decide yourself.
[{vmq_server, [
{tcp_listen_options,
[{sndbuf, 4096},
{recbuf, 4096}]}]}].
+P 256000
-env ERL_MAX_ETS_TABLES 256000
-env ERL_CRASH_DUMP /erl_crash.dump
-env ERL_FULLSWEEP_AFTER 0
-env ERL_MAX_PORTS 262144
+A 64
-setcookie vmq # Important: Use your own private cookie...
-name [email protected]
+K true
+sbt db
+sbwt very_long
+swt very_low
+sub true
+Mulmbcs 32767
+Mumbcgs 1
+Musmbcs 2047
# Nope, these values are not recommendations!
# You really need to decide yourself, again ;)
{application, vmq_acl,
[
{description, "Simple File based ACL for VerneMQ"},
{vsn, git},
{registered, []},
{applications, [
kernel,
stdlib,
clique
]},
{mod, { vmq_acl_app, []}},
{env, [
{file, "priv/test.acl"},
{interval, 10},
{vmq_config_enabled, true},
{vmq_plugin_hooks, [
{vmq_acl, change_config, 1, [internal]},
{vmq_acl, auth_on_publish, 6, []},
{vmq_acl, auth_on_subscribe, 3, []}
]}
]}
]}.
-type peer() :: {inet:ip_address(), inet:port_number()}.
-type username() :: binary() | undefined.
-type password() :: binary() | undefined.
-type client_id() :: binary().
-type mountpoint() :: string().
-type subscriber_id() :: {mountpoint(), client_id()}.
-type reg_view() :: atom().
-type topic() :: [binary()].
-type qos() :: 0 | 1 | 2.
-type routing_key() :: [binary()].
-type payload() :: binary().
-type flag() :: boolean().
VerneMQ can consume a large number of open file handles when thousands of clients are connected as every connection requires at least one file handle.
Most operating systems can change the open-files limit using the ulimit -n
command. Example:
ulimit -n 65536
However, this only changes the limit for the current shell session. Changing the limit on a system-wide, permanent basis varies more between systems.
On most Linux distributions, the total limit for open files is controlled by sysctl
.
sysctl fs.file-max
fs.file-max = 50384
As seen above, it is generally set high enough for VerneMQ. If you have other things running on the system, you might want to consult the sysctl manpage for how to change that setting. However, what typically needs to be changed is the per-user open files limit. This requires editing /etc/security/limits.conf
, for which you'll need superuser access. If you installed VerneMQ from a binary package, add lines for the vernemq
user like so, substituting your desired hard and soft limits:
vernemq soft nofile 4096
vernemq hard nofile 65536
On Ubuntu, if you’re always relying on the init scripts to start VerneMQ, you can create the file /etc/default/vernemq and specify a manual limit like so:
ulimit -n 65536
This file is automatically sourced from the init script, and the VerneMQ process started by it will properly inherit this setting. As init scripts are always run as the root user, there’s no need to specifically set limits in /etc/security/limits.conf
if you’re solely relying on init scripts.
On CentOS/RedHat systems, make sure to set a proper limit for the user you’re usually logging in with to do any kind of work on the machine, including managing VerneMQ. On CentOS, sudo
properly inherits the values from the executing user.
It can be helpful to enable PAM user limits so that non-root users, such as the vernemq
user, may specify a higher value for maximum open files. For example, follow these steps to enable PAM user limits and set the soft and hard values for all users of the system to allow for up to 65536 open files.
Edit /etc/pam.d/common-session
and append the following line:
session required pam_limits.so
If /etc/pam.d/common-session-noninteractive
exists, append the same line as above.
Save and close the file.
Edit /etc/security/limits.conf
and append the following lines to the file:
* soft nofile 65536
* hard nofile 65536
Save and close the file.
(optional) If you will be accessing the VerneMQ nodes via secure shell (ssh), you should also edit /etc/ssh/sshd_config
and uncomment the following line:
#UseLogin no
and set its value to yes
as shown here:
UseLogin yes
Restart the machine so that the limits take effect and verify
that the new limits are set with the following command:
ulimit -a
Edit /etc/security/limits.conf
and append the following lines to
the file:
* soft nofile 65536
* hard nofile 65536
Save and close the file.
Restart the machine so that the limits take effect and verify that the new limits are set with the following command:
ulimit -a
In Solaris 8, there is a default limit of 1024 file descriptors per process. In Solaris 9, the default limit was raised to 65536. To increase the per-process limit on Solaris, add the following line to /etc/system
:
set rlim_fd_max=65536
To check the current limits on your Mac OS X system, run:
launchctl limit maxfiles
The last two columns are the soft and hard limits, respectively.
To adjust the maximum open file limits in OS X 10.7 (Lion) or newer, edit /etc/launchd.conf
and increase the limits for both values as appropriate.
For example, to set the soft limit to 16384 files, and the hard limit to 32768 files, perform the following steps:
Verify current limits:
launchctl limit
The response output should look something like this:
cpu         unlimited      unlimited
filesize    unlimited      unlimited
data        unlimited      unlimited
stack       8388608        67104768
core        0              unlimited
rss         unlimited      unlimited
memlock     unlimited      unlimited
maxproc     709            1064
maxfiles    10240          10240
Edit (or create) /etc/launchd.conf
and increase the limits. Add lines that look like the following (using values appropriate to your environment):
limit maxfiles 16384 32768
Save the file, and restart the system for the new limits to take effect. After restarting, verify the new limits with the launchctl limit command:
launchctl limit
The response output should look something like this:
cpu         unlimited      unlimited
filesize    unlimited      unlimited
data        unlimited      unlimited
stack       8388608        67104768
core        0              unlimited
rss         unlimited      unlimited
memlock     unlimited      unlimited
maxproc     709            1064
maxfiles    16384          32768
Attributions
This work, "Open File Limits", is a derivative of Open File Limits by Riak, used under Creative Commons Attribution 3.0 Unported License. "Open File Limits" is licensed under Creative Commons Attribution 3.0 Unported License by Erlio GmbH.
VerneMQ supports multiple ways to authenticate and authorize new client connections using a database.
VerneMQ supports authentication and authorization using a number of popular databases and the below sections describe how to configure the different databases.
The database drivers are handled using the vmq_diversity
plugin and it therefore needs to be enabled:
plugins.vmq_diversity = on
When using database based authentication/authorization the enabled-by-default file based authentication and authorization are most likely not needed and should be disabled:
plugins.vmq_passwd = off
plugins.vmq_acl = off
You must set allow_anonymous = off
, otherwise VerneMQ won't use the database plugin for authentication and authorization.
In order to use a database for authentication and authorization the database must be properly configured and the auth data (username, client id, password, ACLs) must be present. The following sections show some sample requests that can be used to insert such data.
While the handling of authentication differs among the different databases, the handling of ACLs is roughly identical and makes use of a JSON array containing one or many ACL objects per configured client.
A minimal publish & subscribe ACL JSON object takes the following form:
General ACL
{
"pattern": "a/+/c"
}
The pattern is an MQTT topic string that can contain MQTT wildcards, but also the template variables %m
(mountpoint), %u
(username), and %c
(client id) which are automatically substituted with the auth data provided.
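For example, an ACL that restricts a user to a topic subtree containing their own username and client id (an illustrative pattern) could look like:
{
    "pattern": "devices/%u/%c/#"
}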
Publish ACL
The publish ACL makes it possible to control the maximum QoS and payload size that is allowed, and if the message is allowed to be retained.
{
"pattern": "a/+/c",
"max_qos": 2,
"max_payload_size": 128,
"allowed_retain": true
}
Moreover, the publish ACL makes it possible to modify the properties of a published message through specifying one or multiple modifiers
. Please note that the modified message isn't re-validated by the ACL.
{
"pattern": "a/+/c",
"max_qos": 2,
"max_payload_size": 128,
"allowed_retain": true,
"modifiers": {
"topic": "new/topic",
"payload": "new payload",
"qos": 2,
"retain": true,
"mountpoint": "other-mountpoint"
}
}
Subscribe ACL
The subscribe ACL makes it possible to control the maximum QoS a client is allowed to subscribe to.
{
"pattern": "a/+/c",
"max_qos": 2
}
Like the publish ACL, the subscribe ACL makes it possible to change the current subscription request by returning a custom set of topic/qos pairs. Please note that the modified subscription isn't re-validated by the ACL.
{
"pattern": "a/+/c",
"max_qos": 2,
"modifiers": [
["new/topic/1", 1],
["new/topic/2", 1]
]
}
When deciding on which database to use one has to consider which kind of password hashing and key derivation functions are available and required. Different databases provide different mechanisms, for example PostgreSQL provides the pgcrypto
module which supports verifying hashed and salted passwords, while Redis has no such features. VerneMQ therefore also provides client-side password verification mechanisms such as bcrypt
.
There is a trade-off between verifying passwords on the client-side versus on the server-side. Verifying passwords client-side of course means doing the computations on the VerneMQ broker and this takes away resources from other tasks such as routing messages. With hashing functions such as bcrypt
which are designed specifically to be slow (proportional to the number of rounds) in order to make brute-force attacks infeasible, this can become a problem. For example, if verifying a password with bcrypt
takes 0.5 seconds then on a single threaded core 2 verifications/second are possible, and with 4 single threaded cores 8 verifications/second. So, the number of rounds/security parameters has a direct impact on the maximum number of verifications/second and hence also on the maximum arrival rate of new clients per second.
For each database it is specified which password verification mechanisms are available and if they are client-side or server-side.
To enable PostgreSQL authentication and authorization the following need to be configured in the vernemq.conf
file:
vmq_diversity.auth_postgres.enabled = on
vmq_diversity.postgres.host = 127.0.0.1
vmq_diversity.postgres.port = 5432
vmq_diversity.postgres.user = vernemq
vmq_diversity.postgres.password = vernemq
vmq_diversity.postgres.database = vernemq_db
vmq_diversity.postgres.password_hash_method = crypt
PostgreSQL hashing methods:
| method | client-side | server-side |
| :--- | :--- | :--- |
| bcrypt | ✓ | |
| crypt | | ✓ |
The following SQL DDL must be applied, the pgcrypto
extension is required if using the server-side crypt
hashing method:
CREATE EXTENSION pgcrypto;
CREATE TABLE vmq_auth_acl
(
mountpoint character varying(10) NOT NULL,
client_id character varying(128) NOT NULL,
username character varying(128) NOT NULL,
password character varying(128),
publish_acl json,
subscribe_acl json,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
);
To enter new ACL entries use a query similar to the following:
WITH x AS (
SELECT
''::text AS mountpoint,
'test-client'::text AS client_id,
'test-user'::text AS username,
'123'::text AS password,
gen_salt('bf')::text AS salt,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS publish_acl,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS subscribe_acl
)
INSERT INTO vmq_auth_acl (mountpoint, client_id, username, password, publish_acl, subscribe_acl)
SELECT
x.mountpoint,
x.client_id,
x.username,
crypt(x.password, x.salt),
publish_acl,
subscribe_acl
FROM x;
To enable CockroachDB authentication and authorization the following need to be configured in the vernemq.conf
file:
vmq_diversity.auth_cockroachdb.enabled = on
vmq_diversity.cockroachdb.host = 127.0.0.1
vmq_diversity.cockroachdb.port = 26257
vmq_diversity.cockroachdb.user = vernemq
vmq_diversity.cockroachdb.password = vernemq
vmq_diversity.cockroachdb.database = vernemq_db
vmq_diversity.cockroachdb.ssl = on
vmq_diversity.cockroachdb.password_hash_method = bcrypt
Notice that if the CockroachDB installation is secure, then TLS is required. If using an insecure installation without TLS, then vmq_diversity.cockroachdb.ssl
can be set to off
.
CockroachDB hashing methods:
| method | client-side | server-side |
| :--- | :--- | :--- |
| bcrypt | ✓ | |
| sha256 | | ✓ |
The following SQL DDL must be applied:
CREATE TABLE vmq_auth_acl
(
mountpoint character varying(10) NOT NULL,
client_id character varying(128) NOT NULL,
username character varying(128) NOT NULL,
password character varying(128),
publish_acl json,
subscribe_acl json,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
);
To enter new ACL entries use a query similar to the following, the example is for the bcrypt
hashing method:
WITH x AS (
SELECT
''::text AS mountpoint,
'test-client1'::text AS client_id,
'test-user1'::text AS username,
'$2a$12$97PlnSsouvCV7HaxDPV80.EXfsKM4Fg7DAwWhSbGJ6O5CpNep20n2'::text AS hash,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS publish_acl,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS subscribe_acl
)
INSERT INTO vmq_auth_acl (mountpoint, client_id, username, password, publish_acl, subscribe_acl)
SELECT
x.mountpoint,
x.client_id,
x.username,
x.hash,
publish_acl,
subscribe_acl
FROM x;
For MySQL authentication and authorization configure the following in vernemq.conf
:
vmq_diversity.auth_mysql.enabled = on
vmq_diversity.mysql.host = 127.0.0.1
vmq_diversity.mysql.port = 3306
vmq_diversity.mysql.user = vernemq
vmq_diversity.mysql.password = vernemq
vmq_diversity.mysql.database = vernemq_db
vmq_diversity.mysql.password_hash_method = password
MySQL hashing methods:
| method | client-side | server-side |
| :--- | :--- | :--- |
| sha256 | | ✓ |
| md5* | | ✓ |
| sha1* | | ✓ |
| password | | ✓ |
It should be noted that all the above options store unsalted passwords which are vulnerable to rainbow table attacks, so the threat model should be considered carefully when using these. Also note the methods marked with *
are no longer considered secure hashes.
The following SQL DDL must be applied:
CREATE TABLE vmq_auth_acl
(
mountpoint VARCHAR(10) NOT NULL,
client_id VARCHAR(128) NOT NULL,
username VARCHAR(128) NOT NULL,
password VARCHAR(128),
publish_acl TEXT,
subscribe_acl TEXT,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
)
To enter new ACL entries use a query similar to the following, the example uses PASSWORD()
for password hashing:
INSERT INTO vmq_auth_acl
(mountpoint, client_id, username,
password, publish_acl, subscribe_acl)
VALUES
('', 'test-client', 'test-user', PASSWORD('123'),
'[{"pattern":"a/b/c"},{"pattern":"c/b/#"}]',
'[{"pattern":"a/b/c"},{"pattern":"c/b/#"}]');
Note, the PASSWORD()
hashing method needs to be changed according to the configuration set in vmq_diversity.mysql.password_hash_method
, it supports the options password
, md5
, sha1
and sha256
. Learn more about the MySQL equivalent for those methods on https://dev.mysql.com/doc/refman/8.0/en/encryption-functions.html.
The default password
method has been deprecated since MySQL 5.7.6 and not usable with MySQL 8.0.11+. Also, the MySQL authentication method caching_sha2_password
is not supported. This is the default in MySQL 8.0.4 and later, so you need to add: default_authentication_plugin=mysql_native_password
under [mysqld] in e.g. /etc/mysql/my.cnf.
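For example, on MySQL 8.0.4 or later the relevant part of the configuration file could look like this (the file location may differ on your system):
[mysqld]
default_authentication_plugin=mysql_native_password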
For MongoDB authentication and authorization configure the following in vernemq.conf
:
vmq_diversity.auth_mongodb.enabled = on
vmq_diversity.mongodb.host = 127.0.0.1
vmq_diversity.mongodb.port = 27017
# vmq_diversity.mongodb.login =
# vmq_diversity.mongodb.password =
# vmq_diversity.mongodb.database =
MongoDB hashing methods:
| method | client-side | server-side |
| :--- | :--- | :--- |
| bcrypt | ✓ | |
Insert the ACL using the mongo
shell or any software library. The passhash
property contains the bcrypt hash of the client's password.
db.vmq_acl_auth.insert({
mountpoint: '',
client_id: 'test-client',
username: 'test-user',
passhash: '$2a$12$WDzmynWSMRVzfszQkB2MsOWYQK9qGtfjVpO8iBdimTOjCK/u6CzJK',
publish_acl: [
{pattern: 'a/b/c'},
{pattern: 'a/+/d'}
],
subscribe_acl: [
{pattern: 'a/#'}
]
})
For Redis authentication and authorization configure the following in vernemq.conf
:
vmq_diversity.auth_redis.enabled = on
vmq_diversity.redis.host = 127.0.0.1
vmq_diversity.redis.port = 6379
# vmq_diversity.redis.password =
# vmq_diversity.redis.database = 0
Redis hashing methods:
| method | client-side | server-side |
| :--- | :--- | :--- |
| bcrypt | ✓ | |
Insert the ACL using the redis-cli
shell or any software library. The passhash
property contains the bcrypt hash of the client's password. The key is an encoded JSON array containing the mountpoint, username, and client id. Note that no spaces are allowed between the array items.
SET "[\"\",\"test-client\",\"test-user\"]" "{\"passhash\":\"$2a$12$WDzmynWSMRVzfszQkB2MsOWYQK9qGtfjVpO8iBdimTOjCK/u6CzJK\",\"subscribe_acl\":[{\"pattern\":\"a/+/c\"}]}"
Note, currently bcrypt version 2a
(prefix $2a$
) is supported.
Bridges are a non-standard way, although kind of a de-facto standard among MQTT broker implementations, to connect two different MQTT brokers to each other. This allows, for example, the topic tree of a remote broker to become part of the topic tree on the local broker. VerneMQ supports plain TCP connections as well as SSL connections.
In VerneMQ the bridge is distributed as a plugin and is not enabled by default. After configuring the bridge as described below, make sure to enable the plugin by setting:
plugins.vmq_bridge = on
See Managing plugins for more information on working with plugins.
When the plugin is enabled a simple status interface is available:
$ vmq-admin bridge show
+-----------------+-----------+----------+-------------------+
| endpoint |buffer size|buffer max|buffer dropped msgs|
+-----------------+-----------+----------+-------------------+
|192.168.1.10:1883| 0 | 0 | 0 |
+-----------------+-----------+----------+-------------------+
Set up a bridge to a remote broker:
vmq_bridge.tcp.br0 = 192.168.1.12:1883
Different connection parameters can be set:
# use a clean session (defaults to 'off')
vmq_bridge.tcp.br0.cleansession = off | on
# set the client id (defaults to 'auto', which generates one)
vmq_bridge.tcp.br0.client_id = auto | my_bridge_client_id
# set keepalive interval (defaults to 60 seconds)
vmq_bridge.tcp.br0.keepalive_interval = 60
# set the username and password for the bridge connection
vmq_bridge.tcp.br0.username = my_bridge_user
vmq_bridge.tcp.br0.password = my_bridge_pwd
# set the restart timeout (defaults to 10 seconds)
vmq_bridge.tcp.br0.restart_timeout = 10
# VerneMQ indicates to other brokers that the connection
# is established by a bridge instead of a normal client.
# This can be turned off if needed:
vmq_bridge.tcp.br0.try_private = off
# Set the maximum number of outgoing messages the bridge will buffer
# while not connected to the remote broker. Messages published while
# the buffer is full are dropped. A value of 0 means buffering is
# disabled.
vmq_bridge.tcp.br0.max_outgoing_buffered_messages = 100
Define the topics the bridge should incorporate in its local topic tree (by subscribing to the remote), or the topics it should export to the remote broker (by publishing to the remote). The configuration syntax is similar to the one used by the Mosquitto broker:
topic [[[ out | in | both ] qos-level] local-prefix remote-prefix]
topic
defines a topic pattern that is shared between the two brokers. Any topics matching the pattern (which may include wildcards) are shared. The second parameter defines the direction that the messages will be shared in, so it is possible to import messages from a remote broker using in
, export messages to a remote broker using out
, or share messages in both
directions. If this parameter is not defined, VerneMQ defaults to out
. The QoS level defines the publish/subscribe QoS level used for this topic and defaults to 0
. (Source: mosquitto.conf)
The local-prefix
and remote-prefix
can be used to prefix incoming or outgoing publish messages.
Currently the #
wildcard is treated as a comment by the configuration parser, please use *
instead.
A simple example:
# share messages in both directions and use QoS 1
vmq_bridge.tcp.br0.topic.1 = /demo/+ both 1
# import the $SYS tree of the remote broker and
# prefix it with the string 'remote'
vmq_bridge.tcp.br0.topic.2 = $SYS/* in remote
SSL bridges support the same configuration parameters as TCP bridges, but need further instructions for handling the SSL specifics:
# define the CA certificate file or the path to the
# installed CA certificates
vmq_bridge.ssl.br0.cafile = cafile.crt
#or
vmq_bridge.ssl.br0.capath = /path/to/cacerts
# if the remote broker requires client certificate authentication
vmq_bridge.ssl.br0.certfile = /path/to/certfile.pem
# and the keyfile
vmq_bridge.ssl.br0.keyfile = /path/to/keyfile
# disable the verification of the remote certificate (defaults to 'off')
vmq_bridge.ssl.br0.insecure = off
# set the used tls version (defaults to 'tlsv1.2')
vmq_bridge.ssl.br0.tls_version = tlsv1.2
VerneMQ comes with a simple file-based password authentication mechanism which is enabled by default. If you don't need this it can be disabled by setting:
plugins.vmq_passwd = off
Per default VerneMQ doesn't accept any client that hasn't been configured using vmq-passwd
. If you want to change this and accept any client connection you can set:
allow_anonymous = on
In a production setup we recommend using the provided password based authentication mechanism or implementing your own authentication plugins.
VerneMQ periodically checks the specified password file.
vmq_passwd.password_file = /etc/vernemq/vmq.passwd
The check interval defaults to 10 seconds and can also be defined in the vernemq.conf
.
vmq_passwd.password_reload_interval = 10
Setting the password_reload_interval = 0
disables automatic reloading.
vmq-passwd
is a tool for managing password files for the VerneMQ broker. Usernames must not contain ":"
, passwords are stored in a format similar to crypt(3).
How to use vmq-passwd
vmq-passwd [-c | -D] passwordfile username
vmq-passwd -U passwordfile
Options
-c
Creates a new password file. If the file already exists, it will be overwritten.
-D
Deletes the specified user from the password file.
-U
This option can be used to upgrade/convert a password file with plain text passwords into one using hashed passwords. It will modify the specified file. It does not detect whether passwords are already hashed, so using it on a password file that already contains hashed passwords will generate new hashes based on the old hashes and render the password file unusable. Note, with this option neither usernames nor passwords may contain
":"
.
passwordfile
The password file to modify.
username
The username to add/update/delete.
Examples
Add a user to a new password file: (you can choose an arbitrary name for the password file, it only has to match the configuration in the VerneMQ configuration file).
vmq-passwd -c /etc/vernemq/vmq.passwd henry
Delete a user from a password file
vmq-passwd -D /etc/vernemq/vmq.passwd henry
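Convert a password file with plain text passwords into one using hashed passwords
vmq-passwd -U /etc/vernemq/vmq.passwd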
Acknowledgements
The original version of vmq-passwd
was developed by Roger Light ([email protected]).
vmq-passwd
includes :
software developed by the [OpenSSL
Project](http://www.openssl.org/) for use in the OpenSSL Toolkit.
cryptographic software written by Eric Young
software written by Tim Hudson ([email protected])
VerneMQ comes with a simple ACL based authorization mechanism which is enabled by default. If you don't need this it can be disabled by setting:
plugins.vmq_acl = off
VerneMQ periodically checks the specified ACL file.
vmq_acl.acl_file = /etc/vernemq/vmq.acl
The check interval defaults to 10 seconds and can also be defined in the vernemq.conf
.
vmq_acl.acl_reload_interval = 10
Setting the acl_reload_interval = 0
disables automatic reloading.
Topic access is added with lines of the format:
topic [read|write] <topic>
The access type is controlled using read
or write
. If not provided then read and write access is granted for the topic
. The topic
can use the MQTT subscription wildcards +
or #
.
The first set of topics are applied to all anonymous clients (assuming allow_anonymous = on
). User specific ACLs are added after a user line as follows (this is the username not the client id):
user <username>
It is also possible to define ACLs based on pattern substitution within the topic. The form is the same as for the topic keyword, but using pattern as the keyword.
pattern [read|write] <topic>
The patterns available for substitution are:
%c
to match the client id of the client
%u
to match the username of the client
The substitution pattern must be the only text for that level of hierarchy. Pattern ACLs apply to all users even if the user keyword has previously been given.
Example:
pattern write sensor/%u/data
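Another illustrative example, granting each client read access to a command topic containing its own client id:
pattern read devices/%c/commands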
VerneMQ currently doesn't cancel active subscriptions in case the ACL file revokes access for a topic.
# ACL for anonymous clients
topic bar
topic write foo
topic read all
# ACL for user 'john'
user john
topic foo
topic read baz
topic write all
Anonymous users are allowed to
publish & subscribe to topic bar.
publish to topic foo.
subscribe to topic all.
User john is allowed to
publish & subscribe to topic foo.
subscribe to topic baz.
publish to topic all.
Inspecting the retained message store
To list the retained messages simply invoke vmq-admin retain show
:
$ vmq-admin retain show
+------------------+----------------+
| payload | topic |
+------------------+----------------+
| a-third-message | a/third/topic |
|some-other-message|some/other/topic|
| a-message | some/topic |
| a-message | another/topic |
+------------------+----------------+
Note, by default a maximum of 100 results are returned. This is a mechanism to protect the broker from overload as there can be millions of retained messages. Use --limit=<RowLimit>
to override the default value.
Besides listing the retained messages it is also possible to filter them:
$ vmq-admin retain show --payload --topic=some/topic
+---------+
| payload |
+---------+
|a-message|
+---------+
In the above example we list only the payload for the topic some/topic
.
Another example where all topics with retained messages matching a specific payload are listed:
$ vmq-admin retain show --payload a-message --topic
+-------------+
| topic |
+-------------+
| some/topic |
|another/topic|
+-------------+
See the full set of options and documentation by invoking vmq-admin retain show --help
.
How to implement VerneMQ plugins using a HTTP interface
The VerneMQ Webhooks plugin provides an easy and flexible way to build powerful plugins for VerneMQ using web hooks. With VerneMQ Webhooks you are free to select the implementation language to match your technical requirements or the language in which you feel comfortable and productive. You can use any modern language such as Python, Go, C#/.Net and indeed any language in which you can build something that can handle HTTP requests.
The idea of VerneMQ Webhooks is very simple: you can register an HTTP endpoint with a VerneMQ plugin hook and whenever the hook (such as auth_on_register
) is called, the VerneMQ Webhooks plugin dispatches an HTTP POST request to the registered endpoint. The HTTP POST request contains an HTTP header like vernemq-hook: auth_on_register
and a JSON encoded payload. The endpoint then responds with code 200 on success and with a JSON encoded payload informing the VerneMQ Webhooks plugin which action to take (if any).
To enable webhooks make sure to set:
plugins.vmq_webhooks = on
And then each webhook can be configured like this:
vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://127.0.0.1/myendpoints
It is also possible to dynamically register webhooks at run-time:
$ vmq-admin webhooks register hook=auth_on_register endpoint="http://localhost"
See which endpoints are registered:
$ vmq-admin webhooks show
And finally deregistering an endpoint:
$ vmq-admin webhooks deregister hook=auth_on_register endpoint="http://localhost"
Each registered hook uses by default a connection pool containing at most 100 connections. This can be changed by setting vmq_webhooks.pool_max_connections
to a different value. Similarly the vmq_webhooks.pool_timeout
configuration (value is in milliseconds) can be set to control how long an unused connection should stay in the connection pool before being closed and removed. The default value is 60000 (60 seconds).
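For example (the values shown are the defaults):
vmq_webhooks.pool_max_connections = 100
vmq_webhooks.pool_timeout = 60000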
These options are available in VerneMQ 1.4.0.
VerneMQ webhooks support caching of the auth_on_register
, auth_on_publish
and auth_on_subscribe
hooks.
This can be used to speed up authentication and authorization tremendously. All data passed to these hooks is used to check whether the call is already in the cache, except in the case of the auth_on_publish
where the payload is omitted.
To enable caching for an endpoint simply return the cache-control: max-age=AgeInSeconds
in the response headers to one of the mentioned hooks. If the call was successful (authentication granted), the request will be cached together with any modifiers, except for the payload
modifier in the auth_on_publish
hook.
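For example, a response to an auth_on_register call that should be cached for 300 seconds could look like this sketch (the exact header formatting depends on the HTTP framework you use):
HTTP/1.1 200 OK
content-type: application/json
cache-control: max-age=300

{"result": "ok"}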
Whenever a non-expired entry is looked up in the cache the endpoint will not be called and the modifiers of the cached entry will be returned, if any.
It is possible to inspect the cache using:
$ vmq-admin webhooks cache show
All webhooks are called with method POST
. All hooks need to be answered with the HTTP code 200
to be considered successful. Any hook called that does not return the 200
code will be logged as an error as will any hook with an unparseable payload.
All hooks are called with the header vernemq-hook
which contains the name of the hook in question.
For detailed information about the hooks and when they are called, see the sections Session Lifecycle, Subscribe Flow and Publish Flow.
Header: vernemq-hook: auth_on_register
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"password": "password",
"mountpoint": "",
"client_id": "clientid",
"clean_session": false
}
A minimal response indicating the authentication was successful looks like:
{
"result": "ok"
}
It is also possible to override various client specific settings by returning a set of modifiers:
{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000,
"retry_interval": 20000
}
}
Note, the retry_interval
is in milliseconds. It is possible to override many more settings, see the Session Lifecycle for more information.
Other possible responses:
{
"result": "next"
}
{
"result": {
"error": "not_allowed"
}
}
Header: vernemq-hook: auth_on_subscribe
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}
A minimal response indicating the subscription authorization was successful looks like:
{
"result": "ok"
}
Another example where the topics to subscribe have been rewritten looks like:
{
"result": "ok",
"topics":
[{"topic": "rewritten/topic",
"qos": 0}]
}
Note, you can also pass a qos
with value 128
which means it was either not possible or the client was not allowed to subscribe to that specific topic.
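For example, to reject the subscription to the first topic while granting the second one with QoS 1:
{
    "result": "ok",
    "topics":
        [{"topic": "a/b",
          "qos": 128},
         {"topic": "c/d",
          "qos": 1}]
}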
Other possible responses:
{
"result": "next"
}
{
"result": { "error": "some error message" }
}
Header: vernemq-hook: auth_on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}
A minimal response indicating the publish was authorized looks like:
{
"result": "ok"
}
A more complex example where the publish topic, qos, payload and retain flag are rewritten looks like:
{
"result": "ok",
"modifiers": {
"topic": "rewritten/topic",
"qos": 2,
"payload": "rewritten payload",
"retain": true
}
}
Other possible responses:
{
"result": "next"
}
{
"result": { "error": "some error message" }
}
Header: vernemq-hook: on_register
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"mountpoint": "",
"client_id": "clientid"
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topics":
["a/b", "c/d"]
}
Example response:
{
"result": "ok",
"topics":
["rewritten/topic"]
}
Other possible responses:
{
"result": "next"
}
{
"result": { "error": "some error message" }
}
Header: vernemq-hook: on_deliver
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topic": "a/b",
"payload": "hello"
}
Example response:
{
"result": "ok",
"modifiers":
{
"topic": "rewritten/topic",
"payload": "rewritten payload"
}
}
Other possible responses:
{
"result": "next"
}
Header: vernemq-hook: on_offline_message
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"qos": "1",
"topic": "sometopic",
"payload": "payload",
"retain": false
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_wakeup
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_offline
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_gone
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: auth_on_register_m5
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"password": "password",
"clean_start": true,
"properties": {}
}
A minimal response indicating the authentication was successful looks like:
{
"result": "ok"
}
It is also possible to override various client specific settings by returning a set of modifiers:
{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000
}
}
Note, the retry_interval
is in milliseconds. It is possible to override many more settings, see the Session Lifecycle for more information.
Other possible responses:
{
"result": "next"
}
{
"result": {
"error": "not_allowed"
}
}
Header vernemq-hook: on_auth_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMA==",
"p_authentication_method": "AUTH_METHOD"
}
}
Note, as the authentication data is binary data it is base64 encoded.
A minimal response indicating the authentication was successful looks like:
"modifiers": {
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMQ==",
"p_authentication_method": "AUTH_METHOD"
}
"reason_code": 0
},
"result": "ok"
}
If authentication were to continue for another round a reason code with value 24 (Continue Authentication) should be returned instead. See also the relevant section in the MQTT 5.0 specification.
Header: vernemq-hook: auth_on_subscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
}
],
"properties": {}
}
A minimal response indicating the subscription authorization was successful looks like:
{
"result": "ok"
}
Another example where the topics to subscribe have been rewritten looks like:
{
"modifiers": {
"topics": [
{
"qos": 2,
"topic": "rewritten/topic"
},
{
"qos": 135,
"topic": "forbidden/topic"
}
]
},
"result": "ok"
}
Note, the forbidden/topic
has been rejected with the qos
value of 135 (Not authorized).
Other responses
{
"result": "next"
}
{
"result": {
"error": "not_allowed"
}
}
Header: vernemq-hook: auth_on_publish_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "some/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}
A minimal response indicating the publish was authorized looks like:
{
"result": "ok"
}
A response where the publish topic has been rewritten:
{
"modifiers": {
"topic": "rewritten/topic"
},
"result": "ok"
}
Other possible responses:
{
"result": "next"
}
{
"result": {
"error": "not_allowed"
}
}
Header: vernemq-hook: on_register_m5
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"properties": {
}
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish_m5
Note, in the example below the payload is base64 encoded.
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "test/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
},
{
"topic": "test/topic",
"qos": 128
}
],
"properties": {
}
}
Note, the qos value of 128
(Unspecified error) means the subscription was rejected.
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
"test/topic"
],
"properties": {
}
}
Example response:
{
"modifiers": {
"topics": [
"rewritten/topic"
]
},
"result": "ok"
}
Other possible responses:
{
"result": "next"
}
Header: vernemq-hook: on_deliver_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topic": "test/topic",
"payload": "message payload",
"properties": {
}
}
Example response:
{
"result": "ok"
}
Other possible responses:
{
"result": "next"
}
Below is a very simple example of an endpoint implemented in Python. It uses the web
and json
modules and implements handlers for three different hooks: auth_on_register
, auth_on_publish
and auth_on_subscribe
.
The auth_on_register
hook restricts access to the user with username joe
and password secret
. The auth_on_subscribe
and auth_on_publish
hooks allow any subscription or publish to continue as is. These last two hooks are needed as the default policy is deny
.
import web
import json

urls = ('/.*', 'hooks')
app = web.application(urls, globals())

class hooks:
    def POST(self):
        # fetch hook and request data
        hook = web.ctx.env.get('HTTP_VERNEMQ_HOOK')
        data = json.loads(web.data())

        # print the hook and request data to the console
        print()
        print('hook:', hook)
        print(' data: ', data)

        # dispatch to appropriate function based on the hook.
        if hook == 'auth_on_register':
            return handle_auth_on_register(data)
        elif hook == 'auth_on_publish':
            return handle_auth_on_publish(data)
        elif hook == 'auth_on_subscribe':
            return handle_auth_on_subscribe(data)
        else:
            web.ctx.status = 501
            return "not implemented"

def handle_auth_on_register(data):
    # only allow user 'joe' with password 'secret', reject all others.
    if "joe" == data['username']:
        if "secret" == data['password']:
            return json.dumps({'result': 'ok'})
    return json.dumps({'result': {'error': 'not allowed'}})

def handle_auth_on_publish(data):
    # accept all publish requests
    return json.dumps({'result': 'ok'})

def handle_auth_on_subscribe(data):
    # accept all subscribe requests
    return json.dumps({'result': 'ok'})

if __name__ == '__main__':
    app.run()
Developing VerneMQ plugins in Erlang is the most powerful way to extend the functionality of a VerneMQ broker, but it is a barrier for developers not familiar with Erlang. For this reason we've implemented a VerneMQ extension that allows you to develop plugins using the Lua scripting language. This extension is called vmq_diversity and is shipped as part of VerneMQ.
Moreover vmq_diversity provides simple Lua libraries to communicate with MySQL, PostgreSQL, MongoDB, and Redis within your Lua VerneMQ plugins. An additional JSON encoding/decoding library as well as a generic HTTP client library give your Lua scripts a great way to talk to external services.
To enable vmq_diversity
make sure to set:
plugins.vmq_diversity = on
A script to be loaded when VerneMQ starts can be specified like this:
vmq_diversity.myscript1.file = Path/to/Script.lua
It is also possible to dynamically load a Lua script using vmq-admin
:
$ vmq-admin script load path=/Abs/Path/To/script.lua
To reload a script after a change:
$ vmq-admin script reload path=/Abs/Path/To/script.lua
A VerneMQ plugin typically consists of one or more implemented VerneMQ hooks. We tried to keep the differences between the traditional Erlang based and Lua based plugins as small as possible. Please check out the Plugin Development Guide for more information about the different flows and a description of the different hooks.
Let's start with a first very basic example that implements a basic authentication and authorization scheme.
-- the function that implements the auth_on_register/5 hook
-- the reg object contains everything required to authenticate a client
-- reg.addr: IP Address e.g. "192.168.123.123"
-- reg.port: Port e.g. 12345
-- reg.mountpoint: Mountpoint e.g. ""
-- reg.username: UserName e.g. "test-user"
-- reg.password: Password e.g. "test-password"
-- reg.client_id: ClientId e.g. "test-id"
-- reg.clean_session: CleanSession Flag true
function my_auth_on_register(reg)
-- only allow clients connecting from this host
if reg.addr == "192.168.10.10" then
--only allow clients with this username
if reg.username == "demo-user" then
-- only allow clients with this clientid
if reg.client_id == "demo-id" then
return true
end
end
end
return false
end
-- the function that implements the auth_on_publish/6 hook
-- the pub object contains everything required to authorize a publish request
-- pub.mountpoint: Mountpoint e.g. ""
-- pub.client_id: ClientId e.g. "test-id"
-- pub.topic: Publish Topic e.g. "test/topic"
-- pub.qos: Publish QoS e.g. 1
-- pub.payload: Payload e.g. "hello world"
-- pub.retain: Retain flag e.g. false
function my_auth_on_publish(pub)
-- only allow publishes on this topic with QoS = 0
if pub.topic == "demo/topic" and pub.qos == 0 then
return true
end
return false
end
-- the function that implements the auth_on_subscribe/3 hook
-- the sub object contains everything required to authorize a subscribe request
-- sub.mountpoint: Mountpoint e.g. ""
-- sub.client_id: ClientId e.g. "test-id"
-- sub.topics: A list of Topic/QoS Pairs e.g. { {"topic/1", 0}, {"topic/2, 1} }
function my_auth_on_subscribe(sub)
local topic = sub.topics[1]
if topic then
-- only allow subscriptions for the topic "demo/topic" with QoS = 0
if topic[1] == "demo/topic" and topic[2] == 0 then
return true
end
end
return false
end
-- the hooks table specifies which hooks this plugin is implementing
hooks = {
auth_on_register = my_auth_on_register,
auth_on_publish = my_auth_on_publish,
auth_on_subscribe = my_auth_on_subscribe
}
This subsection describes the data providers currently available to a Lua script. Every data provider is backed by a connection pool that has to be configured by your script.
ensure_pool
mysql.ensure_pool(config)
Ensures that the connection pool named config.pool_id
is setup in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
user
: MySQL account name for login
password
: MySQL account password for login (in clear text).
host
: Host name for the MySQL server (default is localhost)
port
: Port that the MySQL server is listening on (default is 3306).
database
: MySQL database name.
encoding
: Encoding (default is latin1)
This call throws a badarg error in case it cannot setup the pool otherwise it returns true
.
execute
mysql.execute(pool_id, stmt, args...)
Executes the provided SQL statement using a connection from the connection pool.
pool_id
: Name of the connection pool to use for this statement.
stmt
: A valid MySQL statement.
args...
: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true
or false
or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
ensure_pool
postgres.ensure_pool(config)
Ensures that the connection pool named config.pool_id
is setup in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
user
: Postgres account name for login
password
: Postgres account password for login (in clear text).
host
: Host name for the Postgres server (default is localhost)
port
: Port that the Postgres server is listening on (default is 5432).
database
: Postgres database name.
This call throws a badarg error in case it cannot setup the pool otherwise it returns true
.
execute
postgres.execute(pool_id, stmt, args...)
Executes the provided SQL statement using a connection from the connection pool.
pool_id
: Name of the connection pool to use for this statement.
stmt
: A valid SQL statement.
args...
: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true
or false
or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
ensure_pool
mongodb.ensure_pool(config)
Ensures that the connection pool named config.pool_id
is setup in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
login
: MongoDB login name
password
: MongoDB password for login.
host
: Host name for the MongoDB server (default is localhost)
port
: Port that the MongoDB server is listening on (default is 27017).
database
: MongoDB database name.
w_mode
: Set mode for writes either to "unsafe" or "safe".
r_mode
: Set mode for reads either to "master" or "slave_ok".
This call throws a badarg error in case it cannot setup the pool otherwise it returns true
.
insert
mongodb.insert(pool_id, collection, doc_or_docs)
Insert the provided document (or list of documents) into the collection.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
doc_or_docs
: A single Lua table or a Lua array containing multiple Lua tables.
The provided document can set the document id using the _id
key. If the id isn't provided one gets autogenerated. The call returns the inserted document(s) or throws a badarg error if it cannot insert the document(s).
update
mongodb.update(pool_id, collection, selector, doc)
Updates all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
doc
: A single Lua table containing the update properties.
The call returns true
or throws a badarg error if it cannot update the document(s).
delete
mongodb.delete(pool_id, collection, selector)
Deletes all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
The call returns true
or throws a badarg error if it cannot delete the document(s).
find
mongodb.find(pool_id, collection, selector, args)
Finds all documents in the collection that match the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
args
: A Lua table that currently supports an optional projector=LuaTable
element.
The call returns a MongoDB cursor or throws a badarg error if it cannot setup the iterator.
next
mongodb.next(cursor)
Fetches next available document given a cursor object obtained via find
.
The call returns the next available document or false
if all documents have been fetched.
take
mongodb.take(cursor, nr_of_docs)
Fetches the next nr_of_docs
documents given a cursor object obtained via find
.
The call returns a Lua array containing the documents or false
if all documents have been fetched.
close
mongodb.close(cursor)
Closes and cleans up a cursor object obtained via find
.
The call returns true
.
find_one
mongodb.find_one(pool_id, collection, selector, args)
Finds the first document in the collection that matches the given selector.
pool_id
: Name of the connection pool to use for this statement.
collection
: Name of a MongoDB collection.
selector
: A single Lua table containing the filter properties.
args
: A Lua table that currently supports an optional projector=LuaTable
element.
The call returns the matched document or false
in case no document was found.
ensure_pool
redis.ensure_pool(config)
Ensures that the connection pool named config.pool_id
is setup in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
password
: Redis password for login.
host
: Host name for the Redis server (default is localhost)
port
: Port that the Redis server is listening on (default is 6379).
database
: Redis database (default is 0).
This call throws a badarg error in case it cannot setup the pool otherwise it returns true
.
cmd
redis.cmd(pool_id, command, args...)
Executes the given Redis command.
pool_id
: Name of the connection pool
command
: Redis command string.
args...
: Extra args.
This call returns a Lua table, true
, false
, or nil
. In case it cannot parse the command a badarg error is thrown.
ensure_pool
memcached.ensure_pool(config)
Ensures that the pool named config.pool_id
is set up in the system. The config
argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 5).
host
: Host name for the Memcached server (default is localhost)
port
: Port that the Memcached server is listening on (default is 11211).
This call throws a badarg error in case it cannot setup the pool otherwise it returns true
.
flush_all(pool_id)
Flushes all data from the Memcached server. Use with care.
Returns true
.
get(pool_id, key)
Get data for key key
.
Returns the data for the key and otherwise false
.
set(pool_id, key, value, expiration)
Unconditionally set a value for a key.
key
: Key.
value
: Value.
expiration
: Time until the key/value pair is deleted, in seconds. This parameter is optional with default value 0 (no expiration).
Returns value.
add(pool_id, key, value, expiration)
Add a key/value pair if the key doesn't already exist.
key
: Key.
value
: Value.
expiration
: Time until the key/value pair is deleted, in seconds. This parameter is optional with default value 0 (no expiration).
Returns value if key didn't already exist, false otherwise.
replace(pool_id, key, value, expiration)
Replace a key/value pair if the key already exists.
key
: Key.
value
: Value.
expiration
: Time until the key/value pair is deleted, in seconds. This parameter is optional with default value 0 (no expiration).
Returns value if key already exists, false otherwise.
delete(pool_id, key)
Delete key and the associated value.
Returns true if the key/value pair was deleted, false otherwise.
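A minimal sketch of the Memcached calls above. The pool name, key, and value are placeholders.
-- "pool_memcached" and the key/value are placeholders.
memcached.ensure_pool({pool_id = "pool_memcached"})
-- store a value that expires after 60 seconds
memcached.set("pool_memcached", "session:dev-1", "online", 60)
local cached = memcached.get("pool_memcached", "session:dev-1")
if cached then
    log.info("cached value: " .. cached)
end
memcached.delete("pool_memcached", "session:dev-1")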
ensure_pool
http.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
pool_id
: Name of the connection pool (mandatory).
size
: Size of the connection pool (default is 10).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
get, put, post, delete
http.get(pool_id, url, body, headers)
http.put(pool_id, url, body, headers)
http.post(pool_id, url, body, headers)
http.delete(pool_id, url, body, headers)
Executes an HTTP request with the given URL and arguments.
url
: A valid HTTP URL.
body
: Optional body to be included in the request.
headers
: Optional Lua table containing extra headers to be included in the request.
This call returns false in case of an error, or a Lua table of the form:
response = {
status = HTTP_STATUS_CODE,
headers = Lua Table containing response headers,
ref = Client Ref
}
body
http.body(client_ref)
Fetches the response body given a client ref obtained via the response Lua table.
This call returns false in case of an error, or the response body.
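A minimal sketch that issues a GET request and fetches the response body via the returned client ref. The pool name and URL are placeholders.
-- "pool_http" and the URL are placeholders.
http.ensure_pool({pool_id = "pool_http"})
local resp = http.get("pool_http", "http://localhost:8080/health")
if resp and resp.status == 200 then
    -- the body is fetched separately using the client ref from the response table
    local payload = http.body(resp.ref)
    log.info("health endpoint returned: " .. tostring(payload))
end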
encode
json.encode(val)
Encodes a Lua value to a JSON string.
This call returns false if it cannot encode the given value.
decode
json.decode(json_string)
Decodes a JSON string to a Lua value.
This call returns false if it cannot decode the JSON string.
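A short sketch round-tripping a Lua table through JSON; the field names are placeholders.
local encoded = json.encode({client_id = "dev-1", clean_session = true})
-- encoded now holds a JSON string such as {"client_id":"dev-1","clean_session":true}
local decoded = json.decode(encoded)
if decoded then
    log.info("round-tripped client_id: " .. decoded.client_id)
end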
log.info(log_string)
log.error(log_string)
log.warning(log_string)
log.debug(log_string)
Uses the VerneMQ logging infrastructure to log the given log_string.
This section describes the subscription flow. VerneMQ provides several hooks to intercept it. The most important ones are the auth_on_subscribe and auth_on_subscribe_m5 hooks, which act as an application-level firewall granting or rejecting subscribe requests.
The auth_on_subscribe and auth_on_subscribe_m5 hooks allow your plugin to grant or reject subscribe requests sent by a client. They also make it possible to rewrite the subscribe topic and QoS. The auth_on_subscribe hook is specified in the Erlang behaviour and the auth_on_subscribe_m5 hook in the behaviour available in the repo.
The on_subscribe and on_subscribe_m5 hooks allow your plugin to get informed about an authorized subscribe request. The on_subscribe hook is specified in the Erlang behaviour and the on_subscribe_m5 hook in the behaviour available in the repo.
The on_unsubscribe and on_unsubscribe_m5 hooks allow your plugin to get informed about an unsubscribe request. They also allow you to rewrite the unsubscribe topic if required. The on_unsubscribe hook is specified in the Erlang behaviour and the on_unsubscribe_m5 hook in the behaviour available in the repo.
Everything you need to know to work with the VerneMQ HTTP administration interface
The VerneMQ HTTP API is enabled by default and installs an HTTP handler on http://localhost:8888/api/v1. To read more about configuring the HTTP listener, see HTTP Listener Configuration. You can configure an HTTP listener or an HTTPS listener to serve the HTTP API v1.
The VerneMQ HTTP API uses basic authentication where an API key is passed as the username and the password is left empty. So the first step to use the HTTP API is to create an API key:
$ vmq-admin api-key create
JxctXkZ1OTVnlwvguSCE9KtujacMkOLF
The key is persisted and available on all cluster nodes.
To list existing keys do:
$ vmq-admin api-key show
+--------------------------------+
| Key |
+--------------------------------+
|JxctXkZ1OTVnlwvguSCE9KtujacMkOLF|
+--------------------------------+
To add an API key of your own choosing, do:
vmq-admin api-key add key=mykey
To delete an API key do:
vmq-admin api-key delete key=JxctXkZ1OTVnlwvguSCE9KtujacMkOLF
The VerneMQ HTTP API is a wrapper over the vmq-admin CLI tool, and anything that can be done using vmq-admin can be done using the HTTP API. Note that the HTTP API is therefore subject to any changes made to the vmq-admin tools and their flags & options structure. All requests are performed using an HTTP GET, and if no errors occur an HTTP 200 OK code is returned together with a possibly non-empty JSON payload.
The API uses basic auth where the API key is passed as the username. An example using curl would look like this:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/show"
The mapping between vmq-admin and the HTTP API is straightforward, and if one is already familiar with how the vmq-admin tool works, working with the API should be easy. The mapping works such that the command part of a vmq-admin invocation is turned into a path, and the options and flags are turned into the query string.
A mandatory parameter like the client-id in the vmq-admin session disconnect client-id=myclient command should be translated as: ?client-id=myclient.
An optional flag like --cleanup in the vmq-admin session disconnect client-id=myclient --cleanup command should be translated as: &--cleanup.
Let's look at the cluster join command as an example, which looks like this:
vmq-admin cluster join [email protected]
This turns into a GET request:
GET /api/v1/cluster/[email protected]
To test, run it with curl
:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/cluster/[email protected]"
And the returned response would look like:
{
"text": "Done",
"type": "text"
}
Below are some other examples.
Request:
GET /api/v1/cluster/show
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/cluster/show"
Response:
{
"type" : "table",
"table" : [
{
"Running" : true,
"Node" : "[email protected]"
}
]
}
Request:
GET /api/v1/session/show
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/show"
Response:
{
"type" : "table",
"table" : [
{
"user" : null,
"client_id" : "mosqsub/11690-algol",
"peer_host" : "localhost",
"pid" : "<0.551.0>",
"peer_port" : 50964
},
{
"peer_host" : "localhost",
"pid" : "<0.557.0>",
"peer_port" : 50996,
"user" : null,
"client_id" : "mosqsub/11767-algol"
}
]
}
Request:
GET /api/v1/listener/show
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/listener/show"
Response:
{
"type" : "table",
"table" : [
{
"max_conns" : 10000,
"port" : "8888",
"mountpoint" : "",
"ip" : "127.0.0.1",
"type" : "http",
"status" : "running"
},
{
"status" : "running",
"max_conns" : 10000,
"port" : "44053",
"mountpoint" : "",
"ip" : "0.0.0.0",
"type" : "vmq"
},
{
"max_conns" : 10000,
"port" : "1883",
"mountpoint" : "",
"ip" : "127.0.0.1",
"type" : "mqtt",
"status" : "running"
}
]
}
Request:
GET /api/v1/plugin/show
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/plugin/show"
Response:
{
"type" : "table",
"table" : [
{
"Hook(s)" : "auth_on_register\n",
"Plugin" : "vmq_passwd",
"M:F/A" : "vmq_passwd:auth_on_register/5\n",
"Type" : "application"
},
{
"Type" : "application",
"M:F/A" : "vmq_acl:auth_on_publish/6\nvmq_acl:auth_on_subscribe/3\n",
"Plugin" : "vmq_acl",
"Hook(s)" : "auth_on_publish\nauth_on_subscribe\n"
}
]
}
Request:
GET /api/v1/set?allow_publish_during_netsplit=on
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/set?allow_publish_during_netsplit=on"
Response:
[]
Request:
GET /api/v1/session/disconnect?client-id=myclient&--cleanup
Curl:
curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/disconnect?client-id=myclient&--cleanup"
Response:
[]