Skip to content

Commit

Permalink
per-service worker monitoring infrastructure (#37)
Browse files Browse the repository at this point in the history
* per-service worker monitoring infrastructure

* improve worker report style, still basic
  • Loading branch information
dginev authored Aug 17, 2018
1 parent 5f042ac commit 52e971a
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 5 deletions.
43 changes: 43 additions & 0 deletions bin/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct TemplateContext {
entries: Option<Vec<HashMap<String, String>>>,
categories: Option<Vec<HashMap<String, String>>>,
whats: Option<Vec<HashMap<String, String>>>,
workers: Option<Vec<HashMap<String, String>>>,
}
impl Default for TemplateContext {
fn default() -> Self {
Expand All @@ -120,6 +121,7 @@ impl Default for TemplateContext {
entries: None,
categories: None,
whats: None,
workers: None,
}
}
}
Expand Down Expand Up @@ -205,6 +207,46 @@ fn admin() -> Template {
Template::render("admin", context)
}

#[get("/workers/<service_name>")]
fn worker_report(service_name: String) -> Result<Template, NotFound<String>> {
let backend = Backend::default();
let service_name = aux_uri_unescape(Some(&service_name)).unwrap_or_else(|| UNKNOWN.to_string());
if let Ok(service) = Service::find_by_name(&service_name, &backend.connection) {
let mut global = HashMap::new();
global.insert(
"title".to_string(),
format!("Worker report for service {} ", &service_name),
);
global.insert(
"description".to_string(),
format!(
"Worker report for service {} as registered by the CorTeX dispatcher",
&service_name
),
);
global.insert("service_name".to_string(), service_name.to_string());
global.insert(
"service_description".to_string(),
service.description.clone(),
);
let mut context = TemplateContext {
global,
..TemplateContext::default()
};

let workers = service
.select_workers(&backend.connection)
.unwrap()
.into_iter()
.map(|w| w.into())
.collect();
context.workers = Some(workers);
Ok(Template::render("workers", context))
} else {
Err(NotFound(String::from("no such service")))
}
}

#[get("/corpus/<corpus_name>")]
fn corpus(corpus_name: String) -> Result<Template, NotFound<String>> {
let backend = Backend::default();
Expand Down Expand Up @@ -645,6 +687,7 @@ fn rocket() -> rocket::Rocket {
corpus,
favicon,
files,
worker_report,
top_service_report,
severity_service_report,
category_service_report,
Expand Down
3 changes: 2 additions & 1 deletion migrations/2017-10-01-204006_tasks/up.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Your SQL goes here
CREATE TABLE tasks (
CREATE TABLE tasks
(
id BIGSERIAL PRIMARY KEY,
service_id INTEGER NOT NULL,
corpus_id INTEGER NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions migrations/2018-08-16-200128_worker_metadata/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE worker_metadata;
16 changes: 16 additions & 0 deletions migrations/2018-08-16-200128_worker_metadata/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE TABLE worker_metadata
(
id SERIAL PRIMARY KEY,
service_id INTEGER NOT NULL,
last_dispatched_task_id BIGINT NOT NULL,
last_returned_task_id BIGINT,
total_dispatched INTEGER NOT NULL DEFAULT 0,
total_returned INTEGER NOT NULL DEFAULT 0,
first_seen TIMESTAMP NOT NULL,
session_seen TIMESTAMP,
time_last_dispatch TIMESTAMP NOT NULL,
time_last_return TIMESTAMP,
name varchar(200) NOT NULL
);
create unique index worker_id_idx on worker_metadata(name, service_id);
create index worker_service_idx on worker_metadata(service_id);
17 changes: 17 additions & 0 deletions src/dispatcher/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use backend;

enum EventKind {
Send,
Receive,
}

pub struct Event {
pub kind: EventKind,
pub time: String,
pub valid: bool,
pub task: String,
pub service: String,
pub identity: String,
}

pub fn register_event(e: Event) {}
9 changes: 8 additions & 1 deletion src/dispatcher/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use time;
use dispatcher::server;
use helpers;
use helpers::{TaskProgress, TaskReport, TaskStatus};
use models::Service;
use models::{Service, WorkerMetadata};

/// Specifies the binding and operation parameters for a ZMQ sink component
pub struct Sink {
Expand Down Expand Up @@ -166,6 +166,13 @@ impl Sink {
},
}
}
// Also update worker metadata for transparency
WorkerMetadata::record_received(
identity.to_string(),
service.id,
taskid,
self.backend_address.clone(),
)?;
} else {
// Otherwise just discard the rest of the message
println!(
Expand Down
14 changes: 12 additions & 2 deletions src/dispatcher/ventilator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use backend;
use dispatcher::server;
use helpers;
use helpers::{NewTaskMessage, TaskProgress, TaskReport, TaskStatus};
use models::Service;
use models::{Service, WorkerMetadata};
use std::error::Error;
use zmq::SNDMORE;

Expand Down Expand Up @@ -122,12 +122,14 @@ impl Ventilator {
}
}
}

ventilator.send_msg(identity, SNDMORE)?;
let mut taskid = -1;
if let Some(current_task_progress) = task_queue.pop() {
dispatched_task_opt = Some(current_task_progress.clone());

let current_task = current_task_progress.task;
let taskid = current_task.id;
taskid = current_task.id;
let serviceid = current_task.service_id;
println!(
"vent {}: worker {:?} received task {:?}",
Expand Down Expand Up @@ -168,6 +170,7 @@ impl Ventilator {
);
} else {
println!("-- Failed to prepare input stream for taskid {:?}", taskid);
taskid = -1;
ventilator.send(&[], 0)?;
}
}
Expand All @@ -179,6 +182,13 @@ impl Ventilator {
ventilator.send_str("0", SNDMORE)?;
ventilator.send(&[], 0)?;
}
// Update this worker's metadata
WorkerMetadata::record_dispatched(
identity_str,
service.id,
taskid,
self.backend_address.clone(),
)?;
} else {
println!(
"-- No such service in ventilator request: {:?}",
Expand Down
Loading

0 comments on commit 52e971a

Please sign in to comment.