When reading a manifest, accumulate all entries and jobs in a single entry.

This change ensures that we won't try to start thousands of simultaneous reads
for the same manifest.

Fixes #455.
nfachan committed Jan 29, 2025
1 parent 91248ca commit bc3f9bf
Showing 1 changed file with 58 additions and 90 deletions.
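
For context before the diff: the sketch below is a minimal, self-contained model of the accumulate-then-drain pattern this commit adopts. The types are simplified stand-ins (the real JobId, Sha256Digest, cache, and scheduler plumbing live in the maelstrom crates); only the shape of the bookkeeping is meant to match the change.

use std::collections::{hash_map::Entry, HashMap, HashSet};

// Simplified stand-ins for the real broker types.
type JobId = (u32, u32);
type Digest = u64;

// One in-flight read per manifest: jobs accumulate in `jobs`, and entry
// digests seen while reading are deduplicated into `entries`.
struct ManifestBeingRead {
    jobs: Vec<JobId>,
    entries: HashSet<Digest>,
}

#[derive(Default)]
struct Gatherer {
    manifests_being_read: HashMap<Digest, ManifestBeingRead>,
}

impl Gatherer {
    // Returns true only for the job that must kick off the single read;
    // later jobs attach themselves to the read already in flight.
    fn want_manifest(&mut self, manifest: Digest, jid: JobId) -> bool {
        match self.manifests_being_read.entry(manifest) {
            Entry::Occupied(mut e) => {
                e.get_mut().jobs.push(jid);
                false
            }
            Entry::Vacant(e) => {
                e.insert(ManifestBeingRead {
                    jobs: vec![jid],
                    entries: HashSet::new(),
                });
                true
            }
        }
    }

    // While the read is in progress, entries are only recorded.
    fn entry_read(&mut self, manifest: Digest, entry: Digest) {
        self.manifests_being_read
            .get_mut(&manifest)
            .unwrap()
            .entries
            .insert(entry);
    }

    // When the read finishes, drain the state once: every waiting job is
    // paired with every distinct entry that the manifest contained.
    fn finished(&mut self, manifest: Digest) -> Vec<(JobId, Digest)> {
        let being_read = self.manifests_being_read.remove(&manifest).unwrap();
        let mut pairs = Vec::new();
        for jid in being_read.jobs {
            for entry in &being_read.entries {
                pairs.push((jid, *entry));
            }
        }
        pairs
    }
}

fn main() {
    let mut g = Gatherer::default();
    assert!(g.want_manifest(3, (1, 2))); // first job starts the read
    assert!(!g.want_manifest(3, (2, 1))); // second job joins it
    g.entry_read(3, 30);
    g.entry_read(3, 30); // duplicate entry absorbed by the set
    g.entry_read(3, 31);
    assert_eq!(g.finished(3).len(), 4); // 2 jobs x 2 distinct entries
}

The point of the single map entry is that second and later jobs never trigger a second read: they only add themselves to `jobs`, and all per-entry artifact acquisition is deferred until the one read finishes.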
crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs
@@ -96,8 +96,8 @@ pub enum StartJob {
 }
 
 struct ManifestBeingRead {
-    current: JobId,
-    waiting: Vec<JobId>,
+    jobs: Vec<JobId>,
+    entries: HashSet<Sha256Digest>,
 }
 
 pub struct ArtifactGatherer<CacheT: SchedulerCache, DepsT: Deps> {
@@ -138,6 +138,10 @@ where
                 self.cache.decrement_refcount(&artifact);
             }
         }
+
+        for manifest_being_read in self.manifests_being_read.values_mut() {
+            manifest_being_read.jobs.retain(|jid| jid.cid != cid);
+        }
     }
 
     pub fn start_job(
@@ -248,12 +252,12 @@ where
                 .assert_is_true();
             match manifests_being_read.entry(digest.clone()) {
                 Entry::Occupied(mut entry) => {
-                    entry.get_mut().waiting.push(jid);
+                    entry.get_mut().jobs.push(jid);
                 }
                 Entry::Vacant(entry) => {
                     entry.insert(ManifestBeingRead {
-                        current: jid,
-                        waiting: Default::default(),
+                        jobs: vec![jid],
+                        entries: Default::default(),
                     });
                     let manifest_stream = cache.read_artifact(digest);
                     deps.send_message_to_manifest_reader(ManifestReadRequest {
@@ -292,8 +296,7 @@ where
                 Ok(()),
             );
         }
-        let mut ready = Vec::new();
-        for jid in jids {
+        let ready = jids.into_iter().filter(|jid| {
             let client = self.clients.get_mut(&jid.cid).unwrap();
             let job = client.jobs.get_mut(&jid.cjid).unwrap();
             let is_manifest = job.missing_artifacts.remove(&digest).unwrap();
@@ -302,15 +305,13 @@
                 &mut self.deps,
                 digest.clone(),
                 is_manifest,
-                jid,
+                *jid,
                 job,
                 &mut self.manifests_being_read,
             );
-            if job.have_all_artifacts() {
-                ready.push(jid);
-            }
-        }
-        if let Some(ready) = NonEmpty::from_vec(ready) {
+            job.have_all_artifacts()
+        });
+        if let Some(ready) = NonEmpty::collect(ready) {
             self.deps.send_jobs_ready_to_scheduler(ready);
         }
     }
@@ -322,74 +323,51 @@ where
         manifest_digest: Sha256Digest,
         entry_digest: Sha256Digest,
     ) {
-        let jid = self
-            .manifests_being_read
-            .get(&manifest_digest)
+        self.manifests_being_read
+            .get_mut(&manifest_digest)
             .unwrap()
-            .current;
-        let Some(client) = self.clients.get_mut(&jid.cid) else {
-            // This indicates that the client isn't around anymore. Just ignore this message. When
-            // the client disconnected, we canceled all of the outstanding requests. Ideally, we
-            // would have a way to cancel the outstanding manifest read, but we don't currently
-            // have that.
-            return;
-        };
-        let job = client.jobs.get_mut(&jid.cjid).unwrap();
-        Self::start_artifact_acquisition_for_job(
-            &mut self.cache,
-            &mut client.sender,
-            &mut self.deps,
-            entry_digest,
-            IsManifest::NotManifest,
-            jid,
-            job,
-            &mut self.manifests_being_read,
-        );
+            .entries
+            .insert(entry_digest);
     }
 
     pub fn receive_finished_reading_manifest(
         &mut self,
         digest: Sha256Digest,
         result: anyhow::Result<()>,
     ) {
-        let Entry::Occupied(mut occupied_entry) = self.manifests_being_read.entry(digest.clone())
-        else {
-            panic!("expected entry in manifests_being_read for {digest}");
-        };
-        let entry = occupied_entry.get_mut();
+        let manifest_being_read = self.manifests_being_read.remove(&digest).unwrap();
 
         match result {
             Err(err) => {
-                self.deps
-                    .send_job_failure_to_scheduler(entry.current, err.to_string());
+                for jid in manifest_being_read.jobs {
+                    self.deps
+                        .send_job_failure_to_scheduler(jid, err.to_string());
+                }
             }
             Ok(()) => {
-                if let Some(client) = self.clients.get_mut(&entry.current.cid) {
-                    let job = client.jobs.get_mut(&entry.current.cjid).unwrap();
+                let ready = manifest_being_read.jobs.into_iter().filter(|jid| {
+                    let client = self.clients.get_mut(&jid.cid).unwrap();
+                    let job = client.jobs.get_mut(&jid.cjid).unwrap();
                     job.manifests_being_read.remove(&digest).assert_is_true();
-                    if job.have_all_artifacts() {
-                        self.deps
-                            .send_jobs_ready_to_scheduler(NonEmpty::new(entry.current));
+                    for digest in &manifest_being_read.entries {
+                        Self::start_artifact_acquisition_for_job(
+                            &mut self.cache,
+                            &mut client.sender,
+                            &mut self.deps,
+                            digest.clone(),
+                            IsManifest::NotManifest,
+                            *jid,
+                            job,
+                            &mut self.manifests_being_read,
+                        );
                     }
-                } else {
-                    // This indicates that the client isn't around anymore. Just ignore this
-                    // message. When the client disconnected, we canceled all of the outstanding
-                    // requests.
+                    job.have_all_artifacts()
+                });
+                if let Some(ready) = NonEmpty::collect(ready) {
+                    self.deps.send_jobs_ready_to_scheduler(ready);
                 }
             }
         }
-
-        if entry.waiting.is_empty() {
-            occupied_entry.remove();
-        } else {
-            entry.current = entry.waiting.remove(0);
-            let manifest_stream = self.cache.read_artifact(&digest);
-            self.deps
-                .send_message_to_manifest_reader(ManifestReadRequest {
-                    manifest_stream,
-                    digest: digest.clone(),
-                });
-        }
     }
 
     pub fn receive_job_completed(&mut self, jid: JobId) {
@@ -622,7 +600,9 @@ mod tests {
             self.borrow_mut()
                 .get_artifact
                 .remove(&(jid, digest))
-                .unwrap()
+                .expect(&format!(
+                    "sending unexpected get_artifact to cache for {jid}"
+                ))
         }
 
         fn got_artifact(
@@ -1345,26 +1325,20 @@
             .send_message_to_manifest_reader(3, 33)
             .when()
             .start_job((1, 2), [(3, Manifest)], StartJob::NotReady);
-        fixture
-            .expect()
-            .get_artifact((1, 2), 4, GetArtifact::Success)
-            .when()
-            .receive_manifest_entry(3, 4);
-        fixture
-            .expect()
-            .get_artifact((1, 2), 5, GetArtifact::Wait)
-            .when()
-            .receive_manifest_entry(3, 5);
+        fixture.receive_manifest_entry(3, 4);
         fixture.receive_manifest_entry(3, 4);
+        fixture.receive_manifest_entry(3, 5);
         fixture.receive_manifest_entry(3, 5);
+        fixture.receive_manifest_entry(3, 6);
+        fixture.receive_manifest_entry(3, 6);
         fixture
             .expect()
+            .get_artifact((1, 2), 4, GetArtifact::Success)
+            .get_artifact((1, 2), 5, GetArtifact::Wait)
             .get_artifact((1, 2), 6, GetArtifact::Get)
            .send_transfer_artifact_to_client(1, 6)
             .when()
-            .receive_manifest_entry(3, 6);
-        fixture.receive_manifest_entry(3, 6);
-        fixture.receive_finished_reading_manifest(3, Ok(()));
+            .receive_finished_reading_manifest(3, Ok(()));
         fixture
             .expect()
             .got_artifact_success(6, None, [(1, 2)])
@@ -1515,23 +1489,17 @@
             .when()
             .start_job((2, 2), [(4, Manifest), (4, Manifest)], StartJob::NotReady);
 
-        fixture
-            .expect()
-            .get_artifact((1, 2), 30, GetArtifact::Success)
-            .when()
-            .receive_manifest_entry(3, 30);
-        fixture
-            .expect()
-            .get_artifact((1, 2), 31, GetArtifact::Success)
-            .when()
-            .receive_manifest_entry(3, 31);
+        fixture.receive_manifest_entry(3, 30);
+        fixture.receive_manifest_entry(3, 31);
 
-        // When we finish reading, we should kick off the next.
+        // When we finish reading, we should continue with both jobs.
         fixture
             .expect()
-            .read_artifact(3, 203)
-            .send_message_to_manifest_reader(3, 203)
-            .send_jobs_ready_to_scheduler([(1, 2)])
+            .get_artifact((1, 2), 30, GetArtifact::Success)
+            .get_artifact((1, 2), 31, GetArtifact::Success)
+            .get_artifact((2, 1), 30, GetArtifact::Success)
+            .get_artifact((2, 1), 31, GetArtifact::Success)
+            .send_jobs_ready_to_scheduler([(1, 2), (2, 1)])
             .when()
             .receive_finished_reading_manifest(3, Ok(()));
     }