Skip to content

Commit

Permalink
Keep better track of what has been downloaded
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Gjengset committed Nov 26, 2020
1 parent 9ad0ff6 commit 6b1bd18
Showing 1 changed file with 95 additions and 65 deletions.
160 changes: 95 additions & 65 deletions src/cargo/sources/registry/http_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use curl::easy::{Easy, HttpVersion, List};
use curl::multi::{EasyHandle, Multi};
use log::{debug, trace, warn};
use std::cell::{Cell, RefCell, RefMut};
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write as FmtWrite;
use std::fs::{self, File, OpenOptions};
use std::io::prelude::*;
Expand Down Expand Up @@ -151,10 +151,10 @@ pub struct HttpRegistry<'cfg> {
/// Does the config say that we can use HTTP multiplexing?
multiplexing: bool,

/// Has a prefetch phase been run?
/// What paths have we already prefetched?
///
/// If so, we do not need to double-check any index files -- the prefetch stage already did.
prefetched: bool,
/// We do not need to double-check any of these index files -- the prefetch stage already did.
prefetched: HashSet<PathBuf>,

/// If we are currently prefetching, all calls to RegistryData::load should go to disk.
is_prefetching: bool,
Expand All @@ -173,7 +173,7 @@ impl<'cfg> HttpRegistry<'cfg> {
prefetch: Multi::new(),
multiplexing: false,
downloads: Downloads::default(),
prefetched: false,
prefetched: HashSet::new(),
requested_update: false,
is_prefetching: false,
}
Expand Down Expand Up @@ -332,32 +332,35 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
let pkg = root.join(path);
let bytes;
let was = if pkg.exists() {
if self.at.get().0.is_synchronized() || !self.requested_update {
debug!("not prefetching fresh {}", name);
if self.at.get().0.is_synchronized()
|| !self.requested_update
|| self.prefetched.contains(path)
{
trace!("not prefetching fresh {}", name);

// We already have this file locally, and we don't need to double-check it with
// upstream because we have a changelog, because the client hasn't requested an
// index update, or because we have already prefetched this path. So there's really
// nothing to prefetch. We do keep track of the request though so that we will
// eventually yield this back to the caller who may then want to prefetch other
// transitive dependencies.
if let Some(f) = self
.downloads
.eager
.iter_mut()
.find(|f| f.primary.path == path)
{
if &f.primary.req != req {
f.others.insert(req.clone());
use std::collections::btree_map::Entry;
match self.downloads.eager.entry(path.to_path_buf()) {
Entry::Occupied(mut o) => {
let o = o.get_mut();
if &o.primary.req != req {
o.others.insert(req.clone());
}
}
Entry::Vacant(v) => {
v.insert(MultiVersionFetched {
primary: Fetched {
path: path.to_path_buf(),
name,
req: req.clone(),
},
others: HashSet::new(),
});
}
} else {
self.downloads.eager.push(MultiVersionFetched {
primary: Fetched {
path: path.to_path_buf(),
name,
req: req.clone(),
},
others: HashSet::new(),
});
}
return Ok(());
}
Expand All @@ -380,7 +383,6 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
};

assert!(!self.config.offline());
debug!("double-checking freshness of {}", path.display());

let etag = std::str::from_utf8(etag)?;
let last_modified = std::str::from_utf8(last_modified)?;
Expand All @@ -401,18 +403,17 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
dl.additional_reqs.insert(req.clone());
}
return Ok(());
} else if let Some(f) = self
.downloads
.eager
.iter_mut()
.find(|f| f.primary.path == path)
{
} else if let Some(f) = self.downloads.eager.get_mut(path) {
if &f.primary.req != req {
f.others.insert(req.clone());
}
return Ok(());
}

if was.is_some() {
debug!("double-checking freshness of {}", path.display());
}

// Looks like we're going to have to bite the bullet and do a network request.
let url = self.source_id.url();
self.prepare()?;
Expand Down Expand Up @@ -474,7 +475,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
// That thread-local is set up in `next_prefetched` when it calls self.prefetch.perform,
// which is what ultimately calls this method.
handle.write_function(move |buf| {
debug!("{} - {} bytes of data", token, buf.len());
trace!("{} - {} bytes of data", token, buf.len());
tls::with(|downloads| {
if let Some(downloads) = downloads {
downloads.pending[&token]
Expand Down Expand Up @@ -536,17 +537,27 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
// We may already have packages that are ready to go. This takes care of grabbing the
// next of those, while ensuring that we yield every distinct version requirement for
// each package.
if let Some(mut fetched) = self.downloads.eager.pop() {
return if let Some(req) = fetched.others.iter().next().cloned() {
//
// TODO: Use BTreeMap::pop_first (currently nightly-only) once it is stabilized.
if let Some(path) = self.downloads.eager.keys().next().cloned() {
use std::collections::btree_map::Entry;
let mut fetched = if let Entry::Occupied(o) = self.downloads.eager.entry(path) {
o
} else {
unreachable!();
};

return if let Some(req) = fetched.get().others.iter().next().cloned() {
let fetched = fetched.get_mut();
fetched.others.remove(&req);
let ret = Ok(Some(Fetched {
path: fetched.primary.path.clone(),
name: fetched.primary.name,
req,
}));
self.downloads.eager.push(fetched);
ret
} else {
let fetched = fetched.remove();
Ok(Some(fetched.primary))
};
}
Expand All @@ -564,7 +575,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
.perform()
.chain_err(|| "failed to perform http requests")
})?;
debug!("handles remaining: {}", remaining_in_multi);
trace!("handles remaining: {}", remaining_in_multi);

// Walk all the messages cURL came across in case anything completed.
let results = &mut self.downloads.results;
Expand All @@ -583,7 +594,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
//
// This will ultimately add more replies to self.downloads.eager, which we'll
// then yield back to the caller.
while let Some((token, result)) = results.pop() {
debug!("{} finished with {:?}", token, result);
trace!("{} finished with {:?}", token, result);

let (dl, handle) = self
.downloads
Expand All @@ -603,10 +614,15 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
},
others: dl.additional_reqs,
};
assert!(
self.prefetched.insert(fetched.primary.path.clone()),
"downloaded the same path twice during prefetching"
);

let code = handle.response_code()?;
debug!(
"index file downloaded with status code {}",
"index file for {} downloaded with status code {}",
fetched.primary.name,
handle.response_code()?
);
match code {
Expand All @@ -628,13 +644,25 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
file.write_all(&data)?;
file.flush()?;

self.downloads.eager.push(fetched);
assert!(
self.downloads
.eager
.insert(fetched.primary.path.clone(), fetched)
.is_none(),
"download finished for already-finished path"
);
}
304 => {
// Not Modified response.
// There's nothing for us to do -- the index file is up to date.
// The only thing that matters is telling the caller about this package.
self.downloads.eager.push(fetched);
assert!(
self.downloads
.eager
.insert(fetched.primary.path.clone(), fetched)
.is_none(),
"download finished for already-finished path"
);
}
404 => {
// Not Found response.
Expand Down Expand Up @@ -672,7 +700,6 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {

debug!("prefetched all transitive dependencies");
self.is_prefetching = false;
self.prefetched = true;
Ok(None)
}

Expand Down Expand Up @@ -726,40 +753,42 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
};

let is_synchronized = self.at.get().0.is_synchronized();
let is_fresh =
is_synchronized || !self.requested_update || self.prefetched || self.is_prefetching;

// NOTE: We should double-check for changes to config.json even if synchronized.
let double_check =
!is_fresh || (self.requested_update && path.ends_with("config.json"));

if double_check {
if self.prefetched {
let is_fresh = if is_synchronized {
if self.requested_update && path.ends_with("config.json") {
debug!("double-checking freshness of {} on update", path.display());
false
} else {
trace!(
"not double-checking freshness of {} after prefetch",
path.display()
);
} else if self.config.offline() {
debug!(
"not double-checking freshness of {} due to offline",
"using local {} as changelog is synchronized",
path.display()
);
} else {
debug!("double-checking freshness of {}", path.display());
true
}
} else if is_synchronized {
} else if !self.requested_update {
trace!(
"using local {} as changelog is synchronized",
"using local {} as user did not request update",
path.display()
);
true
} else if self.config.offline() {
trace!("using local {} in offline mode", path.display());
true
} else if self.is_prefetching {
trace!("using local {} in load while prefetching", path.display());
true
} else if self.prefetched.contains(path) {
trace!(
"using local {} as it was already prefetched",
path.display()
);
true
} else {
debug!("using local {} as it is fresh enough", path.display());
}
debug!("double-checking freshness of {}", path.display());
false
};

// NOTE: If we're in offline mode, we don't double-check with the server.
if !double_check || self.config.offline() {
if is_fresh {
return data(rest);
} else {
// We cannot trust the index files and need to double-check with server.
Expand Down Expand Up @@ -904,7 +933,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {

// Make sure that subsequent loads double-check with the server again.
self.requested_update = true;
self.prefetched = false;
self.prefetched.clear();

self.prepare()?;
let path = self.config.assert_package_cache_locked(&self.index_path);
Expand Down Expand Up @@ -1496,7 +1525,8 @@ pub struct Downloads {
/// handle one at a time.
results: Vec<(usize, Result<(), curl::Error>)>,
/// Prefetch requests that we already have a response to.
eager: Vec<MultiVersionFetched>,
/// NOTE: Should this maybe be some kind of heap?
eager: BTreeMap<PathBuf, MultiVersionFetched>,
/// The next ID to use for creating a token (see `Download::token`).
next: usize,
}
Expand Down

0 comments on commit 6b1bd18

Please sign in to comment.