diff --git a/src/dbacademy_helper/dbacademy_helper_class.py b/src/dbacademy_helper/dbacademy_helper_class.py index ecb344e..46bfdbc 100644 --- a/src/dbacademy_helper/dbacademy_helper_class.py +++ b/src/dbacademy_helper/dbacademy_helper_class.py @@ -147,6 +147,16 @@ def to_unique_name(self, username): course_code = self.course_config.course_code return f"{local_part}-{username_hash}-dbacademy-{course_code}".lower() + @property + def catalog_name_prefix(self): + return self.to_catalog_name_prefix(self.username) + + def to_catalog_name_prefix(self, username): + local_part = username.split("@")[0] + username_hash = dbgems.stable_hash(username, length=4) + course_code = self.course_config.course_code + return DBAcademyHelper.clean_string(f"{local_part}-{username_hash}-dbacademy-{course_code}").lower() + @property def catalog_name(self): @@ -163,21 +173,21 @@ def catalog_name(self): return self.to_catalog_name(self.username) def to_catalog_name(self, username: str) -> str: - local_part = username.split("@")[0] - username_hash = dbgems.stable_hash(username, length=4) - course_code = self.course_config.course_code + catalog_name_prefix = self.to_catalog_name_prefix(username) if self.lesson_config.name is None: - return DBAcademyHelper.clean_string(f"{local_part}-{username_hash}-dbacademy-{course_code}").lower() + # With no lesson, catalog and prefix are the same. 
+ return catalog_name_prefix else: - return DBAcademyHelper.clean_string(f"{local_part}-{username_hash}-dbacademy-{course_code}-{self.lesson_config.clean_name}").lower() + # Append the lesson name to the catalog name + return DBAcademyHelper.clean_string(f"{catalog_name_prefix}-{self.lesson_config.clean_name}").lower() @property def schema_name_prefix(self): if self.lesson_config.created_catalog: return "default" else: - return self.to_schema_name(username=self.username) + return self.to_schema_name_prefix(username=self.username) def to_schema_name_prefix(self, username: str) -> str: local_part = username.split("@")[0] @@ -298,19 +308,19 @@ def cleanup(self, validate_datasets=True): remove_wd = self.paths.exists(self.paths.working_dir) # Test to see if the working directory exists if self.lesson_config.created_catalog: - clean_catalog = True # If we created it, we clean it + drop_catalog = True # If we created the catalog, we drop it drop_schema = False # But don't the schema else: - clean_catalog = False # We didn't clean the catalog so don't touch it. + drop_catalog = False # We didn't create the catalog, so don't touch it. 
drop_schema = self.__spark.sql(f"SHOW DATABASES").filter(f"databaseName == '{self.schema_name}'").count() == 1 - if clean_catalog or drop_schema or remove_wd or active_streams: + if drop_catalog or drop_schema or remove_wd or active_streams: print("Resetting the learning environment...") self.__spark.catalog.clearCache() self.__cleanup_stop_all_streams() - if clean_catalog: self.__cleanup_catalog() + if drop_catalog: self.__cleanup_catalog() elif drop_schema: self.__cleanup_schema() if remove_wd: self.__cleanup_working_dir() @@ -327,39 +337,21 @@ def __cleanup_working_dir(self): print(self.clock_stopped(start)) - # Without UC, we only want to drop the database provided to the learner def __cleanup_schema(self): start = self.clock_start() print(f"...dropping the schema \"{self.schema_name}\"", end="...") - self.__spark.sql(f"DROP DATABASE {self.schema_name} CASCADE") + self.__spark.sql(f"DROP DATABASE IF EXISTS {self.schema_name} CASCADE") print(self.clock_stopped(start)) - # With UC enabled, we need to drop all databases def __cleanup_catalog(self): + start = self.clock_start() + print(f"...dropping the catalog \"{self.catalog_name}\"", end="...") - catalogs = [c[0] for c in dbgems.sql("SHOW CATALOGS").collect()] - if self.catalog_name not in catalogs: - return # The catalog no longer exists + self.__spark.sql(f"DROP CATALOG IF EXISTS {self.catalog_name} CASCADE") - if self.lesson_config.created_catalog: - schemas = [d[0] for d in dbgems.spark.sql(f"SHOW DATABASES IN {self.catalog_name}").collect()] - - for ignored in DBAcademyHelper.SPECIAL_SCHEMAS: - if ignored in schemas: - del schemas[schemas.index(ignored)] - - s = "" if len(schemas) == 1 else "s" - print(f"...dropping {len(schemas)} schema{s} from the catalog \"{self.catalog_name}\"") - for schema_name in schemas: - if schema_name.startswith("_") or schema_name in DBAcademyHelper.SPECIAL_SCHEMAS: - print(f"......skipping the schema \"{schema_name}\"") - else: - start = self.clock_start() - 
print(f"......dropping the schema \"{schema_name}\"", end="...") - dbgems.spark.sql(f"DROP SCHEMA IF EXISTS {self.catalog_name}.{schema_name} CASCADE") - print(self.clock_stopped(start)) + print(self.clock_stopped(start)) def __cleanup_stop_all_streams(self): for stream in self.__spark.streams.active: @@ -379,15 +371,23 @@ def reset_learning_environment(self): print(f"\nThe learning environment was successfully reset {self.clock_stopped(start)}.") def __reset_databases(self): - if self.lesson_config.created_catalog is not None: - self.__cleanup_catalog() - else: - # This is a "classic" setup, drop all user-specific databases. - schema_names = [d.databaseName for d in dbgems.spark.sql(f"show databases").collect()] - for schema_name in schema_names: - if schema_name.startswith(self.schema_name_prefix) and schema_name != "default": - print(f"Dropping the schema \"{schema_name}\"") - dbgems.spark.sql(f"DROP DATABASE IF EXISTS {schema_name} CASCADE") + # Drop all user-specific catalogs + catalog_names = [c.catalog for c in dbgems.spark.sql(f"SHOW CATALOGS").collect()] + for catalog_name in catalog_names: + if catalog_name.startswith(self.catalog_name_prefix): + print(f"Dropping the catalog \"{catalog_name}\"") + dbgems.spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE") + + # Refresh the list of catalogs + catalog_names = [c.catalog for c in dbgems.spark.sql(f"SHOW CATALOGS").collect()] + for catalog_name in catalog_names: + # There are potentially two "default" catalogs from which we need to remove user-specific schemas. NOTE(review): schema_name_prefix returns "default" when lesson_config.created_catalog is truthy, so the startswith() filter below then matches every schema whose name begins with "default" - confirm this is intended. + if catalog_name in [DBAcademyHelper.CATALOG_SPARK_DEFAULT, DBAcademyHelper.CATALOG_UC_DEFAULT]: + schema_names = [d.databaseName for d in dbgems.spark.sql(f"SHOW DATABASES IN {catalog_name}").collect()] + for schema_name in schema_names: + if schema_name.startswith(self.schema_name_prefix) and schema_name != "default": + print(f"Dropping the schema \"{catalog_name}.{schema_name}\"") + dbgems.spark.sql(f"DROP DATABASE IF EXISTS 
{catalog_name}.{schema_name} CASCADE") def __reset_working_dir(self): from dbacademy_helper.paths_class import Paths