Skip to content

Commit

Permalink
fix: make sure to post correct registry to vm
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswk committed Oct 14, 2024
1 parent 255480a commit b5b63b7
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ anyhow = "1.0.89"
async-trait = "0.1.83"
aws-config = { version = "1.5.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.53.0", features = ["behavior-version-latest"] }
base64 = "0.22.1"
chrono = { version = "0.4.38", features = ["serde"] }
cidr = "0.3.0"
clap = { version = "4.5.19", features = ["derive", "env"] }
Expand Down
14 changes: 14 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ pub struct ClientIdentity {
pub pkcs12_passphrase: Option<String>,
}

pub enum PromAuth {
None,
Basic(String, String),
}

#[derive(Args, Debug, Clone)]
#[command(group(
ArgGroup::new("data-provider")
Expand Down Expand Up @@ -215,6 +220,15 @@ pub struct EdgeArgs {
/// Sets the interval for prometheus push metrics, only relevant if `prometheus_push_gateway` is set. Defaults to 60 seconds
#[clap(long, env, default_value_t = 60)]
pub prometheus_push_interval: u64,

#[clap(long, env)]
pub prometheus_username: Option<String>,

#[clap(long, env)]
pub prometheus_password: Option<String>,

#[clap(long, env)]
pub prometheus_user_id: Option<String>,
}

pub fn string_to_header_tuple(s: &str) -> Result<(String, String), String> {
Expand Down
48 changes: 42 additions & 6 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;

use actix_cors::Cors;
use actix_middleware_etag::Etag;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use base64::Engine;
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use prometheus::labels;
use reqwest::header;
use tracing::{error, info};
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ConnectVia;
use utoipa::OpenApi;
Expand Down Expand Up @@ -70,6 +75,7 @@ async fn main() -> Result<(), anyhow::Error> {

let openapi = openapi::ApiDoc::openapi();
let refresher_for_app_data = feature_refresher.clone();
let prom_registry_for_write = metrics_handler.registry.clone();
let server = HttpServer::new(move || {
let qs_config =
serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false));
Expand Down Expand Up @@ -165,7 +171,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ = validator.schedule_revalidation_of_startup_tokens(edge.tokens, lazy_feature_refresher) => {
tracing::info!("Token validator validation of startup tokens was unexpectedly shut down");
}
_ = push_prom(edge.prometheus_push_gateway, edge.prometheus_push_interval) => {
_ = push_prom(prom_registry_for_write, edge.prometheus_push_gateway, edge.prometheus_push_interval, edge.prometheus_username, edge.prometheus_password) => {
tracing::info!("Prometheus push unexpectedly shut down");
}
}
Expand Down Expand Up @@ -193,26 +199,56 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}

async fn push_prom(url: Option<String>, interval: u64) {
async fn push_prom(
registry: prometheus::Registry,
url: Option<String>,
interval: u64,
username: Option<String>,
password: Option<String>,
) {
let sleep_duration = tokio::time::Duration::from_secs(interval);
let client = if let Some(uname) = username.clone() {
let mut headers = header::HeaderMap::new();
let mut value = header::HeaderValue::from_str(&format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(format!(
"{}:{}",
uname,
password.clone().unwrap_or_default()
))
))
.expect("Could not create header");
value.set_sensitive(true);
headers.insert(header::AUTHORIZATION, value);
reqwest::Client::builder()
.default_headers(headers)
.build()
.expect("Could not build client")
} else {
reqwest::Client::new()
};
if let Some(address) = url {
loop {
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {

let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
let metric_families = registry.gather();
let mut buf = String::new();
encoder.encode_utf8(&metric_families[..], &mut buf).expect("Could not serialize metrics");
let client = reqwest::Client::new();

info!("Pushing {} bytes", buf.len());
match client.post(address.clone()).body(buf).send().await {
Ok(_) => {
tracing::info!("Successfully posted data")
Ok(r) => {
tracing::info!("Successfully posted data {r:?}");
tracing::info!("{}", r.text().await.expect("Failed to get body"));
}
Err(e) => {
tracing::error!("Err, arg {e:?}")
}
}
}

}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/src/metrics/actix_web_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ where

#[derive(Clone, Debug)]
pub struct PrometheusMetricsHandler {
registry: prometheus::Registry,
pub registry: prometheus::Registry,
}

impl PrometheusMetricsHandler {
Expand Down

0 comments on commit b5b63b7

Please sign in to comment.