Skip to content

Commit

Permalink
Merge pull request #7678 from fstagni/90_index_templates
Browse files Browse the repository at this point in the history
[9.0] Added index template for ElasticJobParametersDB
  • Loading branch information
fstagni authored Aug 20, 2024
2 parents 50db7b9 + 49ece32 commit f18be6f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
42 changes: 31 additions & 11 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,31 @@ def getIndexPrefix(self):
"""
return self.__indexPrefix

@ifConnected
def addIndexTemplate(
self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None
) -> dict:
"""Adds an index template.
:param self: self reference
:param str name: index name
:param list index_patterns: list of index patterns to match
:param dict mapping: it is the mapping of the index
"""
if settings is None:
settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}}
body = {
"index_patterns": index_patterns,
"priority": priority,
"template": {"settings": settings, "mappings": {"properties": mapping}},
}
try:
res = self.client.indices.put_index_template(name=name, body=body)
return S_OK(res)
except Exception as e: # pylint: disable=broad-except
sLog.exception()
return S_ERROR(e)

@ifConnected
def query(self, index: str, query):
"""Executes a query and returns its result (uses ES DSL language).
Expand Down Expand Up @@ -429,19 +454,14 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
sLog.warn("The period is not provided, so using non-periodic indexes names")
fullIndex = indexPrefix

res = self.existingIndex(fullIndex)
if not res["OK"]:
return res
elif res["Value"]:
return S_OK(fullIndex)

try:
sLog.info("Create index: ", fullIndex + str(mapping))
self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7

if not mapping:
self.client.indices.create(index=fullIndex)
else:
self.client.indices.create(index=fullIndex, body={"mappings": mapping})
return S_OK(fullIndex)
except Exception as e: # pylint: disable=broad-except
sLog.error("Can not create the index:", repr(e))
except Exception: # pylint: disable=broad-except
sLog.exception()
return S_ERROR("Can not create the index")

@ifConnected
Expand Down
24 changes: 1 addition & 23 deletions src/DIRAC/MonitoringSystem/DB/MonitoringDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
class MonitoringDB(ElasticDB):
"""Extension of ElasticDB for Monitoring system DB"""

def __init__(self, name="Monitoring/MonitoringDB", readOnly=False):
def __init__(self, name="Monitoring/MonitoringDB"):
"""Standard constructor"""

try:
Expand All @@ -55,7 +55,6 @@ def __init__(self, name="Monitoring/MonitoringDB", readOnly=False):
self.log.error("Can't connect to MonitoringDB", repr(ex))
raise ex

self.__readonly = readOnly
self.documentTypes = {}

# loads all monitoring indexes and types.
Expand All @@ -75,27 +74,6 @@ def __init__(self, name="Monitoring/MonitoringDB", readOnly=False):
"monitoringFields": monfields,
"period": period,
}
if self.__readonly:
self.log.info("Read only mode: no new index will be created")
else:
# Verifying if the index is there, and if not create it
res = self.existingIndex(f"{indexName}-*") # check with a wildcard
if res["OK"] and res["Value"]:
actualIndexName = self.generateFullIndexName(indexName, period)
res = self.existingIndex(actualIndexName) # check actual index
if not res["OK"] or not res["Value"]:
result = self.createIndex(indexName, self.documentTypes[monitoringType]["mapping"], period)
if not result["OK"]:
self.log.error(result["Message"])
raise RuntimeError(result["Message"])
self.log.always("Index created", actualIndexName)
else:
# in case the index pattern does not exist
result = self.createIndex(indexName, self.documentTypes[monitoringType]["mapping"], period)
if not result["OK"]:
self.log.error(result["Message"])
raise RuntimeError(result["Message"])
self.log.always("Index created", indexName)

def getIndexName(self, typeName):
"""
Expand Down
35 changes: 20 additions & 15 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@
from DIRAC.Core.Utilities import TimeUtilities

mapping = {
"properties": {
"JobID": {"type": "long"},
"timestamp": {"type": "date"},
"CPUNormalizationFactor": {"type": "long"},
"NormCPUTime(s)": {"type": "long"},
"Memory(kB)": {"type": "long"},
"TotalCPUTime(s)": {"type": "long"},
"MemoryUsed(kb)": {"type": "long"},
"HostName": {"type": "keyword"},
"GridCE": {"type": "keyword"},
"ModelName": {"type": "keyword"},
"Status": {"type": "keyword"},
"JobType": {"type": "keyword"},
}
"JobID": {"type": "long"},
"timestamp": {"type": "date"},
"PilotAgent": {"type": "keyword"},
"Pilot_Reference": {"type": "keyword"},
"JobGroup": {"type": "keyword"},
"CPUNormalizationFactor": {"type": "long"},
"NormCPUTime(s)": {"type": "long"},
"Memory(MB)": {"type": "long"},
"LocalAccount": {"type": "keyword"},
"TotalCPUTime(s)": {"type": "long"},
"PayloadPID": {"type": "long"},
"HostName": {"type": "text"},
"GridCE": {"type": "keyword"},
"CEQueue": {"type": "keyword"},
"BatchSystem": {"type": "keyword"},
"ModelName": {"type": "keyword"},
"Status": {"type": "keyword"},
"JobType": {"type": "keyword"},
}


Expand All @@ -42,6 +46,7 @@ def __init__(self, parentLogger=None):
super().__init__(self.fullname, self.index_name, parentLogger=parentLogger)
except Exception as ex:
raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex
self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping)

def _indexName(self, jobID: int) -> str:
"""construct the index name
Expand All @@ -59,7 +64,7 @@ def _createIndex(self, indexName: str) -> None:
# Verifying if the index is there, and if not create it
res = self.existingIndex(indexName)
if not res["OK"] or not res["Value"]:
result = self.createIndex(indexName, mapping, period=None)
result = self.createIndex(indexName, period=None)
if not result["OK"]:
self.log.error(result["Message"])
raise RuntimeError(result["Message"])
Expand Down
10 changes: 6 additions & 4 deletions tests/Integration/Core/Test_ElasticsearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_bulkindex():
assert result["Value"] == 10
time.sleep(5)
indexes = elasticSearchDB.getIndexes()
assert type(indexes) == list
assert isinstance(indexes, list) # it will be list
for index in indexes:
res = elasticSearchDB.deleteIndex(index)
assert res["OK"]
Expand All @@ -101,7 +101,7 @@ def test_bulkindexMonthly():
assert result["Value"] == 10
time.sleep(5)
indexes = elasticSearchDB.getIndexes()
assert type(indexes) == list
assert isinstance(indexes, list) # it will be list
for index in indexes:
res = elasticSearchDB.deleteIndex(index)
assert res["OK"]
Expand Down Expand Up @@ -496,8 +496,10 @@ def test_Search():
# assertEqual(result.aggregations['2'].buckets[1]['end_data'].buckets[0].avg_buckets, {u'value': 4})
@pytest.fixture
def setUpAndTearDown():
result = elasticSearchDB.createIndex("my-index", {})
assert result["OK"]
res = elasticSearchDB.existingIndex("my-index")
if not res["OK"] or not res["Value"]:
result = elasticSearchDB.createIndex("my-index", {}, period=None)
assert result["OK"]
result = elasticSearchDB.index(
indexName="my-index", body={"quantity": 1, "Product": "a", "timestamp": 1458226213000}, docID=1
)
Expand Down

0 comments on commit f18be6f

Please sign in to comment.