Lua Scripting Support
Learn how to implement VerneMQ plugins using the Lua Scripting Language.
Developing VerneMQ plugins in Erlang is the most powerful way to extend the functionality of a VerneMQ broker, but it is a barrier for developers not familiar with Erlang. For this reason, we've implemented a VerneMQ extension that allows you to develop plugins using the Lua scripting language. This extension is called vmq_diversity and is shipped as part of VerneMQ.
vmq_diversity uses the Luerl Project, an implementation of Lua 5.2 in pure Erlang, instead of the official Lua interpreter.
Moreover, vmq_diversity provides simple Lua libraries to communicate with MySQL, PostgreSQL, MongoDB, Redis, and Memcached from within your Lua VerneMQ plugins. An additional JSON encoding/decoding library and a generic HTTP client library give your Lua scripts a convenient way to talk to external services.

Configuration

To enable vmq_diversity make sure to set:
plugins.vmq_diversity = on
A script to load when VerneMQ starts can be specified like this:
vmq_diversity.myscript1.file = Path/to/Script.lua
It is also possible to dynamically load a Lua script using vmq-admin:
$ vmq-admin script load path=/Abs/Path/To/script.lua
To reload a script after a change:
$ vmq-admin script reload path=/Abs/Path/To/script.lua
If the vmq_diversity plugin is enabled, the ./share/lua folder is scanned for Lua scripts to load automatically during startup. The automatic load folder can be configured in the vernemq.conf file by changing the vmq_diversity.script_dir setting.

Implementing a VerneMQ plugin

A VerneMQ plugin typically consists of one or more implemented VerneMQ hooks. We tried to keep the differences between the traditional Erlang based and Lua based plugins as small as possible. Please check out the Plugin Development Guide for more information about the different flows and a description of the different hooks.

Your first Lua plugin

Let's start with a very basic example that implements a simple authentication and authorization scheme.
-- the function that implements the auth_on_register/5 hook
-- the reg object contains everything required to authenticate a client
--      reg.addr: IP Address e.g. "192.168.123.123"
--      reg.port: Port e.g. 12345
--      reg.mountpoint: Mountpoint e.g. ""
--      reg.username: UserName e.g. "test-user"
--      reg.password: Password e.g. "test-password"
--      reg.client_id: ClientId e.g. "test-id"
--      reg.clean_session: CleanSession Flag e.g. true
function my_auth_on_register(reg)
    -- only allow clients connecting from this host
    if reg.addr == "192.168.10.10" then
        -- only allow clients with this username
        if reg.username == "demo-user" then
            -- only allow clients with this clientid
            if reg.client_id == "demo-id" then
                return true
            end
        end
    end
    return false
end

-- the function that implements the auth_on_publish/6 hook
-- the pub object contains everything required to authorize a publish request
--      pub.mountpoint: Mountpoint e.g. ""
--      pub.client_id: ClientId e.g. "test-id"
--      pub.topic: Publish Topic e.g. "test/topic"
--      pub.qos: Publish QoS e.g. 1
--      pub.payload: Payload e.g. "hello world"
--      pub.retain: Retain flag e.g. false
function my_auth_on_publish(pub)
    -- only allow publishes on this topic with QoS = 0
    if pub.topic == "demo/topic" and pub.qos == 0 then
        return true
    end
    return false
end

-- the function that implements the auth_on_subscribe/3 hook
-- the sub object contains everything required to authorize a subscribe request
--      sub.mountpoint: Mountpoint e.g. ""
--      sub.client_id: ClientId e.g. "test-id"
--      sub.topics: A list of Topic/QoS Pairs e.g. { {"topic/1", 0}, {"topic/2", 1} }
function my_auth_on_subscribe(sub)
    local topic = sub.topics[1]
    if topic then
        -- only allow subscriptions for the topic "demo/topic" with QoS = 0
        if topic[1] == "demo/topic" and topic[2] == 0 then
            return true
        end
    end
    return false
end

-- the hooks table specifies which hooks this plugin is implementing
hooks = {
    auth_on_register = my_auth_on_register,
    auth_on_publish = my_auth_on_publish,
    auth_on_subscribe = my_auth_on_subscribe
}
It is also possible to try the next plugin in the chain (see: Chaining) by returning next instead of false.

Data Providers

This subsection describes the data providers currently available to a Lua script. Every data provider is backed by a connection pool that has to be configured by your script.

MySQL

ensure_pool
mysql.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 5).
    user: MySQL account name for login
    password: MySQL account password for login (in clear text).
    host: Host name for the MySQL server (default is localhost)
    port: Port that the MySQL server is listening on (default is 3306).
    database: MySQL database name.
    encoding: Encoding (default is latin1)
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
execute
mysql.execute(pool_id, stmt, args...)
Executes the provided SQL statement using a connection from the connection pool.
    pool_id: Name of the connection pool to use for this statement.
    stmt: A valid MySQL statement.
    args...: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true or false or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
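As a rough illustration, the two calls above could be combined into an auth_on_register hook along the following lines. This is only a sketch under assumptions: the pool name, the credentials, and the example_users table with its columns are hypothetical, the statement assumes `?` placeholders, and comparing clear-text passwords is done for brevity only.

```lua
-- Sketch only: pool name, credentials, table and column names are hypothetical.
local MYSQL_POOL = "auth_db_example"

-- `mysql` is provided by vmq_diversity; the guard merely keeps this sketch
-- loadable outside of VerneMQ.
if mysql then
    mysql.ensure_pool({
        pool_id = MYSQL_POOL,
        size = 5,
        user = "vernemq",
        password = "secret",
        host = "localhost",
        port = 3306,
        database = "vernemq_db"
    })
end

function mysql_auth_on_register(reg)
    -- assumption: "?" placeholders are filled from the trailing arguments, in order
    local rows = mysql.execute(MYSQL_POOL,
        "SELECT client_id FROM example_users WHERE username = ? AND password = ?",
        reg.username, reg.password)
    -- a SELECT is expected to yield an array of rows; accept if at least one matched
    if type(rows) == "table" and #rows > 0 then
        return true
    end
    return false
end

hooks = {
    auth_on_register = mysql_auth_on_register
}
```

Checking the type of the result before taking its length keeps the hook from failing if the statement returns true or false instead of rows.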

PostgreSQL

ensure_pool
postgres.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 5).
    user: Postgres account name for login
    password: Postgres account password for login (in clear text).
    host: Host name for the Postgres server (default is localhost)
    port: Port that the Postgres server is listening on (default is 5432).
    database: Postgres database name.
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
execute
postgres.execute(pool_id, stmt, args...)
Executes the provided SQL statement using a connection from the connection pool.
    pool_id: Name of the connection pool to use for this statement.
    stmt: A valid PostgreSQL statement.
    args...: A variable number of arguments can be passed to substitute statement parameters.
Depending on the statement this call returns true or false or a Lua array containing the resulting rows (as Lua tables). In case the statement cannot be executed a badarg error is thrown.
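The following sketch shows how postgres.execute might back an auth_on_publish hook. The pool name, the credentials, and the example_publish_acl table are hypothetical, and the statement assumes PostgreSQL-style positional parameters ($1, $2).

```lua
-- Sketch only: pool name, credentials and the ACL table are hypothetical.
local PG_POOL = "acl_db_example"

-- `postgres` is provided by vmq_diversity; the guard merely keeps this sketch
-- loadable outside of VerneMQ.
if postgres then
    postgres.ensure_pool({
        pool_id = PG_POOL,
        user = "vernemq",
        password = "secret",
        host = "localhost",
        port = 5432,
        database = "vernemq_db"
    })
end

function pg_auth_on_publish(pub)
    -- assumption: $1 and $2 are substituted with the trailing arguments
    local rows = postgres.execute(PG_POOL,
        "SELECT 1 FROM example_publish_acl WHERE client_id = $1 AND topic = $2",
        pub.client_id, pub.topic)
    -- allow the publish only if an ACL row exists for this client and topic
    return type(rows) == "table" and #rows > 0
end

hooks = {
    auth_on_publish = pg_auth_on_publish
}
```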

MongoDB

ensure_pool
mongodb.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 5).
    login: MongoDB login name
    password: MongoDB password for login.
    host: Host name for the MongoDB server (default is localhost)
    port: Port that the MongoDB server is listening on (default is 27017).
    database: MongoDB database name.
    w_mode: Set mode for writes either to "unsafe" or "safe".
    r_mode: Set mode for reads either to "master" or "slave_ok".
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
insert
mongodb.insert(pool_id, collection, doc_or_docs)
Insert the provided document (or list of documents) into the collection.
    pool_id: Name of the connection pool to use for this statement.
    collection: Name of a MongoDB collection.
    doc_or_docs: A single Lua table or a Lua array containing multiple Lua tables.
The provided document can set the document id using the _id key. If the id isn't provided one gets autogenerated. The call returns the inserted document(s) or throws a badarg error if it cannot insert the document(s).
update
mongodb.update(pool_id, collection, selector, doc)
Updates all documents in the collection that match the given selector.
    pool_id: Name of the connection pool to use for this statement.
    collection: Name of a MongoDB collection.
    selector: A single Lua table containing the filter properties.
    doc: A single Lua table containing the update properties.
The call returns true or throws a badarg error if it cannot update the document(s).
delete
mongodb.delete(pool_id, collection, selector)
Deletes all documents in the collection that match the given selector.
    pool_id: Name of the connection pool to use for this statement.
    collection: Name of a MongoDB collection.
    selector: A single Lua table containing the filter properties.
The call returns true or throws a badarg error if it cannot delete the document(s).
find
mongodb.find(pool_id, collection, selector, args)
Finds all documents in the collection that match the given selector.
    pool_id: Name of the connection pool to use for this statement.
    collection: Name of a MongoDB collection.
    selector: A single Lua table containing the filter properties.
    args: A Lua table that currently supports an optional projector=LuaTable element.
The call returns a MongoDB cursor or throws a badarg error if it cannot set up the iterator.
next
mongodb.next(cursor)
Fetches next available document given a cursor object obtained via find.
The call returns the next available document or false if all documents have been fetched.
take
mongodb.take(cursor, nr_of_docs)
Fetches the next nr_of_docs documents given a cursor object obtained via find.
The call returns a Lua array containing the documents or false if all documents have been fetched.
close
mongodb.close(cursor)
Closes and cleans up a cursor object obtained via find.
The call returns true.
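The find, next, and close calls are meant to be used together. A minimal sketch of the cursor loop could look like this, assuming a hypothetical example_acl collection whose documents carry client_id and topic fields (the pool and database names are made up as well):

```lua
-- Sketch only: pool name, database, collection and document fields are hypothetical.
local MONGO_POOL = "mongo_example"

-- `mongodb` is provided by vmq_diversity; the guard merely keeps this sketch
-- loadable outside of VerneMQ.
if mongodb then
    mongodb.ensure_pool({
        pool_id = MONGO_POOL,
        host = "localhost",
        port = 27017,
        database = "vernemq_db"
    })
end

-- Collects the topic field of every ACL document stored for the given client.
function allowed_topics(client_id)
    local cursor = mongodb.find(MONGO_POOL, "example_acl", { client_id = client_id }, {})
    local topics = {}
    local doc = mongodb.next(cursor)
    while doc do                        -- next returns false once the cursor is exhausted
        topics[#topics + 1] = doc.topic
        doc = mongodb.next(cursor)
    end
    mongodb.close(cursor)               -- clean up the cursor when done
    return topics
end
```

For a bounded number of results, mongodb.take(cursor, nr_of_docs) could replace the loop.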
find_one
mongodb.find_one(pool_id, collection, selector, args)
Finds the first document in the collection that matches the given selector.
    pool_id: Name of the connection pool to use for this statement.
    collection: Name of a MongoDB collection.
    selector: A single Lua table containing the filter properties.
    args: A Lua table that currently supports an optional projector=LuaTable element.
The call returns the matched document or false in case no document was found.

Redis

ensure_pool
redis.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 5).
    password: Redis password for login.
    host: Host name for the Redis server (default is localhost)
    port: Port that the Redis server is listening on (default is 6379).
    database: Redis database (default is 0).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
cmd
redis.cmd(pool_id, command, args...)
Executes the given Redis command.
    pool_id: Name of the connection pool
    command: Redis command string.
    args...: Extra args.
This call returns a Lua table, true, false, or nil. In case it cannot parse the command a badarg error is thrown.
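A possible use of redis.cmd in an auth_on_register hook is sketched below. The pool name and the key layout ("example_user:<username>" holding the expected password) are hypothetical, and the sketch assumes that a GET on an existing key yields the stored value as a string, which the return values listed above do not spell out.

```lua
-- Sketch only: pool name and key layout are hypothetical.
local REDIS_POOL = "redis_example"

-- `redis` is provided by vmq_diversity; the guard merely keeps this sketch
-- loadable outside of VerneMQ.
if redis then
    redis.ensure_pool({
        pool_id = REDIS_POOL,
        host = "localhost",
        port = 6379,
        database = 0
    })
end

function redis_auth_on_register(reg)
    -- clients without a username cannot be looked up
    if type(reg.username) ~= "string" then
        return false
    end
    -- the key is passed as an extra argument, following the documented signature
    local stored = redis.cmd(REDIS_POOL, "get", "example_user:" .. reg.username)
    -- assumption: an existing key comes back as a string; anything else is a miss
    return type(stored) == "string" and stored == reg.password
end

hooks = {
    auth_on_register = redis_auth_on_register
}
```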

Memcached

ensure_pool
memcached.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 5).
    host: Host name for the Memcached server (default is localhost)
    port: Port that the Memcached server is listening on (default is 11211).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
flush_all(pool_id)
Flushes all data from the Memcached server. Use with care.
Returns true.
get(pool_id, key)
Get data for key key.
Returns the data for the key and otherwise false.
set(pool_id, key, value, expiration)
Unconditionally set a value for a key.
    key: Key.
    value: Value.
    expiration: Time in seconds until the key/value pair is deleted. This parameter is optional and defaults to 0 (no expiration).
Returns value.
add(pool_id, key, value, expiration)
Add a key/value pair if the key doesn't already exist.
    key: Key.
    value: Value.
    expiration: Time in seconds until the key/value pair is deleted. This parameter is optional and defaults to 0 (no expiration).
Returns value if key didn't already exist, false otherwise.
replace(pool_id, key, value, expiration)
Replace a key/value pair if the key already exists.
    key: Key.
    value: Value.
    expiration: Time in seconds until the key/value pair is deleted. This parameter is optional and defaults to 0 (no expiration).
Returns value if key already exists, false otherwise.
delete(pool_id, key)
Delete key and the associated value.
Returns true if the key/value pair was deleted, false otherwise
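The get and set calls lend themselves to a simple read-through cache. The sketch below assumes the functions live in the memcached module (as ensure_pool does); the pool name and the cached helper are hypothetical.

```lua
-- Sketch only: the pool name and the helper function are hypothetical.
local MC_POOL = "memcached_example"

-- `memcached` is provided by vmq_diversity; the guard merely keeps this sketch
-- loadable outside of VerneMQ.
if memcached then
    memcached.ensure_pool({
        pool_id = MC_POOL,
        host = "localhost",
        port = 11211
    })
end

-- Returns the cached value for key, computing and storing it on a cache miss.
function cached(key, compute, ttl_seconds)
    local value = memcached.get(MC_POOL, key)
    if value then
        return value                     -- cache hit (get returns false on a miss)
    end
    value = compute(key)
    -- an expiration of 0 means the entry never expires
    memcached.set(MC_POOL, key, value, ttl_seconds or 0)
    return value
end
```

A hook could then call, for example, cached(reg.username, lookup_fn, 60) so that an expensive lookup runs at most once per minute per user, where lookup_fn is whatever function performs the lookup.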

HTTP and JSON Client Libraries

HTTP Client

ensure_pool
http.ensure_pool(config)
Ensures that the connection pool named config.pool_id is set up in the system. The config argument is a Lua table holding the following keys:
    pool_id: Name of the connection pool (mandatory).
    size: Size of the connection pool (default is 10).
This call throws a badarg error if it cannot set up the pool; otherwise it returns true.
get, put, post, delete
http.get(pool_id, url, body, headers)
http.put(pool_id, url, body, headers)
http.post(pool_id, url, body, headers)
http.delete(pool_id, url, body, headers)
Executes an HTTP request with the given url and args.
    pool_id: Name of the connection pool to use for this request.
    url: A valid http url.
    body: optional body to be included in the request.
    headers: optional Lua table containing extra headers to be included in the request.
This call returns false in case of an error or a Lua table of the form:
response = {
    status = HTTP_STATUS_CODE,
    headers = Lua Table containing response headers,
    ref = Client Ref
}
body
http.body(client_ref)
Fetches the response body given a client ref obtained via the response Lua table.
This call returns false in case of an error or the response body.
Before VerneMQ 1.5.0 it was imperative to call http.body(client_ref) after doing an HTTP request, as otherwise the underlying connection would not be released back into the connection pool, leading to connection pool exhaustion. This problem would manifest itself as connection timeout errors. This is no longer necessary: the connection is now released back to the connection pool automatically.
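Putting the HTTP and JSON libraries together, a hook that delegates authentication to an external service might be sketched as follows. The pool name, the URL, the header key format, and the shape of the service's answer (a JSON object with an allow field) are all assumptions made for illustration.

```lua
-- Sketch only: pool name, URL, header format and response shape are hypothetical.
local HTTP_POOL = "http_example"

-- `http` and `json` are provided by vmq_diversity; the guard merely keeps this
-- sketch loadable outside of VerneMQ.
if http then
    http.ensure_pool({ pool_id = HTTP_POOL, size = 10 })
end

function http_auth_on_register(reg)
    local body = json.encode({ username = reg.username, client_id = reg.client_id })
    if not body then
        return false                     -- encode returns false on failure
    end
    local resp = http.post(HTTP_POOL, "http://localhost:8080/auth", body,
                           { ["content-type"] = "application/json" })
    if not resp or resp.status ~= 200 then
        return false                     -- request failed or was rejected
    end
    local raw = http.body(resp.ref)      -- fetch the body via the client ref
    if not raw then
        return false
    end
    local answer = json.decode(raw)
    return type(answer) == "table" and answer.allow == true
end

hooks = {
    auth_on_register = http_auth_on_register
}
```

Every failure path returns false, so a broken or unreachable service denies the client instead of raising an error in the hook.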

JSON

encode
json.encode(val)
Encodes a Lua value to a JSON string.
This call returns false if it cannot encode the given value.
decode
json.decode(json_string)
Decodes a JSON string to a Lua value.
This call returns false if it cannot decode the JSON string.
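Because decode signals failure with false rather than an error, callers should check the result before indexing it. The sketch below only authorizes publishes whose payload is a JSON object with a sensor_id field; that field name is a made-up example.

```lua
-- Sketch only: the required "sensor_id" field is a hypothetical convention.
function json_auth_on_publish(pub)
    local msg = json.decode(pub.payload)
    -- decode returns false for malformed input; scalars are rejected as well
    if type(msg) ~= "table" then
        return false
    end
    return msg.sensor_id ~= nil
end

hooks = {
    auth_on_publish = json_auth_on_publish
}
```

Note that a payload consisting of the JSON literal false would presumably also decode to false, so a type check is more robust than comparing against false.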

Logger

log.info(log_string)
log.error(log_string)
log.warning(log_string)
log.debug(log_string)
Uses the VerneMQ logging infrastructure to log the given log_string.
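As a small illustration, a hook can log its decisions before returning them. The accepted username below is the one from the first example; the message wording is arbitrary.

```lua
-- Sketch only: the log messages are arbitrary; `log` is provided by vmq_diversity.
function logged_auth_on_register(reg)
    local accepted = (reg.username == "demo-user")
    if accepted then
        log.info("accepted client " .. tostring(reg.client_id))
    else
        log.warning("rejected client " .. tostring(reg.client_id) ..
                    " connecting from " .. tostring(reg.addr))
    end
    return accepted
end

hooks = {
    auth_on_register = logged_auth_on_register
}
```

Wrapping the fields in tostring avoids a concatenation error if a field happens to be missing.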