Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

Commit

Permalink
Update dbacademy_helper_class.py
Browse files Browse the repository at this point in the history
  • Loading branch information
SireInsectus committed Oct 11, 2022
1 parent a3e07d9 commit 0380b87
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions src/dbacademy_helper/dbacademy_helper_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ def cleanup(self, validate_datasets=True):
self.__spark.catalog.clearCache()
self.__cleanup_stop_all_streams()

if drop_catalog: self.__cleanup_catalog()
elif drop_schema: self.__cleanup_schema()
if drop_catalog: self.__drop_catalog()
elif drop_schema: self.__drop_schema()

if remove_wd: self.__cleanup_working_dir()

Expand All @@ -337,18 +337,29 @@ def __cleanup_working_dir(self):

print(self.clock_stopped(start))

@staticmethod
def __drop_database(schema_name):
    """Best-effort drop of the database *schema_name* and its backing storage.

    Every step tolerates concurrent cleanup: failure to resolve the storage
    location, to drop the database, or to remove the files is ignored rather
    than raised, so parallel test runs do not fail on each other's cleanup.

    :param schema_name: fully- or partially-qualified database (schema) name.
    :return: None
    """
    from pyspark.sql.utils import AnalysisException

    # Resolve the storage location BEFORE dropping the database, while the
    # metadata still exists.
    # NOTE(review): DESCRIBE TABLE EXTENDED on a schema name looks odd —
    # presumably it yields a 'Location' row on Databricks; confirm. If it
    # fails, location stays None and the file-removal step is skipped.
    try:
        location = dbgems.sql(f"DESCRIBE TABLE EXTENDED {schema_name}").filter("col_name == 'Location'").first()["data_type"]
    except Exception:
        location = None  # Ignore this concurrency error

    try:
        dbgems.sql(f"DROP DATABASE IF EXISTS {schema_name} CASCADE")
    except AnalysisException:
        pass  # Ignore this concurrency error

    # Only attempt file removal when a location was actually resolved; the
    # original passed None to fs.rm and relied on a bare `except:` which also
    # swallowed SystemExit/KeyboardInterrupt — narrowed to Exception here.
    if location is not None:
        try:
            dbgems.dbutils.fs.rm(location)
        except Exception:
            pass  # Most likely already deleted by a concurrent cleanup

def __drop_schema(self):
    """Drop this helper's schema (database), printing start/elapsed-time markers.

    Delegates the actual drop to __drop_database, which handles concurrency
    errors; this wrapper only adds the user-facing progress output.
    """

    start = self.clock_start()
    print(f"...dropping the schema \"{self.schema_name}\"", end="...")

    # The interleaved diff-residue lines (old inline DROP DATABASE + except
    # AnalysisException) are removed: the new helper performs the drop.
    self.__drop_database(self.schema_name)

    print(self.clock_stopped(start))

def __cleanup_catalog(self):
def __drop_catalog(self):
from pyspark.sql.utils import AnalysisException

start = self.clock_start()
Expand Down Expand Up @@ -396,8 +407,7 @@ def __reset_databases(self):
for schema_name in schema_names:
if schema_name.startswith(self.schema_name_prefix) and schema_name != "default":
print(f"Dropping the schema \"{catalog_name}.{schema_name}\"")
try: dbgems.spark.sql(f"DROP DATABASE IF EXISTS {catalog_name}.{schema_name} CASCADE")
except AnalysisException: pass # Ignore this concurrency error
self.__drop_database(f"{catalog_name}.{schema_name}")

def __reset_working_dir(self):
from dbacademy_helper.paths_class import Paths
Expand Down

0 comments on commit 0380b87

Please sign in to comment.