MQTT/native cloud ingest (#4203)

* add MQTT draft PR

* add updates

* edits

* Updates to MQTT

* Update content/influxdb/v2.3/write-data/no-code/load-data.md

Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>

* Update content/influxdb/v2.3/write-data/no-code/load-data.md

Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>

* Update content/influxdb/v2.3/write-data/no-code/load-data.md

Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>

* Update content/influxdb/v2.3/write-data/no-code/load-data.md

Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>

* Update content/influxdb/v2.3/write-data/troubleshoot.md

Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>

* Add JSON example and update name to remove "cloud"

Co-authored-by: noramullen1 <nora@influxdata.com>
Co-authored-by: noramullen1 <42354779+noramullen1@users.noreply.github.com>
Co-authored-by: Jason Stirnaman <jstirnaman@influxdata.com>
pull/4260/head^2
kelseiv 2022-07-27 10:19:58 -07:00 committed by GitHub
parent 10901ac104
commit 1f6cbee243
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 172 additions and 3 deletions

View File

@ -4,7 +4,7 @@ seotitle: Load data source in UI
list_title: Load data source in UI
weight: 101
description: >
Load data from sources in the InfluxDB user interface (UI). Choose from popular client libraries (such as Python, Ruby, Scala, and more!) or load data with a Telegraf plugin (like MQTT Consumer, MySQL, File, and many more!).
Load data from sources in the InfluxDB user interface (UI). Choose from popular client libraries (such as Python, Ruby, Scala, and more!), load data with a Telegraf plugin (like MQTT Consumer, MySQL, File, and many more!), and InfluxDB Cloud users have the option to load data natively by subscribing to an MQTT topic.
menu:
influxdb_2_3:
name: Load data source in UI
@ -17,6 +17,7 @@ Load data from the following sources in the InfluxDB user interface (UI):
- [Line protocol](#load-data-using-line-protocol)
- [Client libraries](#load-data-from-a-client-library-in-the-ui)
- [Telegraf plugins](#load-data-from-a-telegraf-plugin-in-the-ui)
- {{% cloud-only %}}[Cloud native subscriptions](#set-up-a-cloud-native-subscription){{% /cloud-only %}}
### Load CSV or line protocol in UI
@ -76,4 +77,142 @@ Load CSV or line protocol data by uploading a file or pasting the data manually
_See [Start Telegraf](/influxdb/cloud/write-data/no-code/use-telegraf/auto-config/#start-telegraf) below for detailed information about what each step does._
9. Once Telegraf is running, click **Listen for Data** to confirm Telegraf is successfully sending data to InfluxDB.
Once confirmed, a **Connection Found!** message appears.
10. Click **Finish**. Your Telegraf configuration name and the associated bucket name appears in the list of Telegraf configurations.
10. Click **Finish**. Your Telegraf configuration name and the associated bucket name appear in the list of Telegraf configurations.
{{% cloud-only %}}
### Set up a native subscription
To ingest MQTT (Message Queuing Telemetry Transport) data into InfluxDB, do the following to set up a Cloud native subscription:
1. [Subscribe to an MQTT topic](#subscribe-to-an-mqtt-topic) in InfluxDB Cloud by configuring an MQTT broker, and specifying the topic(s) to subscribe to.
2. [Define parsing rules](#define-parsing-rules) for JSON or regex data (line protocol requires no configuration).
{{% note %}}
For troubleshooting help, see [Troubleshoot MQTT ingest errors](/influxdb/cloud/write-data/troubleshoot/#troubleshoot-mqtt-ingest-errors/)
{{% /note %}}
#### Subscribe to an MQTT topic
1. In the navigation menu on the left, click **Load Data** > **Native Subscriptions**.
{{< nav-icon "data" >}}
2. Click **+ Create Subscription**.
3. On the **Setting Up - MQTT Subscriber** page, under **Connect to Broker**, enter the following:
- Subscription Name
- Description
- Protocol
- Hostname or IP address (hostname or URL of the MQTT broker)
- Port (TCP/IP port number the MQTT broker uses)
- Security details. Choose one of the following:
- **None**
- **Basic** (username/password)
- **Certificate**
4. Under **Subscribe to a topic**, in the **Topic** field, enter the MQTT topic name to subscribe to. Note, MQTT brokers typically support wildcard subscriptions with the wildcard characters `+` and `#`.
- To subscribe to all topics in a directory, use `+`. For example, if an `iotdevices` directory includes two directories called `asia` and `europe`, to subscribe to a `sensor` topic in either directory, use `iotdevices/+/sensors` to subscribe to `iotdevices/asia/sensors`, and `iotdevices/europe/sensors`.
- To subscribe to all topics in a directory, use `#`. For example, `iotdevices/#` subscribes to all topics in the `iotdevices` directory. For more information about MQTT subscriptions and wildcards, see [the MQTT specification for Topic Names and Topic Filters](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901241).
5. Under **Write Destination**, select an existing InfluxDB bucket to write data to or click **+ Create bucket**. For more information, see [Create a bucket](/influxdb/cloud/organizations/buckets/create-bucket/).
#### Define parsing rules
{{% note %}}
JSON parsing is faster and more efficient than string parsing. We recommend using JSON parsing when your MQTT messages are in JSON format.
{{% /note %}}
- Under **Define Data Parsing Rules**, select one of the following MQTT data formats:
- **Line protocol** (no configuration required)
- **JSON**. To define parsing rules to ingest JSON data, click the **JSON** tab below.
- **String**. To define parsing rules to ingest String data, click the **String** tab below.
{{< tabs-wrapper >}}
{{% tabs %}}
[Line protocol](#)
[JSON](#)
[String](#)
{{% /tabs %}}
<!-------------------------------- BEGIN Line protocol -------------------------------->
{{% tab-content %}}
Use line protocol to write data into InfluxDB. Line protocol doesn't require any parsing or configuration.
Select a **Timepstamp precision** from the dropdown menu:
- **MS**: Milliseconds
- **S**: Seconds
- **US**: Microseconds
- **NS**: Nanoseconds
{{% /tab-content %}}
<!-------------------------------- BEGIN JSON -------------------------------->
{{% tab-content %}}
Associate **JSON** key/value pairs with **InfluxDB elements** (measurements, timestamps, fields, or tags) using parsing rules.
{{% expand "Example JSON" %}}
```
{
"device_type":"temperature_sensor",
"device_id":2036,
“model_id”:”KN24683”,
"temperature":25.0,
"time":1653998899010000000
"error_state":"in_error"
}
```
{{% /expand %}}
1. On the **Setting Up - MQTT Connector** page, under **Data Format**, do the following:
1. (Optional) In the **JSON path to timestamp** field, specify the path in the MQTT message to the JSON key that holds the timestamp: for the example above, `"time":1653998899010000000`. Otherwise, InfluxDB automatically assigns a timestamp when messages are ingested into InfluxDB.
{{% warn %}}
**Important**: Configure the timestamp format that matches the format in your messages.
{{% /warn %}}
2. Configure the JSON parsing rules:
1. Under **Measurement**, enter the **JSON path** (start with `$.`) to assign the InfluxDB measurement key. For the above example, enter `$.temperature_sensor`.
3. Select the **Data Type** for the measurement.
4. Specify the JSON paths to tag and field names as needed, and then select the data type for the tag or field. At least one field is required. For the above example, add fields with the JSON paths `$.temperature` and `$.error_state` and a tag with the path `$.error_state`.
Note that JSON paths with arrays are supported, for example, `$.device_information.errors_encountered[0].error_number`.
{{% /tab-content %}}
<!-------------------------------- BEGIN String -------------------------------->
{{% tab-content %}}
Associate **String** key/value pairs with **InfluxDB elements** (measurements, timestamps, fields, or tags).
1. On the **Setting Up - MQTT Connector** page, under **Data Format**, do the following:
1. (Optional) In the **Regex pattern to find timestamp** field, enter the regex (regular expression) to find the timestamp in the MQTT message. Otherwise, InfluxDB automatically assigns a timestamp when messages are ingested into InfluxDB.
{{% note %}}
**Note**: Parsing rules only support finding one value at a time.
{{% /note %}}
For example, if the timestamp string is `time=1653998899010000000`, use a regex to find the string you're looking for and capture the timestamp:
- `time=([\s\S]*?)\n` (captures value after `=` until the EOL (end of line) is reached)
- `time=([\s\S]*?),` (captures value after `=` until comma is reached)
{{% warn %}}
**Important**: Configure the timestamp format that matches the format in your messages.
{{% /warn %}}
2. Under **Measurement**, enter the **JSON path** (start with `$.`) to assign the InfluxDB measurement key, for example, `$.device_id` or `$.device_information.device_id` for a nested measurement key.
3. Select the **Data Type** for the measurement.
4. Enter **Tag** and **Field**. At least one field is required. For tag and field names, use the regex to find the tag or field name, and what to capture. For example:
- `device_id=\d\d\d\d-([0-9][0-9][0-9][0-9])` (matches on the `device_id=` and also matches on the first four digits of the device id, and then captures the four digits.
5. Select the data type for the tag or field. JSON paths with arrays are supported, for example, `$.device_information.errors_encountered[0].error_number`.
{{% /tab-content %}}
{{< /tabs-wrapper >}}
{{% /cloud-only %}}

View File

@ -28,6 +28,7 @@ Learn how to avoid unexpected results and recover from errors when writing to In
- [Handle `write` and `delete` responses](#handle-write-and-delete-responses)
- [Troubleshoot failures](#troubleshoot-failures)
- [Troubleshoot rejected points](#troubleshoot-rejected-points)
- [Troubleshoot MQTT ingest errors](troubleshoot-mqtt-ingest-errors)
{{% /cloud-only %}}
@ -321,4 +322,33 @@ InfluxDB logs the following `rejected_points` entry to the `_monitoring` bucket:
|:------------------|:-------|:-------|:-------------------|:--------------|:---------|:------------|:----------------------------------|:---------|
| rejected_points | count | 1 | a7d5558b880a93da | temperature | String | airSensors | field type mismatch with schema | Float |
{{% cloud-only %}}
## Troubleshoot MQTT ingest errors
If you're having issues ingesting [MQTT data](/influxdb/v2.3/write-data/no-code/load-data/#set-up-a-cloud-native-subscription/) into InfluxDB, try the following:
- [Validate parsing rules and subscription path](#validate-parsing-rules-and-subscription-path).
- [View the `_monitoring` bucket](#view-the-monitoring-bucket) to check for parsing errors.
- [Set up a deadman check](#set-up-a-deadman-check).
### Validate parsing rules and subscription path
Verify that your parsing rules dont have any syntax or other errors by copying and pasting your MQTT message and your parsing rules into a tool. For JSON, we recommend [JSON Path](https://jsonpath.com/). For Regex, we recommend [Regex 101](https://regex101.com/).
### View the `_monitoring`bucket
If your parsing rules and subscription path are valid, check for InfluxDB Cloud errors from executing parsing rules in the `_monitoring` bucket.
1. In the navigation menu on the left, click **Data Explorer**.
2. Select the `_monitoring` bucket, and then the `subscription` measurement.
3. Click the CSV icon to save the contents as a CSV and view error messages.
### Set up a deadman check
If you have a steady stream of MQTT messages, we recommend setting up a deadman alert to detect when messages are not ingested into InfluxDB. See [Create a deadman check](/influxdb/cloud/monitor-alert/checks/create/#deadman-check/).
{{% /cloud-only %}}