Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Configure how VerneMQ handles certain aspects of MQTT
Everything you must know to properly configure VerneMQ
Configure Non-Standard MQTT Options VerneMQ Supports.
Configure WebSocket Listeners for VerneMQ.
As well as being available as packages that can be installed directly into the operating systems, VerneMQ is also available as a Docker image. Below is an example of how to set up a couple of VerneMQ
Netdata Metrics
listener.ws.default = 127.0.0.1:9001
listener.wss.wss_default = 127.0.0.1:9002
# To use WSS, you'll have to configure additional options for your WSS listener (called `wss_default` here):
listener.wss.wss_default.cafile = ./etc/cacerts.pem
listener.wss.wss_default.certfile = ./etc/cert.pem
listener.wss.wss_default.keyfile = ./etc/key.pem$ sudo vmq-admin --help
Usage: vmq-admin <sub-command>
Administrate the cluster.
Sub-commands:
node Manage this node
cluster Manage this node's cluster membership
session Retrieve session information
retain Show and filter MQTT retained messages
plugin Manage plugin system
listener Manage listener interfaces
metrics Retrieve System Metrics
api-key Manage API keys for the HTTP management interface
trace Trace various aspects of VerneMQ
Use --help after a sub-command for more details.1000. Set to -1 for no limit. This option protects a client session from overload by dropping messages (of any QoS).vmq-admin command:retry_interval = 20max_inflight_messages = 20max_online_messages = 1000max_offline_messages = 1000override_max_online_messages = offmax_client_id_size = 23topic_max_depth = 20persistent_client_expiration = 1wmax_message_size = 0docker run --name vernemq1 -d vernemq/vernemqdocker run -p 1883:1883 --name vernemq1 -d vernemq/vernemqdocker run -e "DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on" --name vernemq1 -d vernemq/vernemqdocker run -e "DOCKER_VERNEMQ_DISCOVERY_NODE=<IP-OF-VERNEMQ1>" --name vernemq2 -d vernemq/vernemqdocker exec vernemq1 vmq-admin cluster show
+--------------------+-------+
| Node |Running|
+--------------------+-------+
|[email protected]| true |
|[email protected]| true |
+--------------------+-------+Key = ValueVerneMQ can be installed on Debian or Ubuntu-based systems using the binary package we provide.
Everything you must know to properly configure and deploy a VerneMQ Cluster
erlang.distribution.port_range.minimum = 6000
erlang.distribution.port_range.maximum = 7999How to setup and configure the HTTP listener.
Managing VerneMQ tcp listeners
vmq-admin listener show +----+-------+------------+-----+----------+---------+
|type|status | ip |port |mountpoint|max_conns|
+----+-------+------------+-----+----------+---------+
|vmq |running|192.168.1.50|44053| | 30000 |
|mqtt|running|192.168.1.50|1883 | | 30000 |
+----+-------+------------+-----+----------+---------+
`vmq-admin listener start address=192.168.1.50 port=1884 --mountpoint /test --nr_of_acceptors=10 --max_connections=1000Description and Configuration of the built-in Monitoring mechanism
vmq-admin metrics showvmq-admin metrics show -d# The number of AUTH packets received.
counter.mqtt_auth_received = 0
# The number of times a MQTT queue process has been initialized from offline storage.
counter.queue_initialized_from_storage = 0
# The number of PUBLISH packets sent.
counter.mqtt_publish_sent = 10
# The number of bytes used for storing retained messages.
gauge.retain_memory = 21184mqtt_connack_not_authorized_sent
mqtt_connack_bad_credentials_sent
mqtt_connack_server_unavailable_sent
mqtt_connack_identifier_rejected_sent
mqtt_connack_unacceptable_protocol_sent
mqtt_connack_accepted_sentInspecting the retained message store
Description and Configuration of the $SYSTree Monitoring Feature
Description and Configuration of the Graphite exporter
The VerneMQ Status Page
MQTT consumers can share and loadbalance a topic subscription.
The VerneMQ health checker
listener.vmq.clustering = 0.0.0.0:44053listener.http.default = 127.0.0.1:8888vmq-admin metrics show --return_code=not_authorized
counter.mqtt_connack_sent = 0$ vmq-admin retain show
+------------------+----------------+
| payload | topic |
+------------------+----------------+
| a-third-message | a/third/topic |
|some-other-message|some/other/topic|
| a-message | some/topic |
| a-message | another/topic |
+------------------+----------------+$ vmq-admin retain show --payload --topic=some/topic
+---------+
| payload |
+---------+
|a-message|
+---------+graphite_enabled = on
graphite_host = carbon.hostedgraphite.com
graphite_port = 2003
graphite_interval = 20000
graphite_api_key = YOUR-GRAPHITE-API-KEY
graphite.interval = 15000# set the connect timeout (defaults to 5000 ms)
graphite_connect_timeout = 10000
# set a reconnect timeout (default to 15000 ms)
graphite_reconnect_timeout = 10000
# set a custom graphite prefix (defaults to '')
graphite_prefix = vernemq allow_multiple_sessions = on
queue_deliver_mode = balance20000 milliseconds.systree_interval = 20000systree_enabled = offwhereis vernemq
vernemq: /usr/sbin/vernemq /usr/lib/vernemq /etc/vernemq /usr/share/vernemqlog.error.file = /path/to/log/filelog.crash = on | offlog.crash.file = /path/to/log/filelog.crash.maximum_message_size = 64KB## Acceptable values:
## - a byte size with units, e.g. 10GB
log.crash.size = 10MB
## For acceptable values see https://github.com/basho/lager/blob/master/README.md#internal-log-rotation
log.crash.rotation = $D0log.crash.rotation.keep = 5log.syslog = onlistener.https.default = 127.0.0.1:443
listeners.https.default.http_modules = vmq_status_http, vmq_health_http, vmq_metrics_http
listener.https.mgmt = 127.0.0.1:444
listeners.https.mgmt.http_modules = vmq_mgmt_httpvmq-admin listener stop address=192.168.1.50 port=1884vmq-admin listener restart address=192.168.1.50 port=1884vmq-admin listener delete address=192.168.1.50 port=1884$ vmq-admin retain show --payload a-message --topic
+-------------+
| topic |
+-------------+
| some/topic |
|another/topic|
+-------------+$ sudo vmq-admin retain --help
Usage: vmq-admin retain show
Show and filter MQTT retained messages.
Default options:
--payload --topic
Options
--limit=<NumberOfResults>
Limit the number of results returned. Defaults is 100.
--payload
--topic
--mountpointmosquitto_sub -t '$SYS/<node-name>/#' -u <username> -P <password> -dInspecting and managing MQTT sessions
Working with shared subscriptions
Description and Configuration of the Prometheus exporter

rebar3 new app name="myplugin" desc="this is my first VerneMQ plugin"
===> Writing myplugin/src/myplugin_app.erl
===> Writing myplugin/src/myplugin_sup.erl
===> Writing myplugin/src/myplugin.app.src
===> Writing myplugin/rebar.config
===> Writing myplugin/.gitignore
===> Writing myplugin/LICENSE
===> Writing myplugin/README.md{erl_opts, [debug_info]}.
{deps, [{vernemq_dev,
{git, "git://github.com/vernemq/vernemq_dev.git", {branch, "master"}}}
]}.# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
- job_name: 'vernemq'
scrape_interval: 5s
scrape_timeout: 5s
static_configs:
- targets: ['localhost:8888']leveldb.maximum_memory.percent = 20rebar3 compile
===> Verifying dependencies...
===> Fetching vmq_commons ({git,
"git://github.com/vernemq/vernemq_dev.git",
{branch,"master"}})
===> Compiling vernemq_dev
===> Compiling mypluginallow_anonymous
topic_alias_max_broker
receive_max_broker
vmq_acl.acl_file
graphite_host
vmq_acl.acl_reload_interval
graphite_enabled
queue_type
suppress_lwt_on_session_takeover
max_message_size
vmq_passwd.password_file
graphite_port
max_client_id_size
upgrade_outgoing_qos
max_message_rate
graphite_interval
allow_multiple_sessions
systree_enabled
max_last_will_delay
retry_interval
receive_max_client
max_offline_messages
max_online_messages
max_inflight_messages
allow_register_during_netsplit
vmq_passwd.password_reload_interval
topic_alias_max_client
systree_interval
allow_publish_during_netsplit
coordinate_registrations
remote_enqueue_timeout
persistent_client_expiration
allow_unsubscribe_during_netsplit
graphite_include_labels
shared_subscription_policy
queue_deliver_mode
allow_subscribe_during_netsplitvmq-admin set max_client_id_size=45vmq-admin set max_client_id_size=45 [email protected]vmq-admin set max_client_id_size=45 --allvmq-admin show max_client_id_size retry_interval+----------------------+------------------+--------------+
| node |max_client_id_size|retry_interval|
+----------------------+------------------+--------------+
|[email protected]| 28 | 20 |
+----------------------+------------------+--------------+
`vmq-admin show max_client_id_size retry_interval --node [email protected]vmq-admin show max_client_id_size retry_interval --all+----------------------+------------------+--------------+
| node |max_client_id_size|retry_interval|
+----------------------+------------------+--------------+
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 33 | 20 |
|[email protected]| 28 | 20 |
+----------------------+------------------+--------------+$ vmq-admin session show
+---------+---------+----------+---------+---------+---------+
|client_id|is_online|mountpoint|peer_host|peer_port| user |
+---------+---------+----------+---------+---------+---------+
| client2 | true | |127.0.0.1| 37098 |undefined|
| client1 | true | |127.0.0.1| 37094 |undefined|
+---------+---------+----------+---------+---------+---------+$ vmq-admin session show --node --is_online --client_id=client1
+---------+--------------+
|is_online| node |
+---------+--------------+
| true |[email protected]|
+---------+--------------+$ vmq-admin session show --topic --client_id
+---------+----------------+
|client_id| topic |
+---------+----------------+
| client2 |some/other/topic|
| client1 |some/other/topic|
| client1 | some/topic |
+---------+----------------+$ vmq-admin session show --topic --client_id --topic=some/topic
+---------+----------+
|client_id| topic |
+---------+----------+
| client1 |some/topic|
+---------+----------+$ vmq-admin session show --client_id=client1 --queue_started_at --session_started_at
+----------------+------------------+
|queue_started_at|session_started_at|
+----------------+------------------+
| 1549379963575 | 1549379974905 |
+----------------+------------------+$ vmq-admin session disconnect client-id=client1 --cleanup$ vmq-admin session reauthorize username=username client-id=client1
Unchangedrandomlocal_onlyprefer_online_before_localrandomprefer_localprefer_online_before_localprefer_locallocal_onlyshared_subscription_policy = prefer_localmosquitto_sub -h mqtt.example.io -p 1883 -q 2 -t \$share/group/topicname
mosquitto_sub -h mqtt.example.io -p 1883 -q 2 -t \$share/group/topicname/#mosquito_pub -h mqtt.example.io -p 1883 -t topicname -m "This is a test message"
mosquito_pub -h mqtt.example.io -p 1883 -t topicname/group1 -m "This is a test message"VerneMQ supports multiple ways to configure one or many MQTT listeners.
# defines the default nr of allowed concurrent
# connections per listener
listener.max_connections = 10000
# defines the nr. of acceptor processes waiting
# to concurrently accept new connections
listener.nr_of_acceptors = 10
# used when clients of a particular listener should
# be isolated from clients connected to another
# listener.
listener.mountpoint = offReal-time inspection
vmq-admin trace client client-id=<client-id>$ vmq-admin trace client client-id=client
No sessions found for client "client"
New session with PID <7616.3443.1> found for client "client"
<7616.3443.1> MQTT RECV: CID: "client" CONNECT(c: client, v: 4, u: username, p: password, cs: 1, ka: 30)
<7616.3443.1> Calling auth_on_register({{172,17,0,1},34274},{[],<<"client">>},username,password,true)
<7616.3443.1> Hook returned "ok"
<7616.3443.1> MQTT SEND: CID: "client" CONNACK(sp: 0, rc: 0)
<7616.3443.1> MQTT RECV: CID: "client" SUBSCRIBE(m1) with topics:
q:0, t: "topic"
<7616.3443.1> Calling auth_on_subscribe(username,{[],<<"client">>}) with topics:
q:0, t: "topic"
<7616.3443.1> Hook returned "ok"
<7616.3443.1> MQTT SEND: CID: "client" SUBACK(m1, qt[0])
<7616.3443.1> Trace session for client stoppedThis describes a quick way to create a VerneMQ cluster on developer's machines
VerneMQ can be installed on CentOS-based systems using the binary package we provide.
Loadtesting VerneMQ with vmq_mzbench
A quick and simple guide to get started with VerneMQ
listener.ssl.mountpoint = ssl-mountpoint
listener.tcp.listener1.mountpoint = tcp-listener1
listener.tcp.listener2.mountpoint = tcp-listener2listener.tcp.allowed_protocol_versions = 3,4,5listener.tcp.default = 127.0.0.1:1883
listener.ws.default = 127.0.0.1:8888listener.tcp.my_other = 127.0.0.1:18884
listener.tcp.my_other.max_connections = 100listener.ssl.cafile = /etc/ssl/cacerts.pem
listener.ssl.certfile = /etc/ssl/cert.pem
listener.ssl.keyfile = /etc/ssl/key.pem
listener.ssl.default = 127.0.0.1:8883listener.ssl.psk_support = on
listener.ssl.pskfile = /srv/vernemq/etc/vernemq.psk
listener.ssl.ciphers = PSK-AES256-GCM-SHA384:PSK-AES256-CBC-SHA:PSK-AES128-GCM-SHA256listener.ssl.require_certificate = onlistener.ssl.use_identity_as_username = onvmq-admin cluster join discovery-node=<OtherClusterNode>vmq-admin cluster leave node=<NodeThatShouldGo> (only the first step!)vmq-admin cluster show$ vmq-admin bridge show
+-----------------+-----------+----------+-------------------+
| endpoint |buffer size|buffer max|buffer dropped msgs|
+-----------------+-----------+----------+-------------------+
|192.168.1.10:1883| 0 | 0 | 0 |
+-----------------+-----------+----------+-------------------+vmq_bridge.tcp.br0 = 192.168.1.12:1883# use a clean session (defaults to 'off')
vmq_bridge.tcp.br0.cleansession = off | on
# set the client id (defaults to 'auto', which generates one)
vmq_bridge.tcp.br0.client_id = auto | my_bridge_client_id
# set keepalive interval (defaults to 60 seconds)
vmq_bridge.tcp.br0.keepalive_interval = 60
# set the username and password for the bridge connection
vmq_bridge.tcp.br0.username = my_bridge_user
vmq_bridge.tcp.br0.password = my_bridge_pwd
# set the restart timeout (defaults to 10 seconds)
vmq_bridge.tcp.br0.restart_timeout = 10
# VerneMQ indicates other brokers that the connection
# is established by a bridge instead of a normal client.
# This can be turned off if needed:
vmq_bridge.tcp.br0.try_private = off
# Set the maximum number of outgoing messages the bridge will buffer
# while not connected to the remote broker. Messages published while
# the buffer is full are dropped. A value of 0 means buffering is
# disabled.
vmq_bridge.tcp.br0.max_outgoing_buffered_messages = 100topic [[[ out | in | both ] qos-level] local-prefix remote-prefix]# share messages in both directions and use QoS 1
vmq_bridge.tcp.br0.topic.1 = /demo/+ both 1
# import the $SYS tree of the remote broker and
# prefix it with the string 'remote'
vmq_bridge.tcp.br0.topic.2 = $SYS/* in remote# define the CA certificate file or the path to the
# installed CA certificates
vmq_bridge.ssl.br0.cafile = cafile.crt
#or
vmq_bridge.ssl.br0.capath = /path/to/cacerts
# if the remote broker requires client certificate authentication
vmq_bridge.ssl.br0.certfile = /path/to/certfile.pem
# and the keyfile
vmq_bridge.ssl.br0.keyfile = /path/to/keyfile
# disable the verification of the remote certificate (defaults to 'off')
vmq_bridge.ssl.br0.insecure = off
# set the used tls version (defaults to 'tlsv1.2')
vmq_bridge.ssl.br0.tls_version = tlsv1.2$ sudo vmq-admin plugin disable --name vmq_bridge
$ sudo vmq-admin plugin enable --name vmq_bridge
$ sudo vmq-admin bridge startOptions
--mountpoint=<Mountpoint>
the mountpoint for the client to trace.
Defaults to "" which is the default mountpoint.
--rate-max=<RateMaxMessages>
the maximum number of messages for the given interval,
defaults to 10.
--rate-interval=<RateIntervalMS>
the interval in milliseconds over which the max number of messages
is allowed. Defaults to 100.
--trunc-payload=<SizeInBytes>
control when the payload should be truncated for display.
Defaults to 200.Cannot start trace as another trace is already running.$ sudo vmq-admin trace stop_allqueue_type = fifomax_message_rate = 2max_drain_time = 20max_msgs_per_drain_step = 1000vmq_reg_view = "vmq_reg_trie"reg_views = "[vmq_reg_trie]"outgoing_clustering_buffer_size = 15000listener.max_connection_lifetime = 25000git clone git://github.com/erlio/vmq_mzbench.git
cd vmq_mzbench
./rebar get-deps
./rebar compile{make_install, [
{rsync, "/path/to/your/installation/vmq_mzbench/"},
{exclude, "deps"}]},{make_install, [
{git, "git://github.com/erlio/vmq_mzbench.git"}]},vernemq startvernemq consoleq().vernemq pingLearn how to implement VerneMQ Plugins for customizing many aspects of how VerneMQ deals with client connections, subscriptions, and message flows.
Managing VerneMQ Plugins
vmq-admin plugin show+-----------+-----------+-----------------+-----------------------------+
| Plugin | Type | Hook(s) | M:F/A |
+-----------+-----------+-----------------+-----------------------------+
|vmq_passwd |application|auth_on_register |vmq_passwd:auth_on_register/5|
| vmq_acl |application| auth_on_publish | vmq_acl:auth_on_publish/6 |
| | |auth_on_subscribe| vmq_acl:auth_on_subscribe/3 |
+-----------+-----------+-----------------+-----------------------------+


vmq-passwd [-c | -D] passwordfile username
vmq-passwd -U passwordfilevmq-passwd -c /etc/vernemq/vmq.passwd henryvmq-passwd -D /etc/vernemq/vmq.passwd henryplugins.vmq_acl = offvmq_acl.acl_file = /etc/vernemq/vmq.aclvmq_acl.acl_reload_interval = 10topic [read|write] <topic>user <username>pattern [read|write] <topic>pattern write sensor/%u/data# ACL for anonymous clients
topic bar
topic write foo
topic read open_to_all
# ACL for user 'john'
user john
topic foo
topic read baz
topic write open_to_all{application, vmq_acl,
[
{description, "Simple File based ACL for VerneMQ"},
{vsn, git},
{registered, []},
{applications, [
kernel,
stdlib,
clique
]},
{mod, { vmq_acl_app, []}},
{env, [
{file, "priv/test.acl"},
{interval, 10},
{vmq_config_enabled, true},
{vmq_plugin_hooks, [
{vmq_acl, change_config, 1, [internal]},
{vmq_acl, auth_on_publish, 6, []},
{vmq_acl, auth_on_subscribe, 3, []}
]}
]}
]}.-type peer() :: {inet:ip_address(), inet:port_number()}.
-type username() :: binary() | undefined.
-type password() :: binary() | undefined.
-type client_id() :: binary().
-type mountpoint() :: string().
-type subscriber_id() :: {mountpoint(), client_id()}.
-type reg_view() :: atom().
-type topic() :: [binary()].
-type qos() :: 0 | 1 | 2.
-type routing_key() :: [binary()].
-type payload() :: binary().
-type flag() :: boolean().{make_install, [
{rsync, "/path/to/your/installation/vmq_mzbench/"},
{exclude, "deps"}]},{make_install, [
{git, "git://github.com/vernemq/vmq_mzbench.git"}]},$ sudo vmq-admin plugin show --internal
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| Plugin | Type | Hook(s) | M:F/A |
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| vmq_swc | application | metadata_put | vmq_swc_plugin:metadata_put/3 |
| | | metadata_get | vmq_swc_plugin:metadata_get/2 |
| | | metadata_delete | vmq_swc_plugin:metadata_delete/2 |
| | | metadata_fold | vmq_swc_plugin:metadata_fold/3 |
| | | metadata_subscribe | vmq_swc_plugin:metadata_subscribe/1 |
| | | cluster_join | vmq_swc_plugin:cluster_join/1 |
| | | cluster_leave | vmq_swc_plugin:cluster_leave/1 |
| | | cluster_members | vmq_swc_plugin:cluster_members/0 |
| | | cluster_rename_member | vmq_swc_plugin:cluster_rename_member/2 |
| | | cluster_events_add_handler | vmq_swc_plugin:cluster_events_add_handler/2 |
| | | cluster_events_delete_handler | vmq_swc_plugin:cluster_events_delete_handler/2 |
| | | cluster_events_call_handler | vmq_swc_plugin:cluster_events_call_handler/3 |
| | | | |
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| vmq_generic_msg_store | application | msg_store_write | vmq_generic_msg_store:msg_store_write/2 |
| | | msg_store_delete | vmq_generic_msg_store:msg_store_delete/2 |
| | | msg_store_find | vmq_generic_msg_store:msg_store_find/2 |
| | | msg_store_read | vmq_generic_msg_store:msg_store_read/2 |
| | | | |
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| vmq_config | module | change_config | vmq_config:change_config/1 |
| | | | |
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| vmq_acl | application | change_config | vmq_acl:change_config/1 |
| | | auth_on_publish | vmq_acl:auth_on_publish/6 |
| | | auth_on_subscribe | vmq_acl:auth_on_subscribe/3 |
| | | auth_on_publish_m5 | vmq_acl:auth_on_publish_m5/7 |
| | | auth_on_subscribe_m5 | vmq_acl:auth_on_subscribe_m5/4 |
| | | | |
+-----------------------+-------------+-------------------------------+------------------------------------------------+
| vmq_passwd | application | change_config | vmq_passwd:change_config/1 |
| | | auth_on_register | vmq_passwd:auth_on_register/5 |
| | | auth_on_register_m5 | vmq_passwd:auth_on_register_m5/6 |
| | | | |
+-----------------------+-------------+-------------------------------+------------------------------------------------+vmq-admin plugin enable --name=vmq_aclvmq-admin plugin disable --name=vmq_aclplugins.vmq_passwd = onplugins.myplugin = on
plugins.myplugin.path = /path/to/pluginvmq_passwd.password_file = ./etc/vmq.passwdsudo sysctl -w net.ipv4.tcp_rmem="4096 16384 32768"
sudo sysctl -w net.ipv4.tcp_wmem="4096 16384 32768"
# Nope, these values are not recommendations!
# You really need to decide yourself.[{vmq_server, [
{tcp_listen_options,
[{sndbuf, 4096},
{recbuf, 4096}]}]}].+P 256000
-env ERL_MAX_ETS_TABLES 256000
-env ERL_CRASH_DUMP /erl_crash.dump
-env ERL_FULLSWEEP_AFTER 0
-env ERL_MAX_PORTS 262144
+A 64
-setcookie vmq # Important: Use your own private cookie...
-name [email protected]
+K true
+sbt db
+sbwt very_long
+swt very_low
+sub true
+Mulmbcs 32767
+Mumbcgs 1
+Musmbcs 2047
# Nope, these values are not recommendations!
# You really need to decide yourself, again ;)In the following we describe how a typical VerneMQ deployment can look and some of the concerns one have to take into account when designing such a system.
How to change the open file limits
This guide describes how to deploy a VerneMQ cluster on Kubernetes

listener.tcp.proxy_protocol = on
listener.tcp.proxy_protocol_use_cn_as_username = onlistener.ssl.require_certificate = on
listener.ssl.use_identity_as_username = onlaunchctl limitcpu unlimited unlimited
filesize unlimited unlimited
data unlimited unlimited
stack 8388608 67104768
core 0 unlimited
rss unlimited unlimited
memlock unlimited unlimited
maxproc 709 1064
maxfiles 16384 32768vernemq soft nofile 4096
vernemq hard nofile 65536ulimit -n 65536session required pam_limits.so* soft nofile 65536
* hard nofile 65536#UseLogin noUseLogin yesulimit -a* soft nofile 65536
* hard nofile 65536ulimit -aset rlim_fd_max=65536launchctl limit maxfileslaunchctl limitcpu unlimited unlimited
filesize unlimited unlimited
data unlimited unlimited
stack 8388608 67104768
core 0 unlimited
rss unlimited unlimited
memlock unlimited unlimited
maxproc 709 1064
maxfiles 10240 10240limit maxfiles 16384 32768sudo sysctl -w net.ipv4.tcp_rmem="4096 16384 32768"
sudo sysctl -w net.ipv4.tcp_wmem="4096 16384 32768"
# Nope, these values are not recommendations!
# You really need to decide yourself.Example 1 (from Linux OS config):
net.ipv4.tcp_rmem="4096 16384 32768"
net.ipv4.tcp_wmem="4096 16384 65536"Example 2 (vernemq.conf):
listener.tcp.buffer_sizes = 4096,16384,32768Example 3: (vernemq.conf)
listener.tcp.my_listener.buffer_sizes = 4096,16384,32768[{vmq_server, [
{tcp_listen_options,
[{sndbuf, 4096},
{recbuf, 4096}]}]}].+P 256000
-env ERL_MAX_ETS_TABLES 256000
-env ERL_CRASH_DUMP /erl_crash.dump
-env ERL_FULLSWEEP_AFTER 0
-env ERL_MAX_PORTS 262144
+A 64
-setcookie vmq # Important: Use your own private cookie...
-name [email protected]
+K true
+sbt db
+sbwt very_long
+swt very_low
+sub true
+Mulmbcs 32767
+Mumbcgs 1
+Musmbcs 2047
# Nope, these values are not recommendations!
# You really need to decide yourself, again ;)launchctl limitcpu unlimited unlimited filesize unlimited unlimited data unlimited unlimited stack 8388608 67104768 core 0 unlimited rss unlimited unlimited memlock unlimited unlimited maxproc 709 1064 maxfiles 10240 10240
limit maxfiles 16384 32768
ulimit -n 262144sysctl fs.file-max
fs.file-max = 262144cat /proc/sys/fs/file-maxvernemq soft nofile 65536
vernemq hard nofile 262144ulimit -n 262144LimitNOFILE=infinitysession required pam_limits.so* soft nofile 65536
* hard nofile 262144#UseLogin noUseLogin yesulimit -a* soft nofile 65536
* hard nofile 262144ulimit -aset rlim_fd_max=262144launchctl limit maxfileshelm install vernemq/vernemqcurl -L https://codeload.github.com/vernemq/vmq-operator/zip/master --output repo.zip; \
unzip -j repo.zip '*/examples/only_vernemq/*' -d only_vernemq; \
kubectl apply -f only_vernemqkubectl get pods --namespace messaging
NAME READY STATUS RESTARTS AGE
vernemq-k8s-0 1/1 Running 0 53m
vernemq-k8s-1 1/1 Running 0 4m14s
vernemq-k8s-deployment-59f5684549-s7jd4 1/1 Running 0 2d17h
vmq-operator-76f5f78f96-2jbwt 1/1 Running 0 4m28skubectl exec vernemq-k8s-0 vmq-admin cluster show --namespace messaging
+-----------------------------------------------------------------+-------+
| Node |Running|
+-----------------------------------------------------------------+-------+
|vmq@vernemq-k8s-0.vernemq-k8s-service.messaging.svc.cluster.local| true |
|vmq@vernemq-k8s-1.vernemq-k8s-service.messaging.svc.cluster.local| true |
+-----------------------------------------------------------------+-------+launchctl limitcpu unlimited unlimited
filesize unlimited unlimited
data unlimited unlimited
stack 8388608 67104768
core 0 unlimited
rss unlimited unlimited
memlock unlimited unlimited
maxproc 709 1064
maxfiles 16384 32768$ vmq-admin api-key show
+----------------------------------+-------+---------------------+-------------+
| Key | Scope | Expires (UTC) | has expired |
+----------------------------------+-------+---------------------+-------------+
| q85i5HbFCDdAVLNJuOj48QktDbchvOMS | mgmt | 2023-04-04 10:00:00 | false |
+----------------------------------+-------+---------------------+-------------+
| JxctXkZ1OTVnlwvguSCE9KtujacMkOLF | mgmt | never | false |
+----------------------------------+-------+---------------------+-------------+vmq-admin api-key add key=mykeyvmq-admin api-key delete key=JxctXkZ1OTVnlwvguSCE9KtujacMkOLFmin_apikey_length = 30max_apikey_expiry_days = 180http_module.$module.auth.mode
listener.http.$name.http_module.$module.auth.mode
listener.https.$name.http_module.$module.auth.modecurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/show"vmq-admin cluster join [email protected]GET /api/v1/cluster/[email protected]curl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/cluster/[email protected]"{
"text": "Done",
"type": "text"
}GET /api/v1/cluster/showcurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/cluster/show"{
"type" : "table",
"table" : [
{
"Running" : true,
"Node" : "[email protected]"
}
]
}GET /api/v1/session/showcurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/show"{
"type" : "table",
"table" : [
{
"user" : "client1",
"peer_port" : 50402,
"is_online" : true,
"mountpoint" : "",
"client_id" : "mosq/qJpvoqe1PA4lBN1e4E",
"peer_host" : "127.0.0.1"
},
{
"user" : "client2",
"is_online" : true,
"peer_port" : 50406,
"peer_host" : "127.0.0.1",
"client_id" : "mosq/tikkXdlM28PaznBv2T",
"mountpoint" : ""
}
]
}
GET /api/v1/listener/showcurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/listener/show"{
"type" : "table",
"table" : [
{
"max_conns" : 10000,
"port" : "8888",
"mountpoint" : "",
"ip" : "127.0.0.1",
"type" : "http",
"status" : "running"
},
{
"status" : "running",
"max_conns" : 10000,
"port" : "44053",
"mountpoint" : "",
"ip" : "0.0.0.0",
"type" : "vmq"
},
{
"max_conns" : 10000,
"port" : "1883",
"mountpoint" : "",
"ip" : "127.0.0.1",
"type" : "mqtt",
"status" : "running"
}
]
}GET /api/v1/plugin/showcurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/plugin/show" {
"type" : "table",
"table" : [
{
"Hook(s)" : "auth_on_register\n",
"Plugin" : "vmq_passwd",
"M:F/A" : "vmq_passwd:auth_on_register/5\n",
"Type" : "application"
},
{
"Type" : "application",
"M:F/A" : "vmq_acl:auth_on_publish/6\nvmq_acl:auth_on_subscribe/3\n",
"Plugin" : "vmq_acl",
"Hook(s)" : "auth_on_publish\nauth_on_subscribe\n"
}
]
}GET /api/v1/set?allow_publish_during_netsplit=oncurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/set?allow_publish_during_netsplit=on"[]GET /api/v1/session/disconnect?client-id=myclient&--cleanupcurl "http://JxctXkZ1OTVnlwvguSCE9KtujacMkOLF@localhost:8888/api/v1/session/disconnect?client-id=myclient&--cleanup"[]VerneMQ supports multiple ways to authenticate and authorize new client connections using a database.
{
"pattern": "a/+/c"
}{
"pattern": "a/+/c",
"max_qos": 2,
"max_payload_size": 128,
"allowed_retain": true
}{
"pattern": "a/+/c",
"max_qos": 2,
"max_payload_size": 128,
"allowed_retain": true,
"modifiers": {
"topic": "new/topic",
"payload": "new payload",
"qos": 2,
"retain": true,
"mountpoint": "other-mountpoint"
}
}{
"pattern": "a/+/c",
"max_qos": 2
}{
"pattern": "a/+/c",
"max_qos": 2,
"modifiers": [
["new/topic/1", 1],
["new/topic/2", 1]
]
}vmq_diversity.auth_postgres.enabled = on
vmq_diversity.postgres.host = 127.0.0.1
vmq_diversity.postgres.port = 5432
vmq_diversity.postgres.user = vernemq
vmq_diversity.postgres.password = vernemq
vmq_diversity.postgres.database = vernemq_db
vmq_diversity.cockroachdb.password_hash_method = cryptvmq_diversity.ssl.enabled = on##
## Default: off
##
## Acceptable values:
## - on or off
vmq_diversity.auth_postgres.enabled = off
##
## Default: localhost
##
## Acceptable values:
## - text
## vmq_diversity.postgres.host = localhost
##
## Default: 5432
##
## Acceptable values:
## - an integer
## vmq_diversity.postgres.port = 5432
##
## Default: root
##
## Acceptable values:
## - text
## vmq_diversity.postgres.user = root
##
## Default: password
##
## Acceptable values:
## - text
## vmq_diversity.postgres.password = password
##
## Default: vernemq_db
##
## Acceptable values:
## - text
## vmq_diversity.postgres.database = vernemq_db
## Specify if the postgresql driver should use TLS or not.
##
## Default: off
##
## Acceptable values:
## - on or off
vmq_diversity.postgres.ssl = off
## The cafile is used to define the path to a file containing
## the PEM encoded CA certificates that are trusted.
##
## Default:
##
## Acceptable values:
## - the path to a file
## vmq_diversity.postgres.cafile = ./etc/cafile.pem
## Set the path to the PEM encoded server certificate.
##
## Default:
##
## Acceptable values:
## - the path to a file
## vmq_diversity.postgres.certfile = ./etc/cert.pem
## Set the path to the PEM encoded key file.
##
## Default:
##
## Acceptable values:
## - the path to a file
## vmq_diversity.postgres.keyfile = ./etc/keyfile.pem
## The password hashing method to use in PostgreSQL:
##
## Default: crypt
##
## Acceptable values:
## - one of: crypt, bcrypt
vmq_diversity.postgres.password_hash_method = cryptCREATE EXTENSION pgcrypto;
CREATE TABLE vmq_auth_acl
(
mountpoint character varying(10) NOT NULL,
client_id character varying(128) NOT NULL,
username character varying(128) NOT NULL,
password character varying(128),
publish_acl json,
subscribe_acl json,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
);WITH x AS (
SELECT
''::text AS mountpoint,
'test-client'::text AS client_id,
'test-user'::text AS username,
'123'::text AS password,
gen_salt('bf')::text AS salt,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS publish_acl,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS subscribe_acl
)
INSERT INTO vmq_auth_acl (mountpoint, client_id, username, password, publish_acl, subscribe_acl)
SELECT
x.mountpoint,
x.client_id,
x.username,
crypt(x.password, x.salt),
publish_acl,
subscribe_acl
FROM x;vmq_diversity.auth_cockroachdb.enabled = on
vmq_diversity.cockroachdb.host = 127.0.0.1
vmq_diversity.cockroachdb.port = 26257
vmq_diversity.cockroachdb.user = vernemq
vmq_diversity.cockroachdb.password = vernemq
vmq_diversity.cockroachdb.database = vernemq_db
vmq_diversity.cockroachdb.ssl = on
vmq_diversity.cockroachdb.password_hash_method = bcryptCREATE TABLE vmq_auth_acl
(
mountpoint character varying(10) NOT NULL,
client_id character varying(128) NOT NULL,
username character varying(128) NOT NULL,
password character varying(128),
publish_acl json,
subscribe_acl json,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
);WITH x AS (
SELECT
''::text AS mountpoint,
'test-client1'::text AS client_id,
'test-user1'::text AS username,
'$2a$12$97PlnSsouvCV7HaxDPV80.EXfsKM4Fg7DAwWhSbGJ6O5CpNep20n2'::text AS hash,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS publish_acl,
'[{"pattern": "a/b/c"}, {"pattern": "c/b/#"}]'::json AS subscribe_acl
)
INSERT INTO vmq_auth_acl (mountpoint, client_id, username, password, publish_acl, subscribe_acl)
SELECT
x.mountpoint,
x.client_id,
x.username,
x.hash,
publish_acl,
subscribe_acl
FROM x;vmq_diversity.auth_mysql.enabled = on
vmq_diversity.mysql.host = 127.0.0.1
vmq_diversity.mysql.port = 3306
vmq_diversity.mysql.user = vernemq
vmq_diversity.mysql.password = vernemq
vmq_diversity.mysql.database = vernemq_db
vmq_diversity.mysql.password_hash_method = passwordCREATE TABLE vmq_auth_acl
(
mountpoint VARCHAR(10) NOT NULL,
client_id VARCHAR(128) NOT NULL,
username VARCHAR(128) NOT NULL,
password VARCHAR(128),
publish_acl TEXT,
subscribe_acl TEXT,
CONSTRAINT vmq_auth_acl_primary_key PRIMARY KEY (mountpoint, client_id, username)
)INSERT INTO vmq_auth_acl
(mountpoint, client_id, username,
password, publish_acl, subscribe_acl)
VALUES
('', 'test-client', 'test-user', PASSWORD('123'),
'[{"pattern":"a/b/c"},{"pattern":"c/b/#"}]',
'[{"pattern":"a/b/c"},{"pattern":"c/b/#"}]');vmq_diversity.auth_mongodb.enabled = on
vmq_diversity.mongodb.host = 127.0.0.1
vmq_diversity.mongodb.port = 27017
# vmq_diversity.mongodb.login =
# vmq_diversity.mongodb.password =
# vmq_diversity.mongodb.database =vmq_diversity.auth_mongodb.enabled = on
vmq_diversity.mongodb.srv = vmqtest.08t1b.mongodb.net
vmq_diversity.mongodb.login = username
vmq_diversity.mongodb.password = secretpass
# vmq_diversity.mongodb.database =db.vmq_acl_auth.insert({
mountpoint: '',
client_id: 'test-client',
username: 'test-user',
passhash: '$2a$12$WDzmynWSMRVzfszQkB2MsOWYQK9qGtfjVpO8iBdimTOjCK/u6CzJK',
publish_acl: [
{pattern: 'a/b/c'},
{pattern: 'a/+/d'}
],
subscribe_acl: [
{pattern: 'a/#'}
]
})vmq_diversity.auth_redis.enabled = on
vmq_diversity.redis.host = 127.0.0.1
vmq_diversity.redis.port = 6379
# vmq_diversity.redis.user = "default"
# vmq_diversity.redis.password =
# vmq_divserity.redis.database = 0SET "[\"\",\"test-client\",\"test-user\"]" "{\"passhash\":\"$2a$12$WDzmynWSMRVzfszQkB2MsOWYQK9qGtfjVpO8iBdimTOjCK/u6CzJK\",\"subscribe_acl\":[{\"pattern\":\"a/+/c\"}]}"$ vmq-admin script load path=/Abs/Path/To/script.lua$ vmq-admin script reload path=/Abs/Path/To/script.lua-- the function that implements the auth_on_register/5 hook
-- the reg object contains everything required to authenticate a client
-- reg.addr: IP Address e.g. "192.168.123.123"
-- reg.port: Port e.g. 12345
-- reg.mountpoint: Mountpoint e.g. ""
-- reg.username: UserName e.g. "test-user"
-- reg.password: Password e.g. "test-password"
-- reg.client_id: ClientId e.g. "test-id"
-- reg.clean_session: CleanSession Flag true
function my_auth_on_register(reg)
-- only allow clients connecting from this host
if reg.addr == "192.168.10.10" then
--only allow clients with this username
if reg.username == "demo-user" then
-- only allow clients with this clientid
if reg.client_id == "demo-id" then
return true
end
end
end
return false
end
-- the function that implements the auth_on_publish/6 hook
-- the pub object contains everything required to authorize a publish request
-- pub.mountpoint: Mountpoint e.g. ""
-- pub.client_id: ClientId e.g. "test-id"
-- pub.topic: Publish Topic e.g. "test/topic"
-- pub.qos: Publish QoS e.g. 1
-- pub.payload: Payload e.g. "hello world"
-- pub.retain: Retain flag e.g. false
function my_auth_on_publish(pub)
-- only allow publishes on this topic with QoS = 0
if pub.topic == "demo/topic" and pub.qos == 0 then
return true
end
return false
end
-- the function that implements the auth_on_subscribe/3 hook
-- the sub object contains everything required to authorize a subscribe request
-- sub.mountpoint: Mountpoint e.g. ""
-- sub.client_id: ClientId e.g. "test-id"
-- sub.topics: A list of Topic/QoS Pairs e.g. { {"topic/1", 0}, {"topic/2, 1} }
function my_auth_on_subscribe(sub)
local topic = sub.topics[1]
if topic then
-- only allow subscriptions for the topic "demo/topic" with QoS = 0
if topic[1] == "demo/topic" and topic[2] == 0 then
return true
end
end
return false
end
-- the hooks table specifies which hooks this plugin is implementing
hooks = {
auth_on_register = my_auth_on_register,
auth_on_publish = my_auth_on_publish,
auth_on_subscribe = my_auth_on_subscribe
}mysql.ensure_pool(config)mysql.execute(pool_id, stmt, args...)postgres.ensure_pool(config)postgres.execute(pool_id, stmt, args...)mongodb.ensure_pool(config)mongodb.insert(pool_id, collection, doc_or_docs)mongodb.update(pool_id, collection, selector, doc)mongodb.delete(pool_id, collection, selector)mongodb.find(pool_id, collection, selector, args)mongodb.next(cursor)mongodb.take(cursor, nr_of_docs)mongodb.close(cursor)mongodb.find_one(pool_id, collection, selector, args)redis.ensure_pool(config)redis.cmd(pool_id, command, args...)memcached.ensure_pool(config)http.ensure_pool(config)http.get(pool_id, url, body, headers)
http.put(pool_id, url, body, headers)
http.post(pool_id, url, body, headers)
http.delete(pool_id, url, body, headers)response = {
status = HTTP_STATUS_CODE,
headers = Lua Table containing response headers,
ref = Client Ref
}http.body(client_ref)json.encode(val)json.decode(json_string)log.info(log_string)
log.error(log_string)
log.warning(log_string)
log.debug(log_string)vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://127.0.0.1/myendpointsvmq_webhooks.mywebhook1.no_payload = on$ vmq-admin webhooks register hook=auth_on_register endpoint="http://localhost"$ vmq-admin webhooks show$ vmq-admin webhooks deregister hook=auth_on_register endpoint="http://localhost"$ vmq-admin webhooks cache show{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"password": "password",
"mountpoint": "",
"client_id": "clientid",
"clean_session": false
}{
"result": "ok"
}{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000,
"retry_interval": 20000
}
}{
"result": "next"
}{
"result": {
"error": "not_allowed"
}
}{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}{
"result": "ok"
}{
"result": "ok",
"topics":
[{"topic": "rewritten/topic",
"qos": 0}]
}{
"result": "next"
}{
"result": { "error": "some error message" }
}{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}{
"result": "ok"
}{
"result": "ok",
"modifiers": {
"topic": "rewritten/topic",
"qos": 2,
"payload": "rewritten payload",
"retain": true
}
}{
"result": "next"
}{
"result": { "error": "some error message" }
}{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"mountpoint": "",
"client_id": "clientid"
}{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topics":
["a/b", "c/d"]
}{
"result": "ok",
"topics":
["rewritten/topic"]
}{
"result": "next"
}{
"result": { "error": "some error message" }
}{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topic": "a/b",
"payload": "hello"
}{
"result": "ok",
"modifiers":
{
"topic": "rewritten/topic",
"payload": "rewritten payload"
}
}{
"result": "next"
}{
"client_id": "clientid",
"mountpoint": "",
"qos": "1",
"topic": "sometopic",
"payload": "payload",
"retain": false
}{
"client_id": "clientid",
"mountpoint": ""
}{
"client_id": "clientid",
"mountpoint": ""
}{
"client_id": "clientid",
"mountpoint": ""
}{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"password": "password",
"clean_start": true,
"properties": {}
}{
"result": "ok"
}{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000
}
}{
"result": "next"
}{
"result": {
"error": "not_allowed"
}
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMA==",
"p_authentication_method": "AUTH_METHOD"
}
} "modifiers": {
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMQ==",
"p_authentication_method": "AUTH_METHOD"
}
"reason_code": 0
},
"result": "ok"
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
}
],
"properties": {}
}{
"result": "ok"
}{
"modifiers": {
"topics": [
{
"qos": 2,
"topic": "rewritten/topic"
},
{
"qos": 135,
"topic": "forbidden/topic"
}
]
},
"result": "ok"
}{
"result": "next"
}{
"result": {
"error": "not_allowed"
}
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "some/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}{
"result": "ok"
}{
"modifiers": {
"topic": "rewritten/topic"
},
"result": "ok"
}{
"result": "next"
}{
"result": {
"error": "not_allowed"
}
}{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"properties": {
}
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "test/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
},
{
"topic": "test/topic",
"qos": 128
}
],
"properties": {
}
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
"test/topic"
],
"properties": {
}
}{
"modifiers": {
"topics": [
"rewritten/topic"
]
},
"result": "ok"
}{
"result": "next"
}{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topic": "test/topic",
"payload": "message payload",
"properties": {
}
}{
"result": "ok"
}{
"result": "next"
}import web
import json
urls = ('/.*', 'hooks')
app = web.application(urls, globals())
class hooks:
def POST(self):
# fetch hook and request data
hook = web.ctx.env.get('HTTP_VERNEMQ_HOOK')
data = json.loads(web.data())
# print the hook and request data to the console
print
print 'hook:', hook
print ' data: ', data
# dispatch to appropriate function based on the hook.
if hook == 'auth_on_register':
return handle_auth_on_register(data)
elif hook == 'auth_on_publish':
return handle_auth_on_publish(data)
elif hook == 'auth_on_subscribe':
return handle_auth_on_subscribe(data)
else:
web.ctx.status = 501
return "not implemented"
def handle_auth_on_register(data):
# only allow user 'joe' with password 'secret', reject all others.
if "joe" == data['username']:
if "secret" == data['password']:
return json.dumps({'result': 'ok'})
return json.dumps({'result': {'error': 'not allowed'}})
def handle_auth_on_publish(data):
# accept all publish requests
return json.dumps({'result': 'ok'})
def handle_auth_on_subscribe(data):
# accept all subscribe requests
return json.dumps({'result': 'ok'})
if __name__ == '__main__':
app.run()