Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add database creation/deletion for Influx 1.8 #544

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
97 changes: 88 additions & 9 deletions influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
A bucket belongs to an organization.
"""
import warnings

from influxdb_client.rest import ApiException
from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param

Expand All @@ -20,20 +20,19 @@ def __init__(self, influxdb_client):

def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None,
description=None, org=None) -> Bucket:
"""Create a bucket.
"""Create a bucket. Database creation via v1 API as fallback.

:param Bucket|PostBucketRequest bucket: bucket to create
:param bucket_name: bucket name
:param description: bucket description
:param org_id: org_id
:param bucket_name: bucket name
:param retention_rules: retention rules array or single BucketRetentionRules
:param str, Organization org: specifies the organization for create the bucket;
Take the ``ID``, ``Name`` or ``Organization``.
If not specified the default value from ``InfluxDBClient.org`` is used.
:return: Bucket
:return: Bucket or the request thread when falling back.
If the method is called asynchronously,
returns the request thread.
returns also the request thread.
"""
if retention_rules is None:
retention_rules = []
Expand All @@ -56,7 +55,48 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru
client=self._influxdb_client,
required_id=True))

return self._buckets_service.post_buckets(post_bucket_request=bucket)
try:
return self._buckets_service.post_buckets(post_bucket_request=bucket)
except ApiException:
# Fall back to v1 API if buckets are not supported
database_name = bucket_name if bucket_name is not None else bucket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach is too general and can lead to user confusion. The better way would be to create a separate API for InfluxDB 1.8 something like influxdb_18_api.py.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing.

I would disagree to create a separate API (If I understand your suggestion right). The goal should be that code changes are not necessary regardless of what version of InfluxDB a user uses (1.8 or 2.0 and above).

create_bucket() should always succeed for both versions without extra error handling. Otherwise users could continue using the old influxdb-client for 1.8 and this one for >2.0 .

My suggestion would be to implement a more specific exception handler. I agree that just ApiException is too general. I will have a look into the returned ApiException.code.

Would this be acceptable for your project?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can accept this if the behaviour will be described in the documentation. Instead of catching exception you can check the InfluxDB version, something like here:

def _is_cloud_instance(self) -> bool:

return self.create_database(database=database_name, retention_rules=retention_rules)

def create_database(self, database=None, retention_rules=None):
"""Create a database at the v1 api (legacy).

:param database_name: name of the new database
:param retention_rules: retention rules array or single BucketRetentionRules
:return: Tuple (response body, status code, header dict)
"""
if database is None:
raise ValueError("Invalid value for `database`, must be defined.")

# Hedaer and local_var_params for standard procedures only
header_params = {}
header_params['Accept'] = self._influxdb_client.api_client.select_header_accept(
['application/json'])
header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type(
['application/json'])
local_var_params = locals()
local_var_params['kwargs'] = {}
all_params = []
self._buckets_service._check_operation_params(
"create_database", all_params, local_var_params
)

return self._influxdb_client.api_client.call_api(
'/query', 'POST',
header_params=header_params,
path_params={}, post_params=[],
files={}, auth_settings=[], collection_formats={},
query_params={'q': f'CREATE DATABASE {database}'},
async_req=local_var_params.get('async_req'),
_return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501
_preload_content=local_var_params.get('_preload_content', True),
_request_timeout=local_var_params.get('_request_timeout'),
urlopen_kw=None
)

def update_bucket(self, bucket: Bucket) -> Bucket:
"""Update a bucket.
Expand All @@ -71,17 +111,56 @@ def update_bucket(self, bucket: Bucket) -> Bucket:
return self._buckets_service.patch_buckets_id(bucket_id=bucket.id, patch_bucket_request=request)

def delete_bucket(self, bucket):
"""Delete a bucket.
"""Delete a bucket. Delete a database via v1 API as fallback.

:param bucket: bucket id or Bucket
:return: Bucket
:return: Bucket or the request thread when falling back
"""
if isinstance(bucket, Bucket):
bucket_id = bucket.id
else:
bucket_id = bucket

return self._buckets_service.delete_buckets_id(bucket_id=bucket_id)
try:
return self._buckets_service.delete_buckets_id(bucket_id=bucket_id)
except ApiException:
return self.delete_database(database=bucket_id)

def delete_database(self, database=None):
"""Delete a database at the v1 api (legacy).

:param database_name: name of the database to delete
:param retention_rules: retention rules array or single BucketRetentionRules
:return: Tuple (response body, status code, header dict)
"""
if database is None:
raise ValueError("Invalid value for `database`, must be defined.")

# Hedaer and local_var_params for standard procedures only
header_params = {}
header_params['Accept'] = self._influxdb_client.api_client.select_header_accept(
['application/json'])
header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type(
['application/json'])
local_var_params = locals()
local_var_params['kwargs'] = {}
all_params = []
self._buckets_service._check_operation_params(
"drop_database", all_params, local_var_params
)

return self._influxdb_client.api_client.call_api(
'/query', 'POST',
header_params=header_params,
path_params={}, post_params=[],
files={}, auth_settings=[], collection_formats={},
query_params={'q': f'DROP DATABASE {database}'},
async_req=local_var_params.get('async_req'),
_return_http_data_only=local_var_params.get('_return_http_data_only'),
_preload_content=local_var_params.get('_preload_content', True),
_request_timeout=local_var_params.get('_request_timeout'),
urlopen_kw=None
)

def find_bucket_by_id(self, id):
"""Find bucket by ID.
Expand Down