Enable the MQTT STREAM through the configuration file
In this section, we will describe how to enable MQTT streams by configuring the nanomq.conf configuration file.
Configuration
Here is a simple example MQTT STREAM configuration.
exchange_client.mq1 {
# # exchanges contains multiple MQ exchanger
exchange {
# # MQTT Topic for filtering messages and saving to queue
topic = "exchange/topic1",
# # MQ name
name = "exchange_no1",
# # MQ category. Only support Ringbus for now
ringbus = {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 1000,
fullOp = 3
}
}
}
- exchange_client: The client hosting the exchange.
- exchange: It is used to receive a specific topic message and put the message data into ringbus.
- ringbus: For storing mqtt messages, its capacity can be set, and it also provides fullOp, which is the behavior when the ringbus is full.
- cap: ringbus capacity size.
- fullOp: Four behaviors are currently provided when the ringbus is full.Note: If fullOp=3 you need to enable PARQUET compilation and configure parquet in nanomq.conf.
0: RB_FULL_NONE: When the ringbus is full, no action is taken and the message enqueue fail. 1: RB_FULL_DROP: When the ringbus is full, the data in the ringbus is discarded. 2: RB_FULL_RETURN: When the ringbus is full, the data in the ringbus is taken out and returned to the aio. 3: RB_FULL_FILE(Relying on parquet): When the ringbus is full, the data in the ringbus is written to the file.
# Enable PARQUET compile options cmake -DENABLE_PARQUET=ON ../
# Configure parquet in nanomq.conf parquet { compress = gzip encryption { key_id = kf key = "0123456789012345" type = AES_GCM_CTR_V1 } dir = "/tmp/nanomq-parquet" file_name_prefix = "nanomq-parquet-" file_count = 5 }
Testing the MQTT STREAM
This is followed by an example of fullOp of 3 for ringbus to test the offloading capability of mqtt stream. (Remember to enable parquet compile).
- Configuration
...
exchange_client.mq1 {
# # exchanges contains multiple MQ exchanger
exchange {
# # MQTT Topic for filtering messages and saving to queue
topic = "topic1",
# # MQ name
name = "exchange_no1",
# # MQ category. Only support Ringbus for now
ringbus = {
# # ring buffer name
name = "ringbus",
# # max length of ring buffer (msg count)
cap = 10000,
fullOp = 3
}
}
}
...
parquet {
compress = gzip
encryption {
key_id = kf
key = "0123456789012345"
type = AES_GCM_CTR_V1
}
dir = "/tmp/nanomq-parquet"
file_name_prefix = "nanomq-parquet-"
file_count = 5
}
...
- Start NanoMQ
nanomq start --conf=/etc/nanomq.conf
- Publish mqtt messages mqtt messages are sent using emqtt_bench
emqtt_bench pub -p 1883 -i 1 -I 1 -c 1 -s 10 -t "topic1" -V 5 -L 30005
- The NanoMQ log NanoMQ prints a log to indicate that the disk write was successful.
ringbuffer_parquet_cb: ringbus: parquet write to file: /tmp/nanomq-parquet/nanomq-parquet--79060194635079892~370618470429406089.parquet success
ringbuffer_parquet_cb: ringbus: parquet write to file: /tmp/nanomq-parquet/nanomq-parquet--370618474724373386~375711962341231626.parquet success
ringbuffer_parquet_cb: ringbus: parquet write to file: /tmp/nanomq-parquet/nanomq-parquet--375711966636198923~380805454253057163.parquet success
- File storage path
/tmp/nanomq-parquet/nanomq-parquet--370618474724373386~375711962341231626.parquet
/tmp/nanomq-parquet/nanomq-parquet--375711966636198923~380805454253057163.parquet
/tmp/nanomq-parquet/nanomq-parquet--79060194635079892~370618470429406089.parquet