Skip to content

Commit

Permalink
feat: Add support for remote writing prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswk committed Oct 15, 2024
1 parent b5b63b7 commit 8e78468
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 74 deletions.
47 changes: 45 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ opentelemetry_sdk = { version = "0.24.0", features = [
"logs",
] }
prometheus = { version = "0.13.4", features = ["process", "push"] }
prometheus-reqwest-remote-write = { version = "0.1.1" }
prometheus-static-metric = "0.5.1"
rand = "0.8.5"
redis = { version = "0.27.0", features = [
Expand Down
5 changes: 4 additions & 1 deletion server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ mod tests {
upstream_certificate_file: Default::default(),
token_revalidation_interval_seconds: Default::default(),
prometheus_push_interval: 60,
prometheus_push_gateway: None,
prometheus_remote_write_url: None,
prometheus_user_id: None,
prometheus_password: None,
prometheus_username: None,
};

let result = build_edge(&args, "test-app").await;
Expand Down
6 changes: 3 additions & 3 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ pub struct EdgeArgs {
#[clap(long, env, default_value_t = false, conflicts_with = "strict")]
pub dynamic: bool,

/// Sets a push gateway url for prometheus metrics, if this is set, prometheus metrics will be automatically pushed
/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
#[clap(long, env)]
pub prometheus_push_gateway: Option<String>,
pub prometheus_remote_write_url: Option<String>,

/// Sets the interval for prometheus push metrics, only relevant if `prometheus_push_gateway` is set. Defaults to 60 seconds
/// Sets the interval for prometheus push metrics, only relevant if `prometheus_remote_write_url` is set. Defaults to 60 seconds
#[clap(long, env, default_value_t = 60)]
pub prometheus_push_interval: u64,

Expand Down
71 changes: 3 additions & 68 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
use std::collections::HashMap;
use std::sync::Arc;

use actix_cors::Cors;
use actix_middleware_etag::Etag;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use base64::Engine;
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use prometheus::labels;
use reqwest::header;
use tracing::{error, info};
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use utoipa::OpenApi;
Expand All @@ -31,6 +26,8 @@ use unleash_edge::{internal_backstage, tls};
#[cfg(not(tarpaulin_include))]
#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
use unleash_edge::metrics::metrics_pusher;

let args = CliArgs::parse();
let disable_all_endpoint = args.disable_all_endpoint;
if args.markdown_help {
Expand Down Expand Up @@ -171,7 +168,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => {
tracing::info!("Token validator validation of startup tokens was unexpectedly shut down");
}
_ = push_prom(prom_registry_for_write, edge.prometheus_push_gateway, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password) => {
_ = metrics_pusher::prometheus_remote_write(prom_registry_for_write, edge.prometheus_remote_write_url, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password) => {
tracing::info!("Prometheus push unexpectedly shut down");
}
}
Expand Down Expand Up @@ -199,68 +196,6 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}

async fn push_prom(
registry: prometheus::Registry,
url: Option<String>,
interval: u64,
username: Option<String>,
password: Option<String>,
) {
let sleep_duration = tokio::time::Duration::from_secs(interval);
let client = if let Some(uname) = username.clone() {
let mut headers = header::HeaderMap::new();
let mut value = header::HeaderValue::from_str(&format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(format!(
"{}:{}",
uname,
password.clone().unwrap_or_default()
))
))
.expect("Could not create header");
value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, value);
reqwest::Client::builder()
.default_headers(headers)
.build()
.expect("Could not build client")
} else {
reqwest::Client::new()
};
if let Some(address) = url {
loop {
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {

let encoder = prometheus::TextEncoder::new();
let metric_families = registry.gather();
let mut buf = String::new();
encoder.encode_utf8(&metric_families[..], &mut buf).expect("Could not serialize metrics");

info!("Pushing {} bytes", buf.len());
match client.post(address.clone()).body(buf).send().await {
Ok(r) => {
tracing::info!("Successfully posted data {r:?}");
tracing::info!("{}", r.text().await.expect("Failed to get body"));
}
Err(e) => {
tracing::error!("Err, arg {e:?}")
}
}
}

}
}
} else {
loop {
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {
}
}
}
}
}

#[cfg(not(tarpaulin_include))]
async fn clean_shutdown(
persistence: Option<Arc<dyn EdgePersistence>>,
Expand Down
78 changes: 78 additions & 0 deletions server/src/metrics/metrics_pusher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use base64::Engine;
use prometheus_reqwest_remote_write::WriteRequest;
use reqwest::{header, Client};

fn get_http_client(username: Option<String>, password: Option<String>) -> Client {
if let Some(uname) = username.clone() {
let mut headers = header::HeaderMap::new();
let mut value = header::HeaderValue::from_str(&format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(format!(
"{}:{}",
uname,
password.clone().unwrap_or_default()
))
))
.expect("Could not create header");
value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, value);
reqwest::Client::builder()
.default_headers(headers)
.build()
.expect("Could not build client")
} else {
reqwest::Client::new()
}
}

pub async fn prometheus_remote_write(
registry: prometheus::Registry,
url: Option<String>,
interval: u64,
username: Option<String>,
password: Option<String>,
) {
let sleep_duration = tokio::time::Duration::from_secs(interval);
let client = get_http_client(username, password);
if let Some(address) = url {
loop {
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {
remote_write_prom(registry.clone(), address.clone(), client.clone()).await;
}
}
}
} else {
loop {
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {
}
}
}
}
}

async fn remote_write_prom(registry: prometheus::Registry, url: String, client: reqwest::Client) {
let write_request = WriteRequest::from_metric_families(registry.gather())
.expect("Could not format write request");
let http_request = write_request
.build_http_request(client.clone(), &url, "unleash_edge")
.expect("Failed to build http request");

match client.execute(http_request).await {
Ok(r) => {
if r.status().is_success() {
tracing::info!(
"Prometheus push successful with status: {} and text {}",
r.status(),
r.text().await.unwrap()
);
} else {
tracing::error!("Prometheus push failed with status: {}", r.status());
}
}
Err(e) => {
tracing::error!("Prometheus push failed with error: {}", e);
}
}
}
1 change: 1 addition & 0 deletions server/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tracing::trace;
pub mod actix_web_metrics;

pub mod client_metrics;
pub mod metrics_pusher;
pub mod route_formatter;

const EDGE_REQUIREMENT: &str = ">=17.0.0";
Expand Down

0 comments on commit 8e78468

Please sign in to comment.