-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: thread shared metadata file into rover to enable use of alarm #390
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
f9a1833
Thread shared metadata into rover to enable use of alarm.
macklin-10x c156c84
Use match instead of if/else
macklin-10x ea0fb7b
Revert "Use match instead of if/else"
macklin-10x 2fb5870
Make alarm method &self due to runtime borrow checking.
macklin-10x a12c209
Use a shared ownership approach that provides a per-file shared lock.
macklin-10x 41d9df0
Clippy fix
macklin-10x 589d352
Update Rust to 1.70.0 for github actions.
macklin-10x 627156f
Cargo format.
macklin-10x f4c4b08
Appease the paperclip.
macklin-10x 1875616
Update compiler error message expectations.
macklin-10x 57d488f
Refactor to eliminate lazy initialization.
macklin-10x File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,11 +5,11 @@ use serde_json::map::Map; | |
use serde_json::{self, Value}; | ||
use std::any::type_name; | ||
use std::borrow::Cow; | ||
use std::collections::HashSet; | ||
use std::fs::{rename, File, OpenOptions}; | ||
use std::io::{ErrorKind, Write}; | ||
use std::os::unix::io::{FromRawFd, IntoRawFd}; | ||
use std::os::unix::io::FromRawFd; | ||
use std::path::{Path, PathBuf}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::time::SystemTime; | ||
use time::{OffsetDateTime, UtcOffset}; | ||
|
||
|
@@ -28,7 +28,8 @@ pub struct Metadata { | |
run_file: String, | ||
raw_jobinfo: JsonDict, | ||
pub jobinfo: JobInfo, // Partially parsed Job info | ||
cache: HashSet<String>, | ||
/// Shared reference to the alarm file. | ||
alarm_file: SharedFile, | ||
} | ||
|
||
#[derive(Debug, Default, Clone, Serialize, Deserialize)] | ||
|
@@ -58,10 +59,11 @@ impl Default for Version { | |
} | ||
} | ||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||
#[derive(Default, Debug, Clone, Serialize, Deserialize)] | ||
#[serde(rename_all = "snake_case")] | ||
#[non_exhaustive] | ||
pub enum ProfileMode { | ||
#[default] | ||
Disable, | ||
Cpu, | ||
Line, | ||
|
@@ -70,12 +72,6 @@ pub enum ProfileMode { | |
Pyspy, | ||
} | ||
|
||
impl Default for ProfileMode { | ||
fn default() -> ProfileMode { | ||
ProfileMode::Disable | ||
} | ||
} | ||
|
||
// Stuff that will be added to the _jobinfo under the "rust" key | ||
#[derive(Debug, Serialize)] | ||
struct RustAdapterInfo { | ||
|
@@ -150,23 +146,23 @@ impl Metadata { | |
let metadata_path = args.pop().unwrap(); | ||
let stage_type = args.pop().unwrap(); | ||
let stage_name = args.pop().unwrap(); | ||
let alarm_file = SharedFile::new(make_metadata_file_path(metadata_path.as_ref(), "alarm")); | ||
|
||
Metadata { | ||
stage_name, | ||
stage_type, | ||
metadata_path, | ||
files_path, | ||
run_file, | ||
cache: HashSet::new(), | ||
raw_jobinfo: Map::new(), | ||
jobinfo: JobInfo::default(), | ||
jobinfo: Default::default(), | ||
alarm_file, | ||
} | ||
} | ||
|
||
/// Path within chunk | ||
pub fn make_path(&self, name: &str) -> PathBuf { | ||
let md: &Path = self.metadata_path.as_ref(); | ||
md.join([METADATA_PREFIX, name].concat()) | ||
make_metadata_file_path(self.metadata_path.as_ref(), name) | ||
} | ||
|
||
/// Write to a file inside the chunk | ||
|
@@ -182,39 +178,32 @@ impl Metadata { | |
} | ||
|
||
/// Update the Martian journal -- so that Martian knows what we've updated | ||
fn update_journal_main(&mut self, name: &str, force: bool) -> Result<()> { | ||
fn update_journal(&self, name: &str) -> Result<()> { | ||
let journal_name: Cow<str> = if self.stage_type != "main" { | ||
format!("{}_{name}", self.stage_type).into() | ||
} else { | ||
name.into() | ||
}; | ||
|
||
if force || !self.cache.contains(name) { | ||
let tmp_run_file = format!("{}.{journal_name}.tmp", self.run_file); | ||
let run_file = &tmp_run_file[..tmp_run_file.len() - 4]; | ||
|
||
{ | ||
let mut f = File::create(&tmp_run_file)?; | ||
if let Err(err) = f.write_all(make_timestamp_now().as_bytes()) { | ||
// Pretty much ignore this error. The only reason we need | ||
// any content at all in this file is because some | ||
// filesystems behave strangely with completely empty files. | ||
eprintln!("Writing journal file {tmp_run_file}: {err}"); | ||
} | ||
let tmp_run_file = format!("{}.{journal_name}.tmp", self.run_file); | ||
let run_file = &tmp_run_file[..tmp_run_file.len() - 4]; | ||
|
||
{ | ||
let mut f = File::create(&tmp_run_file)?; | ||
if let Err(err) = f.write_all(make_timestamp_now().as_bytes()) { | ||
// Pretty much ignore this error. The only reason we need | ||
// any content at all in this file is because some | ||
// filesystems behave strangely with completely empty files. | ||
eprintln!("Writing journal file {tmp_run_file}: {err}"); | ||
} | ||
rename(tmp_run_file.as_str(), run_file).or_else(ignore_not_found)?; | ||
self.cache.insert(journal_name.into_owned()); | ||
} | ||
rename(tmp_run_file.as_str(), run_file).or_else(ignore_not_found)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
fn update_journal(&mut self, name: &str) -> Result<()> { | ||
self.update_journal_main(name, false) | ||
} | ||
|
||
/// Write JSON to a chunk file | ||
pub(crate) fn write_json_obj(&mut self, name: &str, object: &JsonDict) -> Result<()> { | ||
pub(crate) fn write_json_obj(&self, name: &str, object: &JsonDict) -> Result<()> { | ||
serde_json::to_writer_pretty(File::create(self.make_path(name))?, object)?; | ||
self.update_journal(name)?; | ||
Ok(()) | ||
|
@@ -271,40 +260,13 @@ impl Metadata { | |
Error::new(e).context(context) | ||
} | ||
|
||
fn _append(&mut self, name: &str, message: &str) -> Result<()> { | ||
let filename = self.make_path(name); | ||
let mut file = OpenOptions::new() | ||
.create(true) | ||
.append(true) | ||
.open(filename)?; | ||
file.write_all(message.as_bytes())?; | ||
file.write_all(b"\n")?; | ||
// Ensure the file is closed before we write the journal, to reduce | ||
// the chances that `mrp` sees the journal entry before the file content | ||
// has be sync'ed. This can be an issue on nfs systems. | ||
drop(file); | ||
self.update_journal(name)?; | ||
Ok(()) | ||
pub(crate) fn alarm_file(&self) -> &SharedFile { | ||
&self.alarm_file | ||
} | ||
|
||
/// Write to _log | ||
pub fn log(&mut self, level: &str, message: &str) -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For those not in our slack conversation, turns out this is unnecessary because we're already overriding the global-scope logging macros to write to the log. And we weren't using it. And we shouldn't, because this was buggy, as
|
||
let mut log_file = unsafe { File::from_raw_fd(3) }; | ||
|
||
log_file | ||
.write(format!("{} [{level}] {message}", make_timestamp_now()).as_bytes()) | ||
.and(log_file.flush())?; | ||
|
||
let _ = log_file.into_raw_fd(); | ||
Ok(()) | ||
} | ||
|
||
pub fn log_time(&mut self, message: &str) -> Result<()> { | ||
self.log("time", message) | ||
} | ||
|
||
pub fn alarm(&mut self, message: &str) -> Result<()> { | ||
self._append("alarm", &format!("{} {message}", make_timestamp_now())) | ||
/// Write a message to the stage alarms. | ||
pub fn alarm(&self, message: &str) -> Result<()> { | ||
self.alarm_file.appendln(message, true) | ||
} | ||
|
||
#[cold] | ||
|
@@ -369,6 +331,34 @@ impl Metadata { | |
} | ||
} | ||
|
||
fn make_metadata_file_path(metadata_dir: &Path, name: &str) -> PathBuf { | ||
metadata_dir.join([METADATA_PREFIX, name].concat()) | ||
} | ||
|
||
/// Manage shared access to a metadata file. | ||
#[derive(Debug, Clone)] | ||
pub(crate) struct SharedFile(Arc<Mutex<PathBuf>>); | ||
|
||
impl SharedFile { | ||
pub fn new(path: PathBuf) -> Self { | ||
Self(Arc::new(Mutex::new(path))) | ||
} | ||
|
||
/// Append the provided contents to the file. | ||
/// Creates the file if it does not exist. | ||
/// Appends a newline after contents. | ||
/// Prepends a timestamp if requested. | ||
pub fn appendln(&self, contents: &str, prepend_timestamp: bool) -> Result<()> { | ||
let path = self.0.lock().unwrap(); | ||
let mut file = OpenOptions::new().create(true).append(true).open(&*path)?; | ||
if prepend_timestamp { | ||
write!(file, "{} ", make_timestamp_now())?; | ||
} | ||
writeln!(file, "{contents}")?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we had this cache in the first place but it wasn't serving any real purpose. The only metadata files we might write to repeatedly are
_log
, which is handled bymrjob
_progress
, which this isn't exposing and for which we'd want to write the journal unconditionally every time._alarms
, for whichmrp
doesn't actually need the journal at all. And if you're writing enough alarms that journaling it repeatedly causes performance degradation, you have other problems to worry about.