diff --git a/src/dbacademy_helper/dbacademy_helper_class.py b/src/dbacademy_helper/dbacademy_helper_class.py
index f7e53ed..acc414e 100644
--- a/src/dbacademy_helper/dbacademy_helper_class.py
+++ b/src/dbacademy_helper/dbacademy_helper_class.py
@@ -320,8 +320,8 @@ def cleanup(self, validate_datasets=True):
         self.__spark.catalog.clearCache()
         self.__cleanup_stop_all_streams()
 
-        if drop_catalog: self.__cleanup_catalog()
-        elif drop_schema: self.__cleanup_schema()
+        if drop_catalog: self.__drop_catalog()
+        elif drop_schema: self.__drop_schema()
 
         if remove_wd: self.__cleanup_working_dir()
 
@@ -337,18 +337,29 @@ def __cleanup_working_dir(self):
 
         print(self.clock_stopped(start))
 
-    def __cleanup_schema(self):
+    @staticmethod
+    def __drop_database(schema_name):
         from pyspark.sql.utils import AnalysisException
 
+        try: location = dbgems.sql(f"DESCRIBE TABLE EXTENDED {schema_name}").filter("col_name == 'Location'").first()["data_type"]
+        except Exception: location = None # Ignore this concurrency error
+
+        try: dbgems.sql(f"DROP DATABASE IF EXISTS {schema_name} CASCADE")
+        except AnalysisException: pass # Ignore this concurrency error
+
+        try: dbgems.dbutils.fs.rm(location)
+        except: pass # We are going to ignore this as it is most likely deleted or None
+
+    def __drop_schema(self):
+
         start = self.clock_start()
         print(f"...dropping the schema \"{self.schema_name}\"", end="...")
 
-        try: self.__spark.sql(f"DROP DATABASE IF EXISTS {self.schema_name} CASCADE")
-        except AnalysisException: pass # Ignore this concurrency error
+        self.__drop_database(self.schema_name)
 
         print(self.clock_stopped(start))
 
-    def __cleanup_catalog(self):
+    def __drop_catalog(self):
         from pyspark.sql.utils import AnalysisException
 
         start = self.clock_start()
@@ -396,8 +407,7 @@ def __reset_databases(self):
         for schema_name in schema_names:
             if schema_name.startswith(self.schema_name_prefix) and schema_name != "default":
                 print(f"Dropping the schema \"{catalog_name}.{schema_name}\"")
-                try: dbgems.spark.sql(f"DROP DATABASE IF EXISTS {catalog_name}.{schema_name} CASCADE")
-                except AnalysisException: pass # Ignore this concurrency error
+                self.__drop_database(f"{catalog_name}.{schema_name}")
 
     def __reset_working_dir(self):
         from dbacademy_helper.paths_class import Paths