fix(v3): python client synchronous and batch writing, remove async (#5141)

commit d1aac04309 (parent e0bafcd9a1)
@ -2,7 +2,6 @@
|
|||
title: Python client library for InfluxDB v3
|
||||
list_title: Python
|
||||
description: The InfluxDB v3 `influxdb3-python` Python client library integrates with Python scripts and applications to write and query data stored in an InfluxDB Cloud Dedicated database.
|
||||
external_url: https://github.com/InfluxCommunity/influxdb3-python
|
||||
menu:
|
||||
influxdb_cloud_dedicated:
|
||||
name: Python
|
||||
|
@ -12,6 +11,8 @@ influxdb/cloud-dedicated/tags: [python, gRPC, SQL, client libraries]
|
|||
weight: 201
|
||||
aliases:
|
||||
- /influxdb/cloud-dedicated/reference/client-libraries/v3/pyinflux3/
|
||||
related:
|
||||
- /influxdb/cloud-dedicated/query-data/execute-queries/troubleshoot/
|
||||
list_code_example: >
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
|
@ -19,7 +20,7 @@ list_code_example: >
|
|||
# Instantiate an InfluxDB client configured for a database
|
||||
|
||||
client = InfluxDBClient3(
|
||||
"https://us-east-1-1.aws.{{< influxdb/host >}}",
|
||||
"https://{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME",
|
||||
token="DATABASE_TOKEN")
|
||||
|
||||
|
@ -86,10 +87,9 @@ from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
|
|||
```
|
||||
|
||||
- [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB
|
||||
- `influxdb_client_3.Point`: a class for constructing a time series data
|
||||
- [`influxdb_client_3.Point`](#class-point): a class for constructing a time series data
|
||||
point
|
||||
- `influxdb_client_3.WriteOptions`: a class for configuring client
|
||||
write options.
|
||||
- [`influxdb_client_3.WriteOptions`](#class-writeoptions): a class for configuring client
|
||||
|
||||
## API reference
|
||||
|
||||
|
@ -136,11 +136,16 @@ Initializes and returns an `InfluxDBClient3` instance with the following:
|
|||
If `None`, writes are [synchronous](#synchronous-writing).
|
||||
- **flight_client_options** (dict): Options to use when querying InfluxDB.
|
||||
|
||||
#### Non-batch writing
|
||||
|
||||
When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks.
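For example, a minimal sketch of a default (non-batching) write--the host, token, database, and line protocol values are placeholders:

{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
```py
from influxdb_client_3 import InfluxDBClient3

# No write_client_options: the client tries the write immediately,
# doesn't retry a failed request, and doesn't invoke response callbacks.
client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="DATABASE_TOKEN",
                         database="DATABASE_NAME")

client.write(record="home,room=Kitchen temp=21.5 1641024000", write_precision="s")
```
{{% /code-placeholders %}}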
|
||||
|
||||
#### Batch writing
|
||||
|
||||
In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB.
|
||||
The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`.
|
||||
If a write fails, the client reschedules the write according to the `write_client_options` retry options.
|
||||
When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions.
|
||||
|
||||
To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example:
|
||||
|
||||
|
@ -165,11 +170,23 @@ To use batching mode, pass `WriteOptions` as key-value pairs to the client `writ
|
|||
{{< tabs-wrapper >}}
|
||||
{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from influxdb_client_3 import Point, InfluxDBClient3
|
||||
|
||||
points = [
|
||||
Point("home")
|
||||
.tag("room", "Kitchen")
|
||||
.field("temp", 25.3)
|
||||
.field('hum', 20.2)
|
||||
.field('co', 9),
|
||||
Point("home")
|
||||
.tag("room", "Living Room")
|
||||
.field("temp", 24.0)
|
||||
.field('hum', 20.0)
|
||||
.field('co', 5)]
|
||||
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
database="DATABASE_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write(record=points)
|
||||
|
@ -179,17 +196,17 @@ with InfluxDBClient3(token="DATABASE_TOKEN",
|
|||
|
||||
#### Synchronous writing
|
||||
|
||||
In synchronous mode, the client sends write requests immediately (not batched)
|
||||
and doesn't retry failed writes.
|
||||
|
||||
To use synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`.
|
||||
Synchronous mode is the default mode for writing data (in batch and non-batch modes).
|
||||
To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`.
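For example, a minimal sketch that makes synchronous mode explicit by passing the `SYNCHRONOUS` constant (see [Constants](#constants)) through `write_client_options`--treat the exact wiring as illustrative and the credential values as placeholders:

{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
```py
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS

# SYNCHRONOUS represents synchronous write mode: no batching, no retries.
wco = write_client_options(WriteOptions=SYNCHRONOUS)

client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="DATABASE_TOKEN",
                         database="DATABASE_NAME",
                         write_client_options=wco)
```
{{% /code-placeholders %}}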
|
||||
|
||||
### Examples
|
||||
|
||||
##### Initialize a client
|
||||
|
||||
The following example initializes a client for writing and querying the database.
|
||||
Given `_write_client_options=None`, the client uses synchronous mode when writing data.
|
||||
The following example initializes a client for writing and querying data in a {{% product-name %}} database.
|
||||
When writing or querying, the client waits synchronously for the response.
|
||||
|
||||
Given `client.write_client_options` doesn't set `WriteOptions`, the client uses the default [non-batch writing](#non-batch-writing) mode.
|
||||
|
||||
{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
|
@ -197,7 +214,6 @@ from influxdb_client_3 import InfluxDBClient3
|
|||
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
org="",
|
||||
database="DATABASE_NAME")
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
@ -207,36 +223,30 @@ Replace the following:
|
|||
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/)
|
||||
- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/) with read permissions on the specified database
|
||||
|
||||
|
||||
##### Initialize a client for batch writing
|
||||
|
||||
The following example shows how to initialize a client for writing and querying the database.
|
||||
The following example shows how to initialize a client for batch writing data to the database.
|
||||
When writing data, the client uses batch mode with default options and
|
||||
invokes the callback function defined for the response status (`success`, `error`, or `retry`).
|
||||
invokes the callback function, if specified, for the response status (success, error, or retryable error).
|
||||
|
||||
{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
|
||||
```py
|
||||
from influxdb_client_3 import Point,
|
||||
InfluxDBClient3,
|
||||
from influxdb_client_3 import (InfluxDBClient3,
                               write_client_options,
                               WriteOptions,
                               InfluxDBError)
|
||||
|
||||
points = [Point("home").tag("room", "Kitchen").field("temp", 25.3),
|
||||
Point("home").tag("room", "Living Room").field("temp", 18.4)]
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate WriteOptions for batching
|
||||
write_options = WriteOptions()
|
||||
|
@ -245,11 +255,14 @@ invokes the callback function defined for the response status (`success`, `error
|
|||
retry_callback=retry,
|
||||
WriteOptions=write_options)
|
||||
|
||||
# Use the with...as statement to ensure the client is properly closed and
# resources are released.
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write(record=points)
|
||||
client.write_file(file='./home.csv',
|
||||
timestamp_column='time', tag_columns=["room"])
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
|
@ -264,8 +277,6 @@ Replace the following:
|
|||
|
||||
Writes a record or a list of records to InfluxDB.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
|
||||
#### Syntax
|
||||
|
||||
```py
|
||||
|
@ -275,7 +286,12 @@ write(self, record=None, **kwargs)
|
|||
#### Parameters
|
||||
|
||||
- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`.
|
||||
- **`write_precision=`**: `"ms"`, `"s"`, `"us"`, `"ns"`. Default is `"ns"`.
|
||||
- **`database`**: The database to write to. Default is to write to the database specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -288,7 +304,7 @@ from influxdb_client_3 import InfluxDBClient3
|
|||
|
||||
points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000"
|
||||
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN", host="cluster-id.influxdb.io",
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME")
|
||||
|
||||
client.write(record=points, write_precision="s")
|
||||
|
@ -350,8 +366,7 @@ client.write(record=points, write_precision="s")
|
|||
### InfluxDBClient3.write_file
|
||||
|
||||
Writes data from a file to InfluxDB.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
Execution is synchronous.
|
||||
|
||||
#### Syntax
|
||||
|
||||
|
@ -378,6 +393,12 @@ write_file(self, file, measurement_name=None, tag_columns=[],
|
|||
- **`tag_columns`**: A list containing the names of tag columns.
|
||||
Columns not included in the list and not specified by another parameter are assumed to be fields.
|
||||
- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`.
|
||||
- **`database`**: The database to write to. Default is to write to the database specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -394,17 +415,16 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options,
|
|||
class BatchingCallback(object):
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate the callbacks
|
||||
callback = BatchingCallback()
|
||||
|
@ -420,18 +440,17 @@ write_options = WriteOptions(batch_size=500,
|
|||
wco = write_client_options(success_callback=callback.success,
|
||||
error_callback=callback.error,
|
||||
retry_callback=callback.retry,
|
||||
WriteOptions=write_options
|
||||
)
|
||||
WriteOptions=write_options)
|
||||
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
database="DATABASE_NAME",
|
||||
_write_client_options=wco) as client:
|
||||
|
||||
client.write_file(file='./out.csv', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"])
|
||||
client.write_file(file='./home.csv', timestamp_column='time',
|
||||
tag_columns=["room"])
|
||||
|
||||
client.write_file(file='./out.json', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"], date_unit='ns')
|
||||
client.write_file(file='./home.json', timestamp_column='time',
|
||||
tag_columns=["room"], date_unit='ns')
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
|
@ -450,14 +469,14 @@ query(self, query, language="sql", mode="all", **kwargs )
|
|||
|
||||
- **`query`** (str): the SQL or InfluxQL to execute.
|
||||
- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`.
|
||||
- **`mode`** (str): specifies what the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader) will return.
|
||||
Default is `"all"`.
|
||||
- `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`.
|
||||
- `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`.
|
||||
Returns `null` if there are no more messages.
|
||||
- `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`.
|
||||
- `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader).
|
||||
- `schema`: Return the schema for all record batches in the stream.
|
||||
- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader).
|
||||
Default is `"all"`.
|
||||
- `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`.
|
||||
- `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`.
|
||||
Returns `None` if there are no more messages.
|
||||
- `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`.
|
||||
- `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader).
|
||||
- `schema`: Return the schema for all record batches in the stream.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -539,7 +558,7 @@ The following example shows how to create a `Point`, and then write the
|
|||
data to InfluxDB.
|
||||
|
||||
```py
|
||||
point = Point("home").tag("room", "Kitchen").field("temp", 72)
|
||||
point = Point("home").tag("room", "Living Room").field("temp", 72)
|
||||
client.write(point)
|
||||
```
|
||||
|
||||
|
@ -620,10 +639,9 @@ client = InfluxDBClient3(
|
|||
|
||||
## Constants
|
||||
|
||||
- `influxdb_client_3.ASYNCHRONOUS`: Represents asynchronous write mode
|
||||
- `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode
|
||||
- `influxdb_client_3.WritePrecision`: Enum class that represents write precision
|
||||
|
||||
## Exceptions
|
||||
|
||||
- `influxdb_client_3.InfluxDBError`: Exception class raised for InfluxDB-related errors
|
||||
- `influxdb_client_3.InfluxDBError`: Exception class raised for InfluxDB-related errors
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
title: Python client library for InfluxDB v3
|
||||
list_title: Python
|
||||
description: The InfluxDB v3 `influxdb3-python` Python client library integrates with Python scripts and applications to write and query data stored in an InfluxDB Cloud Serverless bucket.
|
||||
external_url: https://github.com/InfluxCommunity/influxdb3-python
|
||||
menu:
|
||||
influxdb_cloud_serverless:
|
||||
name: Python
|
||||
|
@ -12,6 +11,8 @@ weight: 201
|
|||
influxdb/cloud-serverless/tags: [python, gRPC, SQL, client libraries]
|
||||
aliases:
|
||||
- /influxdb/cloud-serverless/reference/client-libraries/v3/pyinflux3/
|
||||
related:
|
||||
- /influxdb/cloud-serverless/query-data/execute-queries/troubleshoot/
|
||||
list_code_example: >
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
|
@ -44,7 +45,7 @@ to line protocol, and batch write line protocol data to InfluxDB HTTP APIs.
|
|||
|
||||
InfluxDB v3 client libraries can query {{% product-name %}} using SQL or InfluxQL.
|
||||
The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client
|
||||
in a convenient InfluxDB v3 interface for executing SQL queries, requesting
|
||||
in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting
|
||||
server metadata, and retrieving data from {{% product-name %}} using the Flight protocol with gRPC.
|
||||
|
||||
<!-- TOC -->
|
||||
|
@ -86,7 +87,7 @@ from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
|
|||
```
|
||||
|
||||
- [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB
|
||||
- `influxdb_client_3.Point`: a class for constructing a time series data
|
||||
- [`influxdb_client_3.Point`](#class-point):a class for constructing a time series data
|
||||
point
|
||||
- `influxdb_client_3.WriteOptions`: a class for configuring client
|
||||
write options.
|
||||
|
@ -125,22 +126,27 @@ __init__(self, host=None, org=None, database=None, token=None,
|
|||
|
||||
Initializes and returns an `InfluxDBClient3` instance with the following:
|
||||
|
||||
- A singleton _write client_ configured for writing to the database.
|
||||
- A singleton _Flight client_ configured for querying the database.
|
||||
- A singleton _write client_ configured for writing to the database (bucket).
|
||||
- A singleton _Flight client_ configured for querying the database (bucket).
|
||||
|
||||
### Parameters
|
||||
|
||||
- **org** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)).
|
||||
- **database** (str): The database to use for writing and querying.
|
||||
- **database** (str): The database (bucket) to use for writing and querying.
|
||||
- **write_client_options** (dict): Options to use when writing to InfluxDB.
|
||||
If `None`, writes are [synchronous](#synchronous-writing).
|
||||
- **flight_client_options** (dict): Options to use when querying InfluxDB.
|
||||
|
||||
#### Non-batch writing
|
||||
|
||||
When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks.
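For example, a minimal sketch of a default (non-batching) write--the host, token, bucket, and line protocol values are placeholders:

{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}}
```py
from influxdb_client_3 import InfluxDBClient3

# No write_client_options: the client tries the write immediately,
# doesn't retry a failed request, and doesn't invoke response callbacks.
client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="API_TOKEN",
                         database="BUCKET_NAME")

client.write(record="home,room=Kitchen temp=21.5 1641024000", write_precision="s")
```
{{% /code-placeholders %}}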
|
||||
|
||||
#### Batch writing
|
||||
|
||||
In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB.
|
||||
The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`.
|
||||
If a write fails, the client reschedules the write according to the `write_client_options` retry options.
|
||||
When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions.
|
||||
|
||||
To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example:
|
||||
|
||||
|
@ -163,9 +169,21 @@ To use batching mode, pass `WriteOptions` as key-value pairs to the client `writ
|
|||
3. Initialize the client, setting the `write_client_options` argument to `wco` from the preceding step.
|
||||
|
||||
{{< tabs-wrapper >}}
|
||||
{{% code-placeholders "BUCKET_(NAME|TOKEN)|API_TOKEN" %}}
|
||||
{{% code-placeholders "(BUCKET|API)_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from influxdb_client_3 import Point, InfluxDBClient3
|
||||
|
||||
points = [
|
||||
Point("home")
|
||||
.tag("room", "Kitchen")
|
||||
.field("temp", 25.3)
|
||||
.field('hum', 20.2)
|
||||
.field('co', 9),
|
||||
Point("home")
|
||||
.tag("room", "Living Room")
|
||||
.field("temp", 24.0)
|
||||
.field('hum', 20.0)
|
||||
.field('co', 5)]
|
||||
|
||||
with InfluxDBClient3(token="API_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
|
@ -179,17 +197,15 @@ with InfluxDBClient3(token="API_TOKEN",
|
|||
|
||||
#### Synchronous writing
|
||||
|
||||
In synchronous mode, the client sends write requests immediately (not batched)
|
||||
and doesn't retry failed writes.
|
||||
|
||||
To use synchronous mode, set `write_client_options=None` or `write_client_options.write_type=WriteType.synchronous`.
|
||||
Synchronous mode is the default mode for writing data (in batch and non-batch modes).
|
||||
To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`.
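For example, a minimal sketch that makes synchronous mode explicit by passing the `SYNCHRONOUS` constant (see [Constants](#constants)) through `write_client_options`--treat the exact wiring as illustrative and the credential values as placeholders:

{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}}
```py
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS

# SYNCHRONOUS represents synchronous write mode: no batching, no retries.
wco = write_client_options(WriteOptions=SYNCHRONOUS)

client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="API_TOKEN",
                         database="BUCKET_NAME",
                         write_client_options=wco)
```
{{% /code-placeholders %}}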
|
||||
|
||||
### Examples
|
||||
|
||||
#### Initialize a client
|
||||
|
||||
The following example initializes a client for writing and querying the bucket.
|
||||
Given `write_client_options=None`, the client will use synchronous mode when writing data.
|
||||
The following example initializes a client for writing and querying data in a {{% product-name %}} database (bucket).
|
||||
When writing or querying, the client waits synchronously for the response.
|
||||
|
||||
{{% code-placeholders "BUCKET_(NAME|TOKEN)|API_TOKEN" %}}
|
||||
```py
|
||||
|
@ -197,7 +213,6 @@ from influxdb_client_3 import InfluxDBClient3
|
|||
|
||||
client = InfluxDBClient3(token="API_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
org="",
|
||||
database="BUCKET_NAME")
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
@ -207,36 +222,30 @@ Replace the following:
|
|||
- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [bucket](/influxdb/cloud-serverless/admin/buckets/)
|
||||
- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) with read permissions on the specified bucket
|
||||
|
||||
##### Initialize a client for batch writing
|
||||
|
||||
#### Initialize a client for batch writing
|
||||
|
||||
The following example shows how to initialize a client for writing and querying the database.
|
||||
The following example shows how to initialize a client for batch writing data to the bucket.
|
||||
When writing data, the client uses batch mode with default options and
|
||||
invokes the callback function defined for the response status (`success`, `error`, or `retry`).
|
||||
invokes the callback function, if specified, for the response status (success, error, or retryable error).
|
||||
|
||||
{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}}
|
||||
```py
|
||||
from influxdb_client_3 import Point,
|
||||
InfluxDBClient3,
|
||||
from influxdb_client_3 import (InfluxDBClient3,
                               write_client_options,
                               WriteOptions,
                               InfluxDBError)
|
||||
|
||||
points = [Point("home").tag("room", "Kitchen").field("temp", 25.3),
|
||||
Point("home").tag("room", "Living Room").field("temp", 18.4)]
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate WriteOptions for batching
|
||||
write_options = WriteOptions()
|
||||
|
@ -245,11 +254,14 @@ invokes the callback function defined for the response status (`success`, `error
|
|||
retry_callback=retry,
|
||||
WriteOptions=write_options)
|
||||
|
||||
# Use the with...as statement to ensure the client is properly closed and
# resources are released.
|
||||
with InfluxDBClient3(token="API_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="ignored", database="BUCKET_NAME",
|
||||
org="", database="BUCKET_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write(record=points)
|
||||
client.write_file(file='./home.csv',
|
||||
timestamp_column='time', tag_columns=["room"])
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
|
@ -258,14 +270,12 @@ Replace the following:
|
|||
- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [bucket](/influxdb/cloud-serverless/admin/buckets/)
|
||||
- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) with read permissions on the specified bucket
|
||||
|
||||
### Instance methods
|
||||
### InfluxDBClient3 instance methods
|
||||
|
||||
### InfluxDBClient3.write
|
||||
|
||||
Writes a record or a list of records to InfluxDB.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
|
||||
#### Syntax
|
||||
|
||||
```py
|
||||
|
@ -275,7 +285,12 @@ write(self, record=None, **kwargs)
|
|||
#### Parameters
|
||||
|
||||
- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`.
|
||||
- **`write_precision=`**: `"ms"`, `"s"`, `"us"`, `"ns"`. Default is `"ns"`.
|
||||
- **`database`**: The database (bucket) to write to. Default is to write to the database specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -349,8 +364,7 @@ client.write(record=points, write_precision="s")
|
|||
### InfluxDBClient3.write_file
|
||||
|
||||
Writes data from a file to InfluxDB.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
Execution is synchronous.
|
||||
|
||||
#### Syntax
|
||||
|
||||
|
@ -377,6 +391,12 @@ write_file(self, file, measurement_name=None, tag_columns=[],
|
|||
- **`tag_columns`**: A list containing the names of tag columns.
|
||||
Columns not included in the list and not specified by another parameter are assumed to be fields.
|
||||
- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`.
|
||||
- **`database`**: The database (bucket) to write to. Default is to write to the database (bucket) specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -393,17 +413,16 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options,
|
|||
class BatchingCallback(object):
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate the callbacks
|
||||
callback = BatchingCallback()
|
||||
|
@ -419,18 +438,17 @@ write_options = WriteOptions(batch_size=500,
|
|||
wco = write_client_options(success_callback=callback.success,
|
||||
error_callback=callback.error,
|
||||
retry_callback=callback.retry,
|
||||
WriteOptions=write_options
|
||||
)
|
||||
WriteOptions=write_options)
|
||||
|
||||
with InfluxDBClient3(token="API_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="BUCKET_NAME",
|
||||
database="BUCKET_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write_file(file='./out.csv', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"])
|
||||
client.write_file(file='./home.csv', timestamp_column='time',
|
||||
tag_columns=["room"])
|
||||
|
||||
client.write_file(file='./out.json', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"], date_unit='ns')
|
||||
client.write_file(file='./home.json', timestamp_column='time',
|
||||
tag_columns=["room"], date_unit='ns')
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
|
@ -449,14 +467,14 @@ query(self, query, language="sql", mode="all", **kwargs )
|
|||
|
||||
- **`query`** (str): the SQL or InfluxQL to execute.
|
||||
- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`.
|
||||
- **`mode`** (str): specifies what the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader) will return.
|
||||
Default is `"all"`.
|
||||
- `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`.
|
||||
- `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`.
|
||||
Returns `null` if there are no more messages.
|
||||
- `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`.
|
||||
- `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader).
|
||||
- `schema`: Return the schema for all record batches in the stream.
|
||||
- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader).
|
||||
Default is `"all"`.
|
||||
- `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`.
|
||||
- `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`.
|
||||
Returns `None` if there are no more messages.
|
||||
- `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`.
|
||||
- `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader).
|
||||
- `schema`: Return the schema for all record batches in the stream.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -538,7 +556,7 @@ The following example shows how to create a `Point`, and then write the
|
|||
data to InfluxDB.
|
||||
|
||||
```py
|
||||
point = Point("home").tag("room", "Kitchen").field("temp", 72)
|
||||
point = Point("home").tag("room", "Living Room").field("temp", 72)
|
||||
client.write(point)
|
||||
```
|
||||
|
||||
|
@ -610,19 +628,18 @@ cert = fh.read()
|
|||
fh.close()
|
||||
|
||||
client = InfluxDBClient3(
|
||||
token="DATABASE_TOKEN",
|
||||
token="API_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME",
|
||||
database="BUCKET_NAME",
|
||||
flight_client_options=flight_client_options(
|
||||
tls_root_certs=cert))
|
||||
```
|
||||
|
||||
## Constants
|
||||
|
||||
- `influxdb_client_3.ASYNCHRONOUS`: Represents asynchronous write mode
|
||||
- `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode
|
||||
- `influxdb_client_3.WritePrecision`: Enum class that represents write precision
|
||||
|
||||
## Exceptions
|
||||
|
||||
- `influxdb_client_3.InfluxDBError`: Exception raised for InfluxDB-related errors
|
||||
- `influxdb_client_3.InfluxDBError`: Exception class raised for InfluxDB-related errors
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
title: Python client library for InfluxDB v3
|
||||
list_title: Python
|
||||
description: The InfluxDB v3 `influxdb3-python` Python client library integrates with Python scripts and applications to write and query data stored in an InfluxDB Clustered database.
|
||||
external_url: https://github.com/InfluxCommunity/influxdb3-python
|
||||
menu:
|
||||
influxdb_clustered:
|
||||
name: Python
|
||||
|
@ -12,6 +11,8 @@ influxdb/clustered/tags: [python, gRPC, SQL, Flight SQL, client libraries]
|
|||
weight: 201
|
||||
aliases:
|
||||
- /influxdb/clustered/reference/client-libraries/v3/pyinflux3/
|
||||
related:
|
||||
- /influxdb/cloud-serverless/query-data/execute-queries/troubleshoot/
|
||||
list_code_example: >
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
|
@ -19,7 +20,7 @@ list_code_example: >
|
|||
# Instantiate an InfluxDB client configured for a database
|
||||
|
||||
client = InfluxDBClient3(
|
||||
"https://us-east-1-1.aws.{{< influxdb/host >}}",
|
||||
"https://{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME",
|
||||
token="DATABASE_TOKEN")
|
||||
|
||||
|
@ -47,6 +48,19 @@ The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.fli
|
|||
in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting
|
||||
server metadata, and retrieving data from {{% product-name %}} using the Flight protocol with gRPC.
|
||||
|
||||
<!-- TOC -->
|
||||
|
||||
- [Installation](#installation)
|
||||
- [Importing the module](#importing-the-module)
|
||||
- [API reference](#api-reference)
|
||||
- [Classes](#classes)
|
||||
- [Class InfluxDBClient3](#class-influxdbclient3)
|
||||
- [Class Point](#class-point)
|
||||
- [Class WriteOptions](#class-writeoptions)
|
||||
- [Functions](#functions)
|
||||
- [Constants](#constants)
|
||||
- [Exceptions](#exceptions)
|
||||
|
||||
## Installation
|
||||
|
||||
Install the client library and dependencies using `pip`:
|
||||
|
@ -72,9 +86,10 @@ Import specific class methods from the module:
|
|||
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
|
||||
```
|
||||
|
||||
- [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): an interface for [initializing a client](#initialization) and interacting with InfluxDB
|
||||
- [`influxdb_client_3.Point`](#class-point): an interface for constructing a time series data point
|
||||
- [`influxdb_client_3.WriteOptions`](#class-writeoptions): an interface for configuring write options for the client
|
||||
- [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB
|
||||
- [`influxdb_client_3.Point`](#class-point): a class for constructing a time series data
|
||||
point
|
||||
- [`influxdb_client_3.WriteOptions`](#class-writeoptions): a class for configuring client
|
||||
|
||||
## API reference
|
||||
|
||||
|
@ -113,52 +128,66 @@ Initializes and returns an `InfluxDBClient3` instance with the following:
|
|||
- A singleton _write client_ configured for writing to the database.
|
||||
- A singleton _Flight client_ configured for querying the database.
|
||||
|
||||
### Attributes
|
||||
### Parameters
|
||||
|
||||
- **`_org`** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)).
|
||||
- **`_database`** (str): The database to use for writing and querying.
|
||||
- **`_write_client_options`** (dict): Options passed to the write client for writing to InfluxDB.
|
||||
- **org** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)).
|
||||
- **database** (str): The database to use for writing and querying.
|
||||
- **write_client_options** (dict): Options to use when writing to InfluxDB.
|
||||
If `None`, writes are [synchronous](#synchronous-writing).
|
||||
- **`_flight_client_options`** (dict): Options passed to the Flight client for querying InfluxDB.
|
||||
- **flight_client_options** (dict): Options to use when querying InfluxDB.
|
||||
|
||||
#### Non-batch writing
|
||||
|
||||
When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks.
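For example, a minimal sketch of a default (non-batching) write--the host, token, database, and line protocol values are placeholders:

{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
```py
from influxdb_client_3 import InfluxDBClient3

# No write_client_options: the client tries the write immediately,
# doesn't retry a failed request, and doesn't invoke response callbacks.
client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="DATABASE_TOKEN",
                         database="DATABASE_NAME")

client.write(record="home,room=Kitchen temp=21.5 1641024000", write_precision="s")
```
{{% /code-placeholders %}}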
|
||||
|
||||
#### Batch writing
|
||||
|
||||
In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB.
|
||||
The client writes the batch to InfluxDB after reaching `_write_client_options.batch_size` or `_write_client_options.flush_interval`.
|
||||
If a write fails, the client reschedules the write according to the `_write_client_options` retry options.
|
||||
The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`.
|
||||
If a write fails, the client reschedules the write according to the `write_client_options` retry options.
|
||||
When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions.
|
||||
|
||||
To use batching mode, pass an instance of `WriteOptions` to the `InfluxDBClient3.write_client_options` argument--for example:
|
||||
To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example:
|
||||
|
||||
1. Instantiate `WriteOptions()` with defaults or with
|
||||
`WriteOptions.write_type=WriteType.batching`.
|
||||
|
||||
```py
|
||||
from influxdb_client_3 import WriteOptions
|
||||
|
||||
# Initialize batch writing default options (batch size, flush, and retry).
|
||||
# Returns an influxdb_client.client.write_api.WriteOptions object.
|
||||
# Create a WriteOptions instance for batch writes with batch size, flush, and retry defaults.
|
||||
write_options = WriteOptions()
|
||||
```
|
||||
|
||||
2. Call the [`write_client_options()` function](#function-write_client_optionskwargs) to create an options object that uses `write_options` from the preceding step.
|
||||
2. Pass `write_options` from the preceding step to the `write_client_options` function.
|
||||
|
||||
```py
|
||||
from influxdb_client_3 import write_client_options
|
||||
|
||||
# Create a dict of keyword arguments from WriteOptions
|
||||
wco = write_client_options(WriteOptions=write_options)
|
||||
```
|
||||
|
||||
3. Initialize the client, setting the `write_client_options` argument to the `wco` object from the preceding step.
|
||||
The output is a dict with `WriteOptions` key-value pairs.
|
||||
|
||||
3. Initialize the client, setting the `write_client_options` argument to `wco` from the preceding step.
|
||||
|
||||
{{< tabs-wrapper >}}
|
||||
{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from influxdb_client_3 import Point, InfluxDBClient3
|
||||
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
write_client_options=wco) as client:
|
||||
points = [
|
||||
Point("home")
|
||||
.tag("room", "Kitchen")
|
||||
.field("temp", 25.3)
|
||||
.field('hum', 20.2)
|
||||
.field('co', 9),
|
||||
Point("home")
|
||||
.tag("room", "Living Room")
|
||||
.field("temp", 24.0)
|
||||
.field('hum', 20.0)
|
||||
.field('co', 5)]
|
||||
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write(record=points)
|
||||
```
|
||||
|
@ -167,17 +196,17 @@ with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
|||
|
||||
#### Synchronous writing
|
||||
|
||||
In synchronous mode, the client sends write requests immediately (not batched)
|
||||
and doesn't retry failed writes.
|
||||
|
||||
To use synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`.
|
||||
Synchronous mode is the default mode for writing data (in batch and non-batch modes).
|
||||
To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`.
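For example, a minimal sketch that makes synchronous mode explicit by passing the `SYNCHRONOUS` constant (see [Constants](#constants)) through `write_client_options`--treat the exact wiring as illustrative and the credential values as placeholders:

{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
```py
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS

# SYNCHRONOUS represents synchronous write mode: no batching, no retries.
wco = write_client_options(WriteOptions=SYNCHRONOUS)

client = InfluxDBClient3(host="{{< influxdb/host >}}",
                         token="DATABASE_TOKEN",
                         database="DATABASE_NAME",
                         write_client_options=wco)
```
{{% /code-placeholders %}}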
|
||||
|
||||
### Examples
|
||||
|
||||
##### Initialize a client
|
||||
|
||||
The following example initializes a client for writing and querying the database.
|
||||
Given `_write_client_options=None`, the client uses synchronous mode when writing data.
|
||||
The following example initializes a client for writing and querying data in a {{% product-name %}} database.
|
||||
When writing or querying, the client waits synchronously for the response.
|
||||
|
||||
Given `client.write_client_options` doesn't set `WriteOptions`, the client uses the default [non-batch writing](#non-batch-writing) mode.
|
||||
|
||||
{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
|
@ -192,38 +221,33 @@ client = InfluxDBClient3(token="DATABASE_TOKEN",
|
|||
|
||||
Replace the following:
|
||||
|
||||
- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/) with read permissions on the databases you want to query
|
||||
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/)
|
||||
- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/) with read permissions on the specified database
|
||||
|
||||
##### Initialize a client for batch writing
|
||||
|
||||
The following example shows how to initialize a client for writing and querying the database.
|
||||
The following example shows how to initialize a client for batch writing data to the database.
|
||||
When writing data, the client uses batch mode with default options and
|
||||
invokes the callback function for the response.
|
||||
invokes the callback function, if specified, for the response status (success, error, or retryable error).
|
||||
|
||||
{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
|
||||
```py
|
||||
from influxdb_client_3 import Point,
|
||||
InfluxDBClient3,
|
||||
from influxdb_client_3 import (InfluxDBClient3,
                               write_client_options,
                               WriteOptions,
                               InfluxDBError)
|
||||
|
||||
points = [Point("home").tag("room", "Kitchen").field("temp", 25.3),
|
||||
Point("home").tag("room", "Living Room").field("temp", 18.4)]
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate WriteOptions for batching
|
||||
write_options = WriteOptions()
|
||||
|
@ -232,39 +256,27 @@ invokes the callback function for the response.
|
|||
retry_callback=retry,
|
||||
WriteOptions=write_options)
|
||||
|
||||
# Use the with...as statement to ensure the client is properly closed and
# resources are released.
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
write_client_options=wco) as client:
|
||||
|
||||
client.write(record=points)
|
||||
client.write_file(file='./home.csv',
|
||||
timestamp_column='time', tag_columns=["room"])
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
Replace the following:
|
||||
|
||||
- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}:
|
||||
Your InfluxDB token with READ permissions on the databases you want to query.
|
||||
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}:
|
||||
The name of your InfluxDB database.
|
||||
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/)
|
||||
- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/) with read permissions on the specified database
|
||||
|
||||
### InfluxDBClient3 instance methods
|
||||
|
||||
<!-- TOC -->
|
||||
- [InfluxDBClient3.write](#influxdbclient3write)
|
||||
- [InfluxDBClient3.write_file](#influxdbclient3write_file)
|
||||
- [InfluxDBClient3.query](#influxdbclient3query)
|
||||
- [InfluxDBClient3.close](#influxdbclient3close)
|
||||
|
||||
### InfluxDBClient3.write
|
||||
|
||||
Writes a record or a list of records to InfluxDB.
|
||||
A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
|
||||
##### Attributes
|
||||
|
||||
- **`write_precision=`**: `"ms"`, `"s"`, `"us"`, `"ns"`. Default is `"ns"`.
|
||||
|
||||
#### Syntax
|
||||
|
||||
|
@ -272,6 +284,16 @@ The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ m
|
|||
write(self, record=None, **kwargs)
|
||||
```
|
||||
|
||||
#### Parameters
|
||||
|
||||
- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`.
|
||||
- **`database`**: The database to write to. Default is to write to the database specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
##### Write a line protocol string
|
||||
|
@ -280,11 +302,11 @@ write(self, record=None, **kwargs)
|
|||
{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
|
||||
|
||||
points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000"
|
||||
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME", org="")
|
||||
database="DATABASE_NAME")
|
||||
|
||||
client.write(record=points, write_precision="s")
|
||||
```
|
||||
|
@ -293,7 +315,9 @@ client.write(record=points, write_precision="s")
|
|||
|
||||
##### Write data using points
|
||||
|
||||
The following example shows how to create a [`Point`](#class-point), and then write the
|
||||
The `influxdb_client_3.Point` class provides an interface for constructing a data
|
||||
point for a measurement and setting fields, tags, and the timestamp for the point.
|
||||
The following example shows how to create a `Point` object, and then write the
|
||||
data to InfluxDB.
|
||||
|
||||
```py
|
||||
|
@ -307,7 +331,7 @@ client.write(point)
|
|||
|
||||
`InfluxDBClient3` can serialize a dictionary object into line protocol.
|
||||
If you pass a `dict` to `InfluxDBClient3.write`, the client expects the `dict` to have the
|
||||
following _point_ data structure:
|
||||
following _point_ attributes:
|
||||
|
||||
- **measurement** (str): the measurement name
|
||||
- **tags** (dict): a dictionary of tag key-value pairs
|
||||
|
@ -320,22 +344,21 @@ data to InfluxDB.
|
|||
{{% influxdb/custom-timestamps %}}
|
||||
{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
|
||||
# Using point dictionary structure
|
||||
points = {
|
||||
"measurement": "home",
|
||||
"tags": {"room": "Kitchen", "sensor": "K001"},
|
||||
"fields": {"temp": 72.2, "hum": 36.9, "co": 4},
|
||||
"time": 1641067200
|
||||
}
|
||||
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME",
|
||||
org="")
|
||||
|
||||
client.write(record=points, write_precision="s")
|
||||
# Using point dictionary structure
|
||||
points = {
|
||||
"measurement": "home",
|
||||
"tags": {"room": "Kitchen", "sensor": "K001"},
|
||||
"fields": {"temp": 72.2, "hum": 36.9, "co": 4},
|
||||
"time": 1641067200
|
||||
}
|
||||
|
||||
client = InfluxDBClient3(token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
database="DATABASE_NAME")
|
||||
|
||||
client.write(record=points, write_precision="s")
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
{{% /influxdb/custom-timestamps %}}
|
||||
|
@ -343,8 +366,7 @@ data to InfluxDB.
|
|||
### InfluxDBClient3.write_file
|
||||
|
||||
Writes data from a file to InfluxDB.
|
||||
|
||||
The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ mode](#synchronous-writing).
|
||||
Execution is synchronous.
|
||||
|
||||
#### Syntax
|
||||
|
||||
|
@ -352,20 +374,18 @@ The client can write using [_batching_ mode](#batch-writing) or [_synchronous_ m
|
|||
write_file(self, file, measurement_name=None, tag_columns=[],
|
||||
timestamp_column='time', **kwargs)
|
||||
```
|
||||
##### Attributes
|
||||
#### Parameters
|
||||
|
||||
- **`file`**: A file containing records to write to InfluxDB.
|
||||
The following file formats and filename extensions are supported.
|
||||
The filename must end with one of the supported extensions.
|
||||
For more information about encoding and formatting data, see the documentation for each supported format.
|
||||
- **`file`** (str): A path to a file containing records to write to InfluxDB.
|
||||
The filename must end with one of the following supported extensions.
|
||||
For more information about encoding and formatting data, see the documentation for each supported format:
|
||||
|
||||
| Supported format | File name extension |
|
||||
|:---------------------------------------------------------------------------|:--------------------|
|
||||
| [Feather](https://arrow.apache.org/docs/python/feather.html) | `.feather` |
|
||||
| [Parquet](https://arrow.apache.org/docs/python/parquet.html) | `.parquet` |
|
||||
| [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) | `.csv` |
|
||||
| [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) | `.json` |
|
||||
| [ORC](https://arrow.apache.org/docs/python/orc.html) | `.orc` |
|
||||
- `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html)
|
||||
- `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html)
|
||||
- `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html)
|
||||
- `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html)
|
||||
- `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html)
|
||||
|
||||
- **`measurement_name`**: Defines the measurement name for records in the file.
|
||||
The specified value takes precedence over `measurement` and `iox::measurement` columns in the file.
|
||||
If no value is specified for the parameter, and a `measurement` column exists in the file, the `measurement` column value is used for the measurement name.
|
||||
|
@ -373,6 +393,12 @@ write_file(self, file, measurement_name=None, tag_columns=[],
|
|||
- **`tag_columns`**: A list containing the names of tag columns.
|
||||
Columns not included in the list and not specified by another parameter are assumed to be fields.
|
||||
- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`.
|
||||
- **`database`**: The database to write to. Default is to write to the database specified for the client.
|
||||
- **`**kwargs`**: Additional write options--for example:
|
||||
- **`write_precision`**: _Optional_. Default is `"ns"`.
|
||||
Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`.
|
||||
- **`write_client_options`**: _Optional_.
|
||||
Specifies callback functions and options for [batch writing](#batch-writing) mode.
|
||||
|
||||
#### Examples
|
||||
|
||||
|
@ -389,17 +415,16 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options,
|
|||
class BatchingCallback(object):
|
||||
|
||||
# Define callbacks for write responses
|
||||
def success(self, conf: (str, str, str)):
|
||||
"""BATCH WRITE SUCCESS."""
|
||||
print(f"Wrote batch: {conf}")
|
||||
def success(self, data: str):
|
||||
print(f"Successfully wrote batch: data: {data}")
|
||||
|
||||
def error(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE FAILURE."""
|
||||
print(f"Cannot write batch: {conf}, due to: {exception}")
|
||||
def error(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
def retry(self, conf: (str, str, str), exception: InfluxDBError):
|
||||
"""BATCH WRITE RETRY"""
|
||||
print(f"Retryable error occurs for batch: {conf}, retry: {exception}")
|
||||
def retry(self, data: str, exception: InfluxDBError):
|
||||
print(f"Failed retry writing batch: config: {self}, data: {data},
|
||||
error: {exception}")
|
||||
|
||||
# Instantiate the callbacks
|
||||
callback = BatchingCallback()
|
||||
|
@ -415,18 +440,17 @@ write_options = WriteOptions(batch_size=500,
|
|||
wco = write_client_options(success_callback=callback.success,
|
||||
error_callback=callback.error,
|
||||
retry_callback=callback.retry,
|
||||
WriteOptions=write_options
|
||||
)
|
||||
WriteOptions=write_options)
|
||||
|
||||
with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}",
|
||||
org="", database="DATABASE_NAME",
|
||||
database="DATABASE_NAME",
|
||||
_write_client_options=wco) as client:
|
||||
|
||||
client.write_file(file='./out.csv', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"])
|
||||
client.write_file(file='./home.csv', timestamp_column='time',
|
||||
tag_columns=["room"])
|
||||
|
||||
client.write_file(file='./out.json', timestamp_column='time',
|
||||
tag_columns=["provider", "machineID"], date_unit='ns')
|
||||
client.write_file(file='./home.json', timestamp_column='time',
|
||||
tag_columns=["room"], date_unit='ns')
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
|
@ -438,23 +462,70 @@ Returns all data in the query result as an Arrow table.
|
|||
#### Syntax
|
||||
|
||||
```py
|
||||
query(self, query, language="sql")
|
||||
query(self, query, language="sql", mode="all", **kwargs )
|
||||
```
|
||||
|
||||
#### Parameters
|
||||
|
||||
- **`query`** (str): the SQL or InfluxQL to execute.
|
||||
- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`.
|
||||
- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader).
|
||||
Default is `"all"`.
|
||||
- `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`.
|
||||
- `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`.
|
||||
Returns `None` if there are no more messages.
|
||||
- `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`.
|
||||
- `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader).
|
||||
- `schema`: Return the schema for all record batches in the stream.
|
||||
|
||||
#### Examples
|
||||
|
||||
##### Query using SQL
|
||||
|
||||
```py
|
||||
query = "select * from measurement"
|
||||
reader = client.query(query=query)
|
||||
table = client.query("SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'")
|
||||
# Filter columns.
|
||||
print(table.select(['room', 'temp']))
|
||||
# Use PyArrow to aggregate data.
|
||||
print(table.group_by('hum').aggregate([]))
|
||||
```
|
||||
|
||||
##### Query using InfluxQL
|
||||
|
||||
```py
|
||||
query = "select * from measurement"
|
||||
reader = client.query(query=query, language="influxql")
|
||||
query = "SELECT * FROM measurement WHERE time >= -90d"
|
||||
table = client.query(query=query, language="influxql")
|
||||
# Filter columns.
|
||||
print(table.select(['room', 'temp']))
|
||||
```
|
||||
|
||||
##### Read all data from the stream and return a pandas DataFrame
|
||||
|
||||
```py
|
||||
query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'"
|
||||
pd = client.query(query=query, mode="pandas")
|
||||
# Print the pandas DataFrame formatted as a Markdown table.
|
||||
print(pd.to_markdown())
|
||||
```
|
||||
|
||||
##### View the schema for all batches in the stream
|
||||
|
||||
```py
|
||||
table = client.query('''
    SELECT *
    FROM measurement
    WHERE time >= now() - INTERVAL '90 days'
''')
|
||||
# Get the schema attribute value.
|
||||
print(table.schema)
|
||||
```
|
||||
|
||||
##### Retrieve the result schema and no data
|
||||
|
||||
```py
|
||||
query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'"
|
||||
schema = client.query(query=query, mode="schema")
|
||||
print(schema)
|
||||
```
|
||||
|
||||
### InfluxDBClient3.close
|
||||
|
@ -487,7 +558,7 @@ The following example shows how to create a `Point`, and then write the
|
|||
data to InfluxDB.
|
||||
|
||||
```py
|
||||
point = Point("home").tag("room", "Kitchen").field("temp", 72)
|
||||
point = Point("home").tag("room", "Living Room").field("temp", 72)
|
||||
client.write(point)
|
||||
```
|
||||
|
||||
|
@ -561,7 +632,6 @@ fh.close()
|
|||
client = InfluxDBClient3(
|
||||
token="DATABASE_TOKEN",
|
||||
host="{{< influxdb/host >}}",
|
||||
org="",
|
||||
database="DATABASE_NAME",
|
||||
flight_client_options=flight_client_options(
|
||||
tls_root_certs=cert))
|
||||
|
@ -569,35 +639,9 @@ client = InfluxDBClient3(
|
|||
|
||||
## Constants
|
||||
|
||||
- `influxdb_client_3.ASYNCHRONOUS`: Represents asynchronous write mode
|
||||
- `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode
|
||||
- `influxdb_client_3.WritePrecision`: Enum class that represents write precision
|
||||
|
||||
## Exceptions
|
||||
|
||||
- `influxdb_client_3.InfluxDBError`: Exception class raised for InfluxDB-related errors
|
||||
- [`pyarrow._flight.FlightUnavailableError`](#flightunavailableerror-could-not-get-default-pem-root-certs): Exception class raised for Flight gRPC errors
|
||||
|
||||
### Query exceptions
|
||||
|
||||
#### FlightUnavailableError: Could not get default pem root certs
|
||||
|
||||
[Specify the root certificate path](#specify-the-root-certificate-path) for the Flight gRPC client.
|
||||
|
||||
Non-POSIX-compliant systems (such as Windows) need to specify the root certificates in SslCredentialsOptions for the gRPC client, since the defaults are only configured for POSIX filesystems.
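For example, a minimal sketch that mirrors the [root certificate example](#specify-the-root-certificate-path) above, reading the CA bundle shipped with `certifi` and passing it to the Flight client (the credential values are placeholders):

{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}}
```py
import certifi
from influxdb_client_3 import InfluxDBClient3, flight_client_options

# Read Mozilla's CA bundle (shipped with certifi) and pass it to the
# Flight (gRPC) client so root certificates resolve on non-POSIX systems.
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()

client = InfluxDBClient3(
    token="DATABASE_TOKEN",
    host="{{< influxdb/host >}}",
    database="DATABASE_NAME",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))
```
{{% /code-placeholders %}}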
|
||||
|
||||
If unable to locate a root certificate for _gRPC+TLS_, the Flight client returns errors similar to the following:
|
||||
|
||||
```sh
|
||||
UNKNOWN:Failed to load file... filename:"/usr/share/grpc/roots.pem",
|
||||
children:[UNKNOWN:No such file or directory
|
||||
...
|
||||
Could not get default pem root certs...
|
||||
|
||||
pyarrow._flight.FlightUnavailableError: Flight returned unavailable error,
|
||||
with message: empty address list: . gRPC client debug context:
|
||||
UNKNOWN:empty address list
|
||||
...
|
||||
```
|
||||
|
||||
For more information about gRPC SSL/TLS client-server authentication, see [Using client-side SSL/TLS](https://grpc.io/docs/guides/auth/#using-client-side-ssltls) in the [gRPC.io Authentication guide](https://grpc.io/docs/guides/auth/).
|