Merge branch 'prep_1.9.0b2' into 1.9.latest
benc-db committed Oct 25, 2024
2 parents a944656 + 823f5f2 commit 047cec7
Showing 30 changed files with 217 additions and 244 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -3,7 +3,7 @@
### Features

- Add support for serverless job clusters on python models ([706](https://github.com/databricks/dbt-databricks/pull/706))
- - Add 'user_folder_for_python' config to switch writing python model notebooks to the user's folder ([706](https://github.com/databricks/dbt-databricks/pull/706))
+ - Add 'user_folder_for_python' behavior to switch writing python model notebooks to the user's folder ([835](https://github.com/databricks/dbt-databricks/pull/835))
- Merge capabilities are extended ([739](https://github.com/databricks/dbt-databricks/pull/739)) to include the support for the following features (thanks @mi-volodin):
- `with schema evolution` clause (requires Databricks Runtime 15.2 or above);
- `when not matched by source` clause, only for `delete` action
@@ -24,6 +24,8 @@
- Fix places where we were not properly closing cursors, and other test warnings ([713](https://github.com/databricks/dbt-databricks/pull/713))
- Drop support for Python 3.8 ([713](https://github.com/databricks/dbt-databricks/pull/713))
- Upgrade databricks-sql-connector dependency to 3.5.0 ([833](https://github.com/databricks/dbt-databricks/pull/833))
+ - Prepare for python typing deprecations ([837](https://github.com/databricks/dbt-databricks/pull/837))
+ - Fix behavior flag use in init of DatabricksAdapter (thanks @VersusFacit!) ([836](https://github.com/databricks/dbt-databricks/pull/836))

## dbt-databricks 1.8.7 (October 10, 2024)

32 changes: 12 additions & 20 deletions dbt/adapters/databricks/api_client.py
@@ -1,15 +1,12 @@
import base64
+ from collections.abc import Callable
import time
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
import re
from typing import Any
- from typing import Callable
- from typing import Dict
- from typing import List
from typing import Optional
- from typing import Set

from dbt.adapters.databricks import utils
from dbt.adapters.databricks.__version__ import version
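
For context on the import churn above: PEP 585 (Python 3.9) made the builtin containers generic, deprecating `typing.Dict`, `typing.List`, and `typing.Set`, and `Callable` is now sourced from `collections.abc`. A minimal sketch of the target style (illustrative, not code from this repository):

```python
from collections.abc import Callable
from typing import Any, Optional

# Builtin generics (PEP 585, Python 3.9+) replace the deprecated
# typing.Dict/List/Set aliases; Callable now comes from collections.abc.
def dispatch(
    handler: Callable[[dict[str, Any]], None],
    params: Optional[dict[str, Any]] = None,
) -> list[str]:
    seen: set[str] = set(params or {})
    handler(params or {})
    return sorted(seen)
```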
@@ -34,17 +31,17 @@ def __init__(self, session: Session, host: str, api: str):
self.session = session

def get(
- self, suffix: str = "", json: Optional[Any] = None, params: Optional[Dict[str, Any]] = None
+ self, suffix: str = "", json: Optional[Any] = None, params: Optional[dict[str, Any]] = None
) -> Response:
return self.session.get(f"{self.prefix}{suffix}", json=json, params=params)

def post(
- self, suffix: str = "", json: Optional[Any] = None, params: Optional[Dict[str, Any]] = None
+ self, suffix: str = "", json: Optional[Any] = None, params: Optional[dict[str, Any]] = None
) -> Response:
return self.session.post(f"{self.prefix}{suffix}", json=json, params=params)

def put(
- self, suffix: str = "", json: Optional[Any] = None, params: Optional[Dict[str, Any]] = None
+ self, suffix: str = "", json: Optional[Any] = None, params: Optional[dict[str, Any]] = None
) -> Response:
return self.session.put(f"{self.prefix}{suffix}", json=json, params=params)

@@ -141,11 +138,6 @@ def get_folder(self, catalog: str, schema: str) -> str:
# Use this for now to not break users
class SharedFolderApi(FolderApi):
def get_folder(self, _: str, schema: str) -> str:
- logger.warning(
-     f"Uploading notebook to '/Shared/dbt_python_models/{schema}/'. "
-     "Writing to '/Shared' is deprecated and will be removed in a future release. "
-     "Write to the current user's home directory by setting `user_folder_for_python: true`"
- )
return f"/Shared/dbt_python_models/{schema}/"


@@ -230,7 +222,7 @@ def _poll_api(
url: str,
params: dict,
get_state_func: Callable[[Response], str],
- terminal_states: Set[str],
+ terminal_states: set[str],
expected_end_state: str,
unexpected_end_state_func: Callable[[Response], None],
) -> Response:
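
For readers unfamiliar with the pattern, `_poll_api` repeatedly fetches a status endpoint until the reported state reaches a terminal set. A hedged standalone sketch of that contract (the retry and timeout policy here is an assumption, not the adapter's actual logic):

```python
import time
from collections.abc import Callable
from typing import Any

def poll_until_terminal(
    fetch: Callable[[], Any],                      # e.g. lambda: session.get(url, params=params)
    get_state_func: Callable[[Any], str],
    terminal_states: set[str],
    expected_end_state: str,
    unexpected_end_state_func: Callable[[Any], None],
    polling_interval: float = 1.0,
    timeout: float = 60.0,
) -> Any:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = fetch()
        state = get_state_func(response)
        if state in terminal_states:
            if state != expected_end_state:
                unexpected_end_state_func(response)  # conventionally raises
            return response
        time.sleep(polling_interval)
    raise TimeoutError(f"state did not become terminal within {timeout}s")
```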
@@ -261,7 +253,7 @@ class CommandExecution(object):
context_id: str
cluster_id: str

- def model_dump(self) -> Dict[str, Any]:
+ def model_dump(self) -> dict[str, Any]:
return {
"commandId": self.command_id,
"contextId": self.context_id,
@@ -328,7 +320,7 @@ def __init__(self, session: Session, host: str, polling_interval: int, timeout:
super().__init__(session, host, "/api/2.1/jobs/runs", polling_interval, timeout)

def submit(
- self, run_name: str, job_spec: Dict[str, Any], **additional_job_settings: Dict[str, Any]
+ self, run_name: str, job_spec: dict[str, Any], **additional_job_settings: dict[str, Any]
) -> str:
submit_response = self.session.post(
"/submit", json={"run_name": run_name, "tasks": [job_spec], **additional_job_settings}
@@ -388,7 +380,7 @@ class JobPermissionsApi(DatabricksApi):
def __init__(self, session: Session, host: str):
super().__init__(session, host, "/api/2.0/permissions/jobs")

- def put(self, job_id: str, access_control_list: List[Dict[str, Any]]) -> None:
+ def put(self, job_id: str, access_control_list: list[dict[str, Any]]) -> None:
request_body = {"access_control_list": access_control_list}

response = self.session.put(f"/{job_id}", json=request_body)
@@ -397,7 +389,7 @@ def put(self, job_id: str, access_control_list: List[Dict[str, Any]]) -> None:
if response.status_code != 200:
raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

- def get(self, job_id: str) -> Dict[str, Any]:
+ def get(self, job_id: str) -> dict[str, Any]:
response = self.session.get(f"/{job_id}")

if response.status_code != 200:
@@ -413,15 +405,15 @@ class WorkflowJobApi(DatabricksApi):
def __init__(self, session: Session, host: str):
super().__init__(session, host, "/api/2.1/jobs")

- def search_by_name(self, job_name: str) -> List[Dict[str, Any]]:
+ def search_by_name(self, job_name: str) -> list[dict[str, Any]]:
response = self.session.get("/list", json={"name": job_name})

if response.status_code != 200:
raise DbtRuntimeError(f"Error fetching job by name.\n {response.content!r}")

return response.json().get("jobs", [])

- def create(self, job_spec: Dict[str, Any]) -> str:
+ def create(self, job_spec: dict[str, Any]) -> str:
"""
:return: the job_id
"""
@@ -434,7 +426,7 @@ def create(self, job_spec: Dict[str, Any]) -> str:
logger.info(f"New workflow created with job id {job_id}")
return job_id

- def update_job_settings(self, job_id: str, job_spec: Dict[str, Any]) -> None:
+ def update_job_settings(self, job_id: str, job_spec: dict[str, Any]) -> None:
request_body = {
"job_id": job_id,
"new_settings": job_spec,
5 changes: 2 additions & 3 deletions dbt/adapters/databricks/auth.py
@@ -1,5 +1,4 @@
from typing import Any
- from typing import Dict
from typing import Optional

from databricks.sdk.core import Config
@@ -34,7 +33,7 @@ def from_dict(raw: Optional[dict]) -> Optional[CredentialsProvider]:
def __call__(self, _: Optional[Config] = None) -> HeaderFactory:
static_credentials = {"Authorization": f"Bearer {self._token}"}

- def inner() -> Dict[str, str]:
+ def inner() -> dict[str, str]:
return static_credentials

return inner
@@ -81,7 +80,7 @@ def from_dict(host: str, client_id: str, client_secret: str, raw: dict) -> Crede
return c

def __call__(self, _: Optional[Config] = None) -> HeaderFactory:
- def inner() -> Dict[str, str]:
+ def inner() -> dict[str, str]:
token = self._token_source.token() # type: ignore
return {"Authorization": f"{token.token_type} {token.access_token}"}

13 changes: 6 additions & 7 deletions dbt/adapters/databricks/behaviors/columns.py
@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
- from typing import List
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.relation import DatabricksRelation
@@ -14,13 +13,13 @@ class GetColumnsBehavior(ABC):
@abstractmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
- ) -> List[DatabricksColumn]:
+ ) -> list[DatabricksColumn]:
pass

@staticmethod
def _get_columns_with_comments(
adapter: SQLAdapter, relation: DatabricksRelation, macro_name: str
- ) -> List[AttrDict]:
+ ) -> list[AttrDict]:
return list(
handle_missing_objects(
lambda: adapter.execute_macro(macro_name, kwargs={"relation": relation}),
@@ -33,12 +32,12 @@ class GetColumnsByDescribe(GetColumnsBehavior):
@classmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
- ) -> List[DatabricksColumn]:
+ ) -> list[DatabricksColumn]:
rows = cls._get_columns_with_comments(adapter, relation, "get_columns_comments")
return cls._parse_columns(rows)

@classmethod
- def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
+ def _parse_columns(cls, rows: list[AttrDict]) -> list[DatabricksColumn]:
columns = []

for row in rows:
@@ -57,7 +56,7 @@ class GetColumnsByInformationSchema(GetColumnsByDescribe):
@classmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
- ) -> List[DatabricksColumn]:
+ ) -> list[DatabricksColumn]:
if relation.is_hive_metastore() or relation.type == DatabricksRelation.View:
return super().get_columns_in_relation(adapter, relation)

@@ -67,5 +66,5 @@ def get_columns_in_relation(
return cls._parse_columns(rows)

@classmethod
- def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
+ def _parse_columns(cls, rows: list[AttrDict]) -> list[DatabricksColumn]:
return [DatabricksColumn(column=row[0], dtype=row[1], comment=row[2]) for row in rows]
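
Both behaviors funnel raw result rows into `DatabricksColumn` objects; the information_schema variant reads positionally, as the last line shows. A standalone sketch of that shape (row contents are made up):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Column:
    column: str
    dtype: str
    comment: Optional[str] = None

# information_schema-style rows: (name, type, comment), positional
rows = [("id", "bigint", "primary key"), ("name", "string", None)]
columns = [Column(column=r[0], dtype=r[1], comment=r[2]) for r in rows]
assert columns[0].dtype == "bigint" and columns[1].comment is None
```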
3 changes: 1 addition & 2 deletions dbt/adapters/databricks/column.py
@@ -1,6 +1,5 @@
from dataclasses import dataclass
from typing import ClassVar
- from typing import Dict
from typing import Optional

from dbt.adapters.spark.column import SparkColumn
@@ -11,7 +10,7 @@ class DatabricksColumn(SparkColumn):
table_comment: Optional[str] = None
comment: Optional[str] = None

- TYPE_LABELS: ClassVar[Dict[str, str]] = {
+ TYPE_LABELS: ClassVar[dict[str, str]] = {
"LONG": "BIGINT",
}
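
`TYPE_LABELS` maps engine-reported type names to the labels dbt should render; in dbt's base `Column` class this kind of mapping typically backs `translate_type`. A toy illustration of the lookup (the helper below is illustrative, not the adapter's method):

```python
TYPE_LABELS: dict[str, str] = {"LONG": "BIGINT"}

def translate_type(dtype: str) -> str:
    # Fall back to the original name when no label is registered.
    return TYPE_LABELS.get(dtype.upper(), dtype)

assert translate_type("long") == "BIGINT"
assert translate_type("string") == "string"
```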
