diff --git a/influxdb_client/client/bucket_api.py b/influxdb_client/client/bucket_api.py index 47763bee..3b10488c 100644 --- a/influxdb_client/client/bucket_api.py +++ b/influxdb_client/client/bucket_api.py @@ -5,7 +5,6 @@ A bucket belongs to an organization. """ import warnings - from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest from influxdb_client.client.util.helpers import get_org_query_param @@ -20,13 +19,12 @@ def __init__(self, influxdb_client): def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None, description=None, org=None) -> Bucket: - """Create a bucket. + """Create a bucket. Database creation via v1 API as fallback. :param Bucket|PostBucketRequest bucket: bucket to create :param bucket_name: bucket name :param description: bucket description :param org_id: org_id - :param bucket_name: bucket name :param retention_rules: retention rules array or single BucketRetentionRules :param str, Organization org: specifies the organization for create the bucket; Take the ``ID``, ``Name`` or ``Organization``. @@ -35,6 +33,13 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru If the method is called asynchronously, returns the request thread. """ + if self._buckets_service._is_below_v2(): + # Fall back to v1 API if buckets are not supported + warnings.warn("InfluxDB versions below v2.0 are deprecated. " + + "Falling back to CREATE DATABASE statement", DeprecationWarning) + database_name = bucket_name if bucket_name is not None else bucket + return self._create_database(database=database_name) + if retention_rules is None: retention_rules = [] @@ -58,6 +63,41 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru return self._buckets_service.post_buckets(post_bucket_request=bucket) + def _create_database(self, database=None): + """Create a database at the v1 api (legacy). + + :param database_name: name of the new database + :return: tuple(response body, status code, header dict) + """ + if database is None: + raise ValueError("Invalid value for `database`, must be defined.") + + # Hedaer and local_var_params for standard procedures only + header_params = {} + header_params['Accept'] = self._influxdb_client.api_client.select_header_accept( + ['application/json']) + header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type( + ['application/json']) + local_var_params = locals() + local_var_params['kwargs'] = {} + all_params = [] + self._buckets_service._check_operation_params( + "create_database", all_params, local_var_params + ) + + return self._influxdb_client.api_client.call_api( + '/query', 'POST', + header_params=header_params, + path_params={}, post_params=[], + files={}, auth_settings=[], collection_formats={}, + query_params={'q': f'CREATE DATABASE {database}'}, + async_req=local_var_params.get('async_req'), + _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 + _preload_content=local_var_params.get('_preload_content', True), + _request_timeout=local_var_params.get('_request_timeout'), + urlopen_kw=None + ) + def update_bucket(self, bucket: Bucket) -> Bucket: """Update a bucket. @@ -71,7 +111,7 @@ def update_bucket(self, bucket: Bucket) -> Bucket: return self._buckets_service.patch_buckets_id(bucket_id=bucket.id, patch_bucket_request=request) def delete_bucket(self, bucket): - """Delete a bucket. + """Delete a bucket. Delete a database via v1 API as fallback. :param bucket: bucket id or Bucket :return: Bucket @@ -81,8 +121,49 @@ def delete_bucket(self, bucket): else: bucket_id = bucket + if self._buckets_service._is_below_v2(): + # Fall back to v1 API if buckets are not supported + warnings.warn("InfluxDB versions below v2.0 are deprecated. " + + "Falling back to DROP DATABASE statement", DeprecationWarning) + return self._delete_database(database=bucket_id) + return self._buckets_service.delete_buckets_id(bucket_id=bucket_id) + def _delete_database(self, database=None): + """Delete a database at the v1 api (legacy). + + :param database_name: name of the database to delete + :return: tuple(response body, status code, header dict) + """ + if database is None: + raise ValueError("Invalid value for `database`, must be defined.") + + # Hedaer and local_var_params for standard procedures only + header_params = {} + header_params['Accept'] = self._influxdb_client.api_client.select_header_accept( + ['application/json']) + header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type( + ['application/json']) + local_var_params = locals() + local_var_params['kwargs'] = {} + all_params = [] + self._buckets_service._check_operation_params( + "drop_database", all_params, local_var_params + ) + + return self._influxdb_client.api_client.call_api( + '/query', 'POST', + header_params=header_params, + path_params={}, post_params=[], + files={}, auth_settings=[], collection_formats={}, + query_params={'q': f'DROP DATABASE {database}'}, + async_req=local_var_params.get('async_req'), + _return_http_data_only=local_var_params.get('_return_http_data_only'), + _preload_content=local_var_params.get('_preload_content', True), + _request_timeout=local_var_params.get('_request_timeout'), + urlopen_kw=None + ) + def find_bucket_by_id(self, id): """Find bucket by ID. diff --git a/influxdb_client/service/_base_service.py b/influxdb_client/service/_base_service.py index d3e8f995..15e9d56c 100644 --- a/influxdb_client/service/_base_service.py +++ b/influxdb_client/service/_base_service.py @@ -9,6 +9,7 @@ def __init__(self, api_client=None): raise ValueError("Invalid value for `api_client`, must be defined.") self.api_client = api_client self._build_type = None + self._build_version = None def _check_operation_params(self, operation_id, supported_params, local_params): supported_params.append('async_req') @@ -35,6 +36,16 @@ async def _is_cloud_instance_async(self) -> bool: self._build_type = await self.build_type_async() return 'cloud' in self._build_type.lower() + def _is_below_v2(self) -> bool: + if self._build_version is None: + self._build_version = self.build_version() + return self._build_version < '2' + + async def _is_below_v2_async(self) -> bool: + if self._build_version is None: + self._build_version = await self.build_version() + return self._build_version < '2' + def build_type(self) -> str: """ Return the build type of the connected InfluxDB Server. @@ -59,6 +70,30 @@ async def build_type_async(self) -> str: response = await ping_service.get_ping_async(_return_http_data_only=False) return self.response_header(response, header_name='X-Influxdb-Build') + def build_version(self) -> str: + """ + Return the version number of the connected InfluxDB Server. + + :return: Version number of InfluxDB build. + """ + from influxdb_client import PingService + ping_service = PingService(self.api_client) + + response = ping_service.get_ping_with_http_info(_return_http_data_only=False) + return self.response_header(response, header_name='X-Influxdb-Version') + + async def build_version_async(self) -> str: + """ + Return the version number of the connected InfluxDB Server. + + :return: Version number of InfluxDB build. + """ + from influxdb_client import PingService + ping_service = PingService(self.api_client) + + response = await ping_service.get_ping_async(_return_http_data_only=False) + return self.response_header(response, header_name='X-Influxdb-Version') + def response_header(self, response, header_name='X-Influxdb-Version') -> str: if response is not None and len(response) >= 3: if header_name in response[2]: