diff --git a/api/project/__init__.py b/api/project/__init__.py index 6bc213ad8..366cb6ec9 100644 --- a/api/project/__init__.py +++ b/api/project/__init__.py @@ -15,7 +15,7 @@ from collections import defaultdict, OrderedDict from collections.abc import Iterable -from datetime import datetime +from datetime import datetime, timezone from flask import Flask, jsonify, request from requests.auth import HTTPBasicAuth from urllib.parse import urlparse @@ -239,14 +239,14 @@ if databaseMode == malcolm_utils.DatabaseMode.ElasticsearchRemote: import elasticsearch as DatabaseImport - from elasticsearch_dsl import Search as SearchClass + from elasticsearch_dsl import Search as SearchClass, A as AggregationClass DatabaseClass = DatabaseImport.Elasticsearch if opensearchHttpAuth: DatabaseInitArgs['basic_auth'] = opensearchHttpAuth else: import opensearchpy as DatabaseImport - from opensearchpy import Search as SearchClass + from opensearchpy import Search as SearchClass, A as AggregationClass DatabaseClass = DatabaseImport.OpenSearch if opensearchHttpAuth: @@ -1074,6 +1074,48 @@ def ready(): ) +@app.route( + f"{('/' + app.config['MALCOLM_API_PREFIX']) if app.config['MALCOLM_API_PREFIX'] else ''}/ingest-stats", + methods=['GET'], +) +def ingest_stats(): + """Provide an aggregation of each log source (host.name) with its latest event.ingested + time. This can be used to know the most recent time a document was written from each + network sensor. 
+ + Parameters + ---------- + request : Request + Uses 'doctype' from request arguments + Returns + ------- + fields + A dict where key is host.name and value is max(event.ingested) for that host + """ + global databaseClient + global SearchClass + global AggregationClass + + s = SearchClass( + using=databaseClient, + index=index_from_args(get_request_arguments(request)), + ).extra(size=0) + + hostAgg = AggregationClass('terms', field='host.name') + maxIngestAgg = AggregationClass('max', field='event.ingested') + s.aggs.bucket('host_names', hostAgg).metric('max_event_ingested', maxIngestAgg) + response = s.execute() + + return jsonify( + { + bucket.key: datetime.fromtimestamp(bucket.max_event_ingested.value / 1000, timezone.utc) + .replace(microsecond=0) + .isoformat() + for bucket in response.aggregations.host_names.buckets + } + ) + + @app.route( + f"{('/' + app.config['MALCOLM_API_PREFIX']) if app.config['MALCOLM_API_PREFIX'] else ''}/ping", methods=['GET'] + ) diff --git a/docs/api-ingest-stats.md b/docs/api-ingest-stats.md new file mode 100644 index 000000000..14c746414 --- /dev/null +++ b/docs/api-ingest-stats.md @@ -0,0 +1,17 @@ +# Document Ingest Statistics + +`GET` - /mapi/ingest-stats + +Executes an OpenSearch [bucket aggregation](https://opensearch.org/docs/latest/opensearch/bucket-agg/) query for the `host.name` field and its maximum (i.e., most recent) `event.ingested` UTC time value for all of Malcolm's indexed network traffic metadata. + +This can be used to know the most recent time a log was indexed for each network sensor. 
+ +Example output: + +``` +{ + "malcolm": "2024-11-04T14:58:57+00:00", + "sensor_a": "2024-11-04T14:57:41+00:00", + "sensor_b": "2024-11-04T14:58:59+00:00" +} +``` \ No newline at end of file diff --git a/docs/api.md b/docs/api.md index 16c72ee8c..b467631d4 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1,8 +1,9 @@ # API -* [Field Aggregations](api-aggregations.md) +* [Document Ingest Statistics](api-ingest-stats.md) * [Document Lookup](api-document-lookup.md) * [Event Logging](api-event-logging.md) +* [Field Aggregations](api-aggregations.md) * [Fields](api-fields.md) * [Indices](api-indices.md) * [Ping](api-ping.md)