docs: async-js (#4581)
* docs: async-js * docs: added missing async keyword * docs: fixed indentation * docs: Apply suggestions from code review Co-authored-by: Jason Stirnaman <stirnamanj@gmail.com> --------- Co-authored-by: kelseiv <47797004+kelseiv@users.noreply.github.com> Co-authored-by: Jason Stirnaman <stirnamanj@gmail.com>pull/4811/head
parent
fd1cf647da
commit
61c88ca2b1
|
@ -14,7 +14,7 @@ aliases:
|
||||||
|
|
||||||
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
||||||
|
|
||||||
The following example sends a Flux query to an InfluxDB bucket and outputs rows from an observable table.
|
The following example sends a Flux query to an InfluxDB bucket and outputs rows as a JavaScript _asynchronous iterable_ object.
|
||||||
|
|
||||||
## Before you begin
|
## Before you begin
|
||||||
|
|
||||||
|
@ -56,25 +56,21 @@ The following example sends a Flux query to an InfluxDB bucket and outputs rows
|
||||||
```
|
```
|
||||||
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
||||||
|
|
||||||
4. Use the `queryRows()` method of the query client to query InfluxDB.
|
4. Use the `iterateRows()` method of the query client to query InfluxDB.
|
||||||
`queryRows()` takes a Flux query and an [RxJS **Observer**](http://reactivex.io/rxjs/manual/overview.html#observer) object.
|
`iterateRows()` takes a Flux query and returns the [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) of metadata and rows as an asynchronous iterable (`AsyncIterable<Row>`).
|
||||||
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an [RxJS **Observable**](http://reactivex.io/rxjs/manual/overview.html#observable).
|
The following example shows how to write an asynchronous function that uses the `iterateRows()` method to query a bucket and uses the JavaScript `for await...of` statement to iterate over the query results:
|
||||||
`queryRows()` subscribes your observer to the observable.
|
|
||||||
Finally, the observer logs the rows from the response to the terminal.
|
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const observer = {
|
const myQuery = async () => {
|
||||||
next(row, tableMeta) {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(values)
|
||||||
console.log(
|
console.log(
|
||||||
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
myQuery()
|
||||||
queryApi.queryRows(fluxQuery, observer)
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
||||||
### Complete example
|
### Complete example
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ aliases:
|
||||||
|
|
||||||
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
||||||
|
|
||||||
The following example sends a Flux query to an InfluxDB bucket and outputs rows from an observable table.
|
The following example sends a Flux query to an InfluxDB bucket and outputs rows as a JavaScript _asynchronous iterable_ object.
|
||||||
|
|
||||||
## Before you begin
|
## Before you begin
|
||||||
|
|
||||||
|
@ -56,24 +56,20 @@ The following example sends a Flux query to an InfluxDB bucket and outputs rows
|
||||||
```
|
```
|
||||||
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
||||||
|
|
||||||
4. Use the `queryRows()` method of the query client to query InfluxDB.
|
4. Use the `iterateRows()` method of the query client to query InfluxDB.
|
||||||
`queryRows()` takes a Flux query and an [RxJS **Observer**](http://reactivex.io/rxjs/manual/overview.html#observer) object.
|
`iterateRows()` takes a Flux query and returns table as an asynchronous collection.
|
||||||
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an [RxJS **Observable**](http://reactivex.io/rxjs/manual/overview.html#observable).
|
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an as an AsyncIterable.
|
||||||
`queryRows()` subscribes your observer to the observable.
|
|
||||||
Finally, the observer logs the rows from the response to the terminal.
|
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const observer = {
|
const myQuery = async () => {
|
||||||
next(row, tableMeta) {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(values)
|
||||||
console.log(
|
console.log(
|
||||||
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
myQuery()
|
||||||
queryApi.queryRows(fluxQuery, observer)
|
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Complete example
|
### Complete example
|
||||||
|
|
|
@ -14,7 +14,7 @@ aliases:
|
||||||
|
|
||||||
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
||||||
|
|
||||||
The following example sends a Flux query to an InfluxDB bucket and outputs rows from an observable table.
|
The following example sends a Flux query to an InfluxDB bucket and outputs rows as asynchronous iterable.
|
||||||
|
|
||||||
## Before you begin
|
## Before you begin
|
||||||
|
|
||||||
|
@ -56,24 +56,20 @@ The following example sends a Flux query to an InfluxDB bucket and outputs rows
|
||||||
```
|
```
|
||||||
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
||||||
|
|
||||||
4. Use the `queryRows()` method of the query client to query InfluxDB.
|
4. Use the `iterateRows()` method of the query client to query InfluxDB.
|
||||||
`queryRows()` takes a Flux query and an [RxJS **Observer**](http://reactivex.io/rxjs/manual/overview.html#observer) object.
|
`iterateRows()` takes a Flux query and returns table as an asynchronous collection.
|
||||||
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an [RxJS **Observable**](http://reactivex.io/rxjs/manual/overview.html#observable).
|
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an as an AsyncIterable.
|
||||||
`queryRows()` subscribes your observer to the observable.
|
|
||||||
Finally, the observer logs the rows from the response to the terminal.
|
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const observer = {
|
const myQuery = async () => {
|
||||||
next(row, tableMeta) {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(values)
|
||||||
console.log(
|
console.log(
|
||||||
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
myQuery()
|
||||||
queryApi.queryRows(fluxQuery, observer)
|
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Complete example
|
### Complete example
|
||||||
|
|
|
@ -249,9 +249,7 @@ const influxdb = new InfluxDB({url: process.env.INFLUX_URL, token: process.env.I
|
||||||
|> last()`
|
|> last()`
|
||||||
const devices = {}
|
const devices = {}
|
||||||
|
|
||||||
return await new Promise((resolve, reject) => {
|
for await (const {row, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
queryApi.queryRows(fluxQuery, {
|
|
||||||
next(row, tableMeta) {
|
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(row)
|
||||||
const deviceId = o.deviceId
|
const deviceId = o.deviceId
|
||||||
if (!deviceId) {
|
if (!deviceId) {
|
||||||
|
@ -262,13 +260,9 @@ const influxdb = new InfluxDB({url: process.env.INFLUX_URL, token: process.env.I
|
||||||
if (!device.updatedAt || device.updatedAt < o._time) {
|
if (!device.updatedAt || device.updatedAt < o._time) {
|
||||||
device.updatedAt = o._time
|
device.updatedAt = o._time
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
error: reject,
|
|
||||||
complete() {
|
return devices
|
||||||
resolve(devices)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -284,26 +278,17 @@ for registered devices, processes the data, and returns a Promise with the resul
|
||||||
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
||||||
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
||||||
|
|
||||||
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI queryRows(query, consumer)` method.
|
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI iterateRows(query)` asynchronous method.
|
||||||
`queryRows` executes the `query` and provides the Annotated CSV result as an Observable to the `consumer`.
|
`iterateRows` executes the `query` and provides the Annotated CSV result as an AsyncIterable.
|
||||||
`queryRows` has the following TypeScript signature:
|
`iterateRows` has the following TypeScript signature:
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
queryRows(
|
iterateRows(
|
||||||
query: string | ParameterizedQuery,
|
query: string | ParameterizedQuery
|
||||||
consumer: FluxResultObserver<string[]>
|
): AsyncIterable<Row>
|
||||||
): void
|
|
||||||
```
|
```
|
||||||
|
|
||||||
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/QueryApi.ts){{% /caption %}}
|
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/af7cf3b6c1003ff0400e91bcb6a0b860668d6458/packages/core/src/QueryApi.ts){{% /caption %}}
|
||||||
|
|
||||||
The `consumer` that you provide must implement the [`FluxResultObserver` interface](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/results/FluxResultObserver.ts) and provide the following callback functions:
|
|
||||||
|
|
||||||
- `next(row, tableMeta)`: processes the next row and table metadata--for example, to prepare the response.
|
|
||||||
- `error(error)`: receives and handles errors--for example, by rejecting the Promise.
|
|
||||||
- `complete()`: signals when all rows have been consumed--for example, by resolving the Promise.
|
|
||||||
|
|
||||||
To learn more about Observers, see the [RxJS Guide](https://rxjs.dev/guide/observer).
|
|
||||||
|
|
||||||
## Create the API to register devices
|
## Create the API to register devices
|
||||||
|
|
||||||
|
|
|
@ -259,9 +259,7 @@ const influxdb = new InfluxDB({url: process.env.INFLUX_URL, token: process.env.I
|
||||||
|> last()`
|
|> last()`
|
||||||
const devices = {}
|
const devices = {}
|
||||||
|
|
||||||
return await new Promise((resolve, reject) => {
|
for await (const {row, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
queryApi.queryRows(fluxQuery, {
|
|
||||||
next(row, tableMeta) {
|
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(row)
|
||||||
const deviceId = o.deviceId
|
const deviceId = o.deviceId
|
||||||
if (!deviceId) {
|
if (!deviceId) {
|
||||||
|
@ -272,13 +270,9 @@ const influxdb = new InfluxDB({url: process.env.INFLUX_URL, token: process.env.I
|
||||||
if (!device.updatedAt || device.updatedAt < o._time) {
|
if (!device.updatedAt || device.updatedAt < o._time) {
|
||||||
device.updatedAt = o._time
|
device.updatedAt = o._time
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
error: reject,
|
|
||||||
complete() {
|
return devices
|
||||||
resolve(devices)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -294,26 +288,17 @@ for registered devices, processes the data, and returns a Promise with the resul
|
||||||
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
||||||
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
||||||
|
|
||||||
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI queryRows(query, consumer)` method.
|
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI iterateRows(query)` asynchronous method.
|
||||||
`queryRows` executes the `query` and provides the Annotated CSV result as an Observable to the `consumer`.
|
`iterateRows` executes the `query` and provides the Annotated CSV result as an AsyncIterable.
|
||||||
`queryRows` has the following TypeScript signature:
|
`iterateRows` has the following TypeScript signature:
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
queryRows(
|
iterateRows(
|
||||||
query: string | ParameterizedQuery,
|
query: string | ParameterizedQuery
|
||||||
consumer: FluxResultObserver<string[]>
|
): AsyncIterable<Row>
|
||||||
): void
|
|
||||||
```
|
```
|
||||||
|
|
||||||
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/QueryApi.ts){{% /caption %}}
|
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/af7cf3b6c1003ff0400e91bcb6a0b860668d6458/packages/core/src/QueryApi.ts){{% /caption %}}
|
||||||
|
|
||||||
The `consumer` that you provide must implement the [`FluxResultObserver` interface](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/results/FluxResultObserver.ts) and provide the following callback functions:
|
|
||||||
|
|
||||||
- `next(row, tableMeta)`: processes the next row and table metadata--for example, to prepare the response.
|
|
||||||
- `error(error)`: receives and handles errors--for example, by rejecting the Promise.
|
|
||||||
- `complete()`: signals when all rows have been consumed--for example, by resolving the Promise.
|
|
||||||
|
|
||||||
To learn more about Observers, see the [RxJS Guide](https://rxjs.dev/guide/observer).
|
|
||||||
|
|
||||||
## Create the API to register devices
|
## Create the API to register devices
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ aliases:
|
||||||
|
|
||||||
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
Use the [InfluxDB JavaScript client library](https://github.com/influxdata/influxdb-client-js) in a Node.js environment to query InfluxDB.
|
||||||
|
|
||||||
The following example sends a Flux query to an InfluxDB bucket and outputs rows from an observable table.
|
The following example sends a Flux query to an InfluxDB bucket and outputs rows as asynchronous iterable.
|
||||||
|
|
||||||
## Before you begin
|
## Before you begin
|
||||||
|
|
||||||
|
@ -56,24 +56,20 @@ The following example sends a Flux query to an InfluxDB bucket and outputs rows
|
||||||
```
|
```
|
||||||
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
Replace *`YOUR_BUCKET`* with the name of your InfluxDB bucket.
|
||||||
|
|
||||||
4. Use the `queryRows()` method of the query client to query InfluxDB.
|
4. Use the `iterateRows()` method of the query client to query InfluxDB.
|
||||||
`queryRows()` takes a Flux query and an [RxJS **Observer**](http://reactivex.io/rxjs/manual/overview.html#observer) object.
|
`iterateRows()` takes a Flux query and returns table as an asynchronous collection.
|
||||||
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an [RxJS **Observable**](http://reactivex.io/rxjs/manual/overview.html#observable).
|
The client returns [table](/{{% latest "influxdb" %}}/reference/syntax/annotated-csv/#tables) metadata and rows as an as an AsyncIterable.
|
||||||
`queryRows()` subscribes your observer to the observable.
|
|
||||||
Finally, the observer logs the rows from the response to the terminal.
|
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const observer = {
|
const myQuery = async () => {
|
||||||
next(row, tableMeta) {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(values)
|
||||||
console.log(
|
console.log(
|
||||||
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
myQuery()
|
||||||
queryApi.queryRows(fluxQuery, observer)
|
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Complete example
|
### Complete example
|
||||||
|
|
|
@ -259,26 +259,20 @@ const influxdb = new InfluxDB({url: process.env.INFLUX_URL, token: process.env.I
|
||||||
|> last()`
|
|> last()`
|
||||||
const devices = {}
|
const devices = {}
|
||||||
|
|
||||||
return await new Promise((resolve, reject) => {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
queryApi.queryRows(fluxQuery, {
|
const o = tableMeta.toObject(values)
|
||||||
next(row, tableMeta) {
|
|
||||||
const o = tableMeta.toObject(row)
|
|
||||||
const deviceId = o.deviceId
|
const deviceId = o.deviceId
|
||||||
if (!deviceId) {
|
if (!deviceId) {
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
const device = devices[deviceId] || (devices[deviceId] = {deviceId})
|
const device = devices[deviceId] || (devices[deviceId] = {deviceId})
|
||||||
device[o._field] = o._value
|
device[o._field] = o._value
|
||||||
if (!device.updatedAt || device.updatedAt < o._time) {
|
if (!device.updatedAt || device.updatedAt < o._time) {
|
||||||
device.updatedAt = o._time
|
device.updatedAt = o._time
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
error: reject,
|
|
||||||
complete() {
|
return devices
|
||||||
resolve(devices)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -294,26 +288,17 @@ for registered devices, processes the data, and returns a Promise with the resul
|
||||||
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
If you invoke the function as `getDevices()` (without a _`deviceId`_),
|
||||||
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
it retrieves all `deviceauth` points and returns a Promise with `{ DEVICE_ID: ROW_DATA }`.
|
||||||
|
|
||||||
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI queryRows(query, consumer)` method.
|
To send the query and process results, the `getDevices(deviceId)` function uses the `QueryAPI iterateRows(query)` asynchronous method.
|
||||||
`queryRows` executes the `query` and provides the Annotated CSV result as an Observable to the `consumer`.
|
`iterateRows` executes the `query` and provides the Annotated CSV result as an AsyncIterable.
|
||||||
`queryRows` has the following TypeScript signature:
|
`iterateRows` has the following TypeScript signature:
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
queryRows(
|
iterateRows(
|
||||||
query: string | ParameterizedQuery,
|
query: string | ParameterizedQuery
|
||||||
consumer: FluxResultObserver<string[]>
|
): AsyncIterable<Row>
|
||||||
): void
|
|
||||||
```
|
```
|
||||||
|
|
||||||
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/QueryApi.ts){{% /caption %}}
|
{{% caption %}}[@influxdata/influxdb-client-js QueryAPI](https://github.com/influxdata/influxdb-client-js/blob/af7cf3b6c1003ff0400e91bcb6a0b860668d6458/packages/core/src/QueryApi.ts){{% /caption %}}
|
||||||
|
|
||||||
The `consumer` that you provide must implement the [`FluxResultObserver` interface](https://github.com/influxdata/influxdb-client-js/blob/3db2942432b993048d152e0d0e8ec8499eedfa60/packages/core/src/results/FluxResultObserver.ts) and provide the following callback functions:
|
|
||||||
|
|
||||||
- `next(row, tableMeta)`: processes the next row and table metadata--for example, to prepare the response.
|
|
||||||
- `error(error)`: receives and handles errors--for example, by rejecting the Promise.
|
|
||||||
- `complete()`: signals when all rows have been consumed--for example, by resolving the Promise.
|
|
||||||
|
|
||||||
To learn more about Observers, see the [RxJS Guide](https://rxjs.dev/guide/observer).
|
|
||||||
|
|
||||||
## Create the API to register devices
|
## Create the API to register devices
|
||||||
|
|
||||||
|
|
|
@ -23,9 +23,7 @@ const INFLUX_BUCKET_AUTH = process.env.INFLUX_BUCKET_AUTH
|
||||||
|> last()`
|
|> last()`
|
||||||
const devices = {}
|
const devices = {}
|
||||||
console.log(`*** QUERY *** \n ${fluxQuery}`)
|
console.log(`*** QUERY *** \n ${fluxQuery}`)
|
||||||
return await new Promise((resolve, reject) => {
|
for await (const {row, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
queryApi.queryRows(fluxQuery, {
|
|
||||||
next(row, tableMeta) {
|
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(row)
|
||||||
const deviceId = o.deviceId
|
const deviceId = o.deviceId
|
||||||
if (!deviceId) {
|
if (!deviceId) {
|
||||||
|
@ -36,14 +34,8 @@ const INFLUX_BUCKET_AUTH = process.env.INFLUX_BUCKET_AUTH
|
||||||
if (!device.updatedAt || device.updatedAt < o._time) {
|
if (!device.updatedAt || device.updatedAt < o._time) {
|
||||||
device.updatedAt = o._time
|
device.updatedAt = o._time
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
error: reject,
|
return devices
|
||||||
complete() {
|
|
||||||
console.log(JSON.stringify(devices))
|
|
||||||
resolve(devices)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,21 +21,14 @@ const queryApi = new InfluxDB({url, token}).getQueryApi(org)
|
||||||
/** To avoid SQL injection, use a string literal for the query. */
|
/** To avoid SQL injection, use a string literal for the query. */
|
||||||
const fluxQuery = 'from(bucket:"air_sensor") |> range(start: 0) |> filter(fn: (r) => r._measurement == "temperature")'
|
const fluxQuery = 'from(bucket:"air_sensor") |> range(start: 0) |> filter(fn: (r) => r._measurement == "temperature")'
|
||||||
|
|
||||||
const fluxObserver = {
|
const myQuery = async () => {
|
||||||
next(row, tableMeta) {
|
for await (const {values, tableMeta} of queryApi.iterateRows(fluxQuery)) {
|
||||||
const o = tableMeta.toObject(row)
|
const o = tableMeta.toObject(values)
|
||||||
console.log(
|
console.log(
|
||||||
`${o._time} ${o._measurement} in ${o.region} (${o.sensor_id}): ${o._field}=${o._value}`
|
`${o._time} ${o._measurement} in '${o.location}' (${o.sensor_id}): ${o._field}=${o._value}`
|
||||||
)
|
)
|
||||||
},
|
|
||||||
error(error) {
|
|
||||||
console.error(error)
|
|
||||||
console.log('\nFinished ERROR')
|
|
||||||
},
|
|
||||||
complete() {
|
|
||||||
console.log('\nFinished SUCCESS')
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Execute a query and receive line table metadata and rows. */
|
/** Execute a query and receive line table metadata and rows. */
|
||||||
queryApi.queryRows(fluxQuery, fluxObserver)
|
myQuery()
|
||||||
|
|
Loading…
Reference in New Issue