From b5b63b79213b5e8096add577d0c23d6daff3a4ea Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Mon, 14 Oct 2024 15:50:21 +0200 Subject: [PATCH] fix: make sure to post correct registry to vm --- Cargo.lock | 1 + server/Cargo.toml | 1 + server/src/cli.rs | 14 ++++++++ server/src/main.rs | 48 +++++++++++++++++++++---- server/src/metrics/actix_web_metrics.rs | 2 +- 5 files changed, 59 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 11d5b7f3..c76650c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4261,6 +4261,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "base64 0.22.1", "chrono", "cidr", "clap", diff --git a/server/Cargo.toml b/server/Cargo.toml index 7822e1e7..1a04c0bc 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -34,6 +34,7 @@ anyhow = "1.0.89" async-trait = "0.1.83" aws-config = { version = "1.5.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.53.0", features = ["behavior-version-latest"] } +base64 = "0.22.1" chrono = { version = "0.4.38", features = ["serde"] } cidr = "0.3.0" clap = { version = "4.5.19", features = ["derive", "env"] } diff --git a/server/src/cli.rs b/server/src/cli.rs index 69079b26..39e4ab30 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -136,6 +136,11 @@ pub struct ClientIdentity { pub pkcs12_passphrase: Option, } +pub enum PromAuth { + None, + Basic(String, String), +} + #[derive(Args, Debug, Clone)] #[command(group( ArgGroup::new("data-provider") @@ -215,6 +220,15 @@ pub struct EdgeArgs { /// Sets the interval for prometheus push metrics, only relevant if `prometheus_push_gateway` is set. Defaults to 60 seconds #[clap(long, env, default_value_t = 60)] pub prometheus_push_interval: u64, + + #[clap(long, env)] + pub prometheus_username: Option, + + #[clap(long, env)] + pub prometheus_password: Option, + + #[clap(long, env)] + pub prometheus_user_id: Option, } pub fn string_to_header_tuple(s: &str) -> Result<(String, String), String> { diff --git a/server/src/main.rs b/server/src/main.rs index 37e30299..92b96bed 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,12 +1,17 @@ +use std::collections::HashMap; use std::sync::Arc; use actix_cors::Cors; use actix_middleware_etag::Etag; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use base64::Engine; use clap::Parser; use dashmap::DashMap; use futures::future::join_all; +use prometheus::labels; +use reqwest::header; +use tracing::{error, info}; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::ConnectVia; use utoipa::OpenApi; @@ -70,6 +75,7 @@ async fn main() -> Result<(), anyhow::Error> { let openapi = openapi::ApiDoc::openapi(); let refresher_for_app_data = feature_refresher.clone(); + let prom_registry_for_write = metrics_handler.registry.clone(); let server = HttpServer::new(move || { let qs_config = serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false)); @@ -165,7 +171,7 @@ async fn main() -> Result<(), anyhow::Error> { _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => { tracing::info!("Token validator validation of startup tokens was unexpectedly shut down"); } - _ = push_prom(edge.prometheus_push_gateway, edge.prometheus_push_interval) => { + _ = push_prom(prom_registry_for_write, edge.prometheus_push_gateway, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password) => { tracing::info!("Prometheus push unexpectedly shut down"); } } @@ -193,26 +199,56 @@ async fn main() -> Result<(), anyhow::Error> { Ok(()) } -async fn push_prom(url: Option, interval: u64) { +async fn push_prom( + registry: prometheus::Registry, + url: Option, + interval: u64, + username: Option, + password: Option, +) { let sleep_duration = tokio::time::Duration::from_secs(interval); + let client = if let Some(uname) = username.clone() { + let mut headers = header::HeaderMap::new(); + let mut value = header::HeaderValue::from_str(&format!( + "Basic {}", + base64::engine::general_purpose::STANDARD.encode(format!( + "{}:{}", + uname, + password.clone().unwrap_or_default() + )) + )) + .expect("Could not create header"); + value.set_sensitive(true); + headers.insert(header::AUTHORIZATION, value); + reqwest::Client::builder() + .default_headers(headers) + .build() + .expect("Could not build client") + } else { + reqwest::Client::new() + }; if let Some(address) = url { loop { tokio::select! { _ = tokio::time::sleep(sleep_duration) => { + let encoder = prometheus::TextEncoder::new(); - let metric_families = prometheus::gather(); + let metric_families = registry.gather(); let mut buf = String::new(); encoder.encode_utf8(&metric_families[..], &mut buf).expect("Could not serialize metrics"); - let client = reqwest::Client::new(); + + info!("Pushing {} bytes", buf.len()); match client.post(address.clone()).body(buf).send().await { - Ok(_) => { - tracing::info!("Successfully posted data") + Ok(r) => { + tracing::info!("Successfully posted data {r:?}"); + tracing::info!("{}", r.text().await.expect("Failed to get body")); } Err(e) => { tracing::error!("Err, arg {e:?}") } } } + } } } else { diff --git a/server/src/metrics/actix_web_metrics.rs b/server/src/metrics/actix_web_metrics.rs index 2eb6c2a3..5edf8e91 100644 --- a/server/src/metrics/actix_web_metrics.rs +++ b/server/src/metrics/actix_web_metrics.rs @@ -387,7 +387,7 @@ where #[derive(Clone, Debug)] pub struct PrometheusMetricsHandler { - registry: prometheus::Registry, + pub registry: prometheus::Registry, } impl PrometheusMetricsHandler {