diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 273661a5a7f..6455ec07c30 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -216,6 +216,31 @@ def getIndexPrefix(self): """ return self.__indexPrefix + @ifConnected + def addIndexTemplate( + self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None + ) -> dict: + """Adds an index template. + + :param self: self reference + :param str name: index name + :param list index_patterns: list of index patterns to match + :param dict mapping: it is the mapping of the index + """ + if settings is None: + settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}} + body = { + "index_patterns": index_patterns, + "priority": priority, + "template": {"settings": settings, "mappings": {"properties": mapping}}, + } + try: + res = self.client.indices.put_index_template(name=name, body=body) + return S_OK(res) + except Exception as e: # pylint: disable=broad-except + sLog.exception() + return S_ERROR(e) + @ifConnected def query(self, index: str, query): """Executes a query and returns its result (uses ES DSL language). @@ -429,19 +454,14 @@ def createIndex(self, indexPrefix, mapping=None, period="day"): sLog.warn("The period is not provided, so using non-periodic indexes names") fullIndex = indexPrefix - res = self.existingIndex(fullIndex) - if not res["OK"]: - return res - elif res["Value"]: - return S_OK(fullIndex) - try: - sLog.info("Create index: ", fullIndex + str(mapping)) - self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7 - + if not mapping: + self.client.indices.create(index=fullIndex) + else: + self.client.indices.create(index=fullIndex, body={"mappings": mapping}) return S_OK(fullIndex) - except Exception as e: # pylint: disable=broad-except - sLog.error("Can not create the index:", repr(e)) + except Exception: # pylint: disable=broad-except + sLog.exception() return S_ERROR("Can not create the index") @ifConnected diff --git a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py index 40c51ce9e94..0f919b72456 100644 --- a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py +++ b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py @@ -43,7 +43,7 @@ class MonitoringDB(ElasticDB): """Extension of ElasticDB for Monitoring system DB""" - def __init__(self, name="Monitoring/MonitoringDB", readOnly=False): + def __init__(self, name="Monitoring/MonitoringDB"): """Standard constructor""" try: @@ -55,7 +55,6 @@ def __init__(self, name="Monitoring/MonitoringDB", readOnly=False): self.log.error("Can't connect to MonitoringDB", repr(ex)) raise ex - self.__readonly = readOnly self.documentTypes = {} # loads all monitoring indexes and types. @@ -75,27 +74,6 @@ def __init__(self, name="Monitoring/MonitoringDB", readOnly=False): "monitoringFields": monfields, "period": period, } - if self.__readonly: - self.log.info("Read only mode: no new index will be created") - else: - # Verifying if the index is there, and if not create it - res = self.existingIndex(f"{indexName}-*") # check with a wildcard - if res["OK"] and res["Value"]: - actualIndexName = self.generateFullIndexName(indexName, period) - res = self.existingIndex(actualIndexName) # check actual index - if not res["OK"] or not res["Value"]: - result = self.createIndex(indexName, self.documentTypes[monitoringType]["mapping"], period) - if not result["OK"]: - self.log.error(result["Message"]) - raise RuntimeError(result["Message"]) - self.log.always("Index created", actualIndexName) - else: - # in case the index pattern does not exist - result = self.createIndex(indexName, self.documentTypes[monitoringType]["mapping"], period) - if not result["OK"]: - self.log.error(result["Message"]) - raise RuntimeError(result["Message"]) - self.log.always("Index created", indexName) def getIndexName(self, typeName): """ diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index 14d4dec1a36..cdd32156d4a 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -13,20 +13,24 @@ from DIRAC.Core.Utilities import TimeUtilities mapping = { - "properties": { - "JobID": {"type": "long"}, - "timestamp": {"type": "date"}, - "CPUNormalizationFactor": {"type": "long"}, - "NormCPUTime(s)": {"type": "long"}, - "Memory(kB)": {"type": "long"}, - "TotalCPUTime(s)": {"type": "long"}, - "MemoryUsed(kb)": {"type": "long"}, - "HostName": {"type": "keyword"}, - "GridCE": {"type": "keyword"}, - "ModelName": {"type": "keyword"}, - "Status": {"type": "keyword"}, - "JobType": {"type": "keyword"}, - } + "JobID": {"type": "long"}, + "timestamp": {"type": "date"}, + "PilotAgent": {"type": "keyword"}, + "Pilot_Reference": {"type": "keyword"}, + "JobGroup": {"type": "keyword"}, + "CPUNormalizationFactor": {"type": "long"}, + "NormCPUTime(s)": {"type": "long"}, + "Memory(MB)": {"type": "long"}, + "LocalAccount": {"type": "keyword"}, + "TotalCPUTime(s)": {"type": "long"}, + "PayloadPID": {"type": "long"}, + "HostName": {"type": "text"}, + "GridCE": {"type": "keyword"}, + "CEQueue": {"type": "keyword"}, + "BatchSystem": {"type": "keyword"}, + "ModelName": {"type": "keyword"}, + "Status": {"type": "keyword"}, + "JobType": {"type": "keyword"}, } @@ -42,6 +46,7 @@ def __init__(self, parentLogger=None): super().__init__(self.fullname, self.index_name, parentLogger=parentLogger) except Exception as ex: raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex + self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping) def _indexName(self, jobID: int) -> str: """construct the index name @@ -59,7 +64,7 @@ def _createIndex(self, indexName: str) -> None: # Verifying if the index is there, and if not create it res = self.existingIndex(indexName) if not res["OK"] or not res["Value"]: - result = self.createIndex(indexName, mapping, period=None) + result = self.createIndex(indexName, period=None) if not result["OK"]: self.log.error(result["Message"]) raise RuntimeError(result["Message"]) diff --git a/tests/Integration/Core/Test_ElasticsearchDB.py b/tests/Integration/Core/Test_ElasticsearchDB.py index b89938c7a60..890b78ca806 100644 --- a/tests/Integration/Core/Test_ElasticsearchDB.py +++ b/tests/Integration/Core/Test_ElasticsearchDB.py @@ -88,7 +88,7 @@ def test_bulkindex(): assert result["Value"] == 10 time.sleep(5) indexes = elasticSearchDB.getIndexes() - assert type(indexes) == list + assert isinstance(indexes, list) # it will be list for index in indexes: res = elasticSearchDB.deleteIndex(index) assert res["OK"] @@ -101,7 +101,7 @@ def test_bulkindexMonthly(): assert result["Value"] == 10 time.sleep(5) indexes = elasticSearchDB.getIndexes() - assert type(indexes) == list + assert isinstance(indexes, list) # it will be list for index in indexes: res = elasticSearchDB.deleteIndex(index) assert res["OK"] @@ -496,8 +496,10 @@ def test_Search(): # assertEqual(result.aggregations['2'].buckets[1]['end_data'].buckets[0].avg_buckets, {u'value': 4}) @pytest.fixture def setUpAndTearDown(): - result = elasticSearchDB.createIndex("my-index", {}) - assert result["OK"] + res = elasticSearchDB.existingIndex("my-index") + if not res["OK"] or not res["Value"]: + result = elasticSearchDB.createIndex("my-index", {}, period=None) + assert result["OK"] result = elasticSearchDB.index( indexName="my-index", body={"quantity": 1, "Product": "a", "timestamp": 1458226213000}, docID=1 )