Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

Commit

Permalink
Update dbacademy_helper_class.py
Browse files Browse the repository at this point in the history
  • Loading branch information
SireInsectus committed Oct 11, 2022
1 parent 6b12391 commit aaef670
Showing 1 changed file with 42 additions and 42 deletions.
84 changes: 42 additions & 42 deletions src/dbacademy_helper/dbacademy_helper_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ def to_unique_name(self, username):
course_code = self.course_config.course_code
return f"{local_part}-{username_hash}-dbacademy-{course_code}".lower()

@property
def catalog_name_prefix(self) -> str:
    """Return the catalog-name prefix for the current user.

    Delegates to ``to_catalog_name_prefix`` with ``self.username``.
    """
    return self.to_catalog_name_prefix(self.username)

def to_catalog_name_prefix(self, username: str) -> str:
    """Build the catalog-name prefix for ``username``.

    The prefix joins, with hyphens: the part of ``username`` before the
    first "@", the value of ``dbgems.stable_hash(username, length=4)``, the
    literal "dbacademy" and the configured course code. The joined string is
    passed through ``DBAcademyHelper.clean_string`` and then lower-cased.

    :param username: the user's login name (typically an e-mail address)
    :return: the cleaned, lower-cased catalog-name prefix
    """
    local_part = username.split("@")[0]
    username_hash = dbgems.stable_hash(username, length=4)
    course_code = self.course_config.course_code
    return DBAcademyHelper.clean_string(f"{local_part}-{username_hash}-dbacademy-{course_code}").lower()

@property
def catalog_name(self):

Expand All @@ -163,21 +173,21 @@ def catalog_name(self):
return self.to_catalog_name(self.username)

def to_catalog_name(self, username: str) -> str:
    """Return the catalog name for ``username`` in the current lesson.

    The name is always derived from ``to_catalog_name_prefix`` so that the
    prefix and the full catalog name cannot drift apart.

    :param username: the user's login name
    :return: the prefix itself when the lesson has no name, otherwise the
        prefix with the lesson's ``clean_name`` appended, cleaned and
        lower-cased
    """
    catalog_name_prefix = self.to_catalog_name_prefix(username)

    if self.lesson_config.name is None:
        # With no lesson, catalog and prefix are the same.
        return catalog_name_prefix

    # Append the lesson name to the catalog name
    return DBAcademyHelper.clean_string(f"{catalog_name_prefix}-{self.lesson_config.clean_name}").lower()

@property
def schema_name_prefix(self) -> str:
    """Return the prefix shared by the current user's schema names.

    When the lesson created its own catalog the prefix is the literal
    "default"; otherwise it is the user-specific prefix produced by
    ``to_schema_name_prefix``.
    """
    if self.lesson_config.created_catalog:
        return "default"

    # Must be the *prefix*, not the full schema name: callers match schema
    # names against it with ``startswith``.
    return self.to_schema_name_prefix(username=self.username)

def to_schema_name_prefix(self, username: str) -> str:
local_part = username.split("@")[0]
Expand Down Expand Up @@ -298,19 +308,19 @@ def cleanup(self, validate_datasets=True):
remove_wd = self.paths.exists(self.paths.working_dir) # Test to see if the working directory exists

if self.lesson_config.created_catalog:
clean_catalog = True # If we created it, we clean it
drop_catalog = True # If we created it, we clean it
drop_schema = False # But don't drop the schema
else:
clean_catalog = False # We didn't create the catalog so don't touch it.
drop_catalog = False # We didn't create the catalog so don't touch it.
drop_schema = self.__spark.sql(f"SHOW DATABASES").filter(f"databaseName == '{self.schema_name}'").count() == 1

if clean_catalog or drop_schema or remove_wd or active_streams:
if drop_catalog or drop_schema or remove_wd or active_streams:
print("Resetting the learning environment...")

self.__spark.catalog.clearCache()
self.__cleanup_stop_all_streams()

if clean_catalog: self.__cleanup_catalog()
if drop_catalog: self.__cleanup_catalog()
elif drop_schema: self.__cleanup_schema()

if remove_wd: self.__cleanup_working_dir()
Expand All @@ -327,39 +337,21 @@ def __cleanup_working_dir(self):

print(self.clock_stopped(start))

# Without UC, we only want to drop the database provided to the learner
def __cleanup_schema(self):
    """Drop the schema named by ``self.schema_name`` together with its contents.

    Prints a progress message before the drop and the value returned by
    ``self.clock_stopped`` afterwards.
    """
    start = self.clock_start()
    print(f"...dropping the schema \"{self.schema_name}\"", end="...")

    # IF EXISTS keeps the cleanup idempotent: a schema that has already been
    # removed must not turn a reset into an error.
    self.__spark.sql(f"DROP DATABASE IF EXISTS {self.schema_name} CASCADE")

    print(self.clock_stopped(start))

# With UC enabled, we need to drop all databases
def __cleanup_catalog(self):
    """Drop the catalog named by ``self.catalog_name`` together with its contents.

    Prints a progress message before the drop and the value returned by
    ``self.clock_stopped`` afterwards.
    """
    start = self.clock_start()
    print(f"...dropping the catalog \"{self.catalog_name}\"", end="...")

    # IF EXISTS makes a separate "does the catalog exist" query unnecessary,
    # and CASCADE removes the contained schemas, so nothing has to be
    # enumerated or dropped individually once this statement has run.
    self.__spark.sql(f"DROP CATALOG IF EXISTS {self.catalog_name} CASCADE")

    print(self.clock_stopped(start))

def __cleanup_stop_all_streams(self):
for stream in self.__spark.streams.active:
Expand All @@ -379,15 +371,23 @@ def reset_learning_environment(self):
print(f"\nThe learning environment was successfully reset {self.clock_stopped(start)}.")

def __reset_databases(self):
    """Drop every catalog and schema that belongs to the current user.

    Two sweeps are made:

    1. every catalog whose name starts with ``self.catalog_name_prefix`` is
       dropped with CASCADE;
    2. in each of the two default catalogs that is present, every schema
       whose name starts with ``self.schema_name_prefix`` (other than
       "default" itself) is dropped with CASCADE.
    """
    # Both prefixes are properties; read them once instead of per iteration.
    catalog_prefix = self.catalog_name_prefix
    schema_prefix = self.schema_name_prefix
    default_catalogs = (DBAcademyHelper.CATALOG_SPARK_DEFAULT, DBAcademyHelper.CATALOG_UC_DEFAULT)

    # Drop all user-specific catalogs
    for catalog_row in dbgems.spark.sql("SHOW CATALOGS").collect():
        catalog_name = catalog_row.catalog
        if catalog_name.startswith(catalog_prefix):
            print(f"Dropping the catalog \"{catalog_name}\"")
            dbgems.spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE")

    # Refresh the list of catalogs, since some may just have been dropped.
    for catalog_row in dbgems.spark.sql("SHOW CATALOGS").collect():
        catalog_name = catalog_row.catalog

        # There are potentially two "default" catalogs from which we need to remove user-specific schemas
        if catalog_name not in default_catalogs:
            continue

        for schema_row in dbgems.spark.sql(f"SHOW DATABASES IN {catalog_name}").collect():
            schema_name = schema_row.databaseName
            if schema_name.startswith(schema_prefix) and schema_name != "default":
                print(f"Dropping the schema \"{catalog_name}.{schema_name}\"")
                dbgems.spark.sql(f"DROP DATABASE IF EXISTS {catalog_name}.{schema_name} CASCADE")

def __reset_working_dir(self):
from dbacademy_helper.paths_class import Paths
Expand Down

0 comments on commit aaef670

Please sign in to comment.