diff --git a/crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs b/crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs index 7e111b8d..c1c61ab4 100644 --- a/crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs +++ b/crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs @@ -96,8 +96,8 @@ pub enum StartJob { } struct ManifestBeingRead { - current: JobId, - waiting: Vec, + jobs: Vec, + entries: HashSet, } pub struct ArtifactGatherer { @@ -138,6 +138,10 @@ where self.cache.decrement_refcount(&artifact); } } + + for manifest_being_read in self.manifests_being_read.values_mut() { + manifest_being_read.jobs.retain(|jid| jid.cid != cid); + } } pub fn start_job( @@ -248,12 +252,12 @@ where .assert_is_true(); match manifests_being_read.entry(digest.clone()) { Entry::Occupied(mut entry) => { - entry.get_mut().waiting.push(jid); + entry.get_mut().jobs.push(jid); } Entry::Vacant(entry) => { entry.insert(ManifestBeingRead { - current: jid, - waiting: Default::default(), + jobs: vec![jid], + entries: Default::default(), }); let manifest_stream = cache.read_artifact(digest); deps.send_message_to_manifest_reader(ManifestReadRequest { @@ -292,8 +296,7 @@ where Ok(()), ); } - let mut ready = Vec::new(); - for jid in jids { + let ready = jids.into_iter().filter(|jid| { let client = self.clients.get_mut(&jid.cid).unwrap(); let job = client.jobs.get_mut(&jid.cjid).unwrap(); let is_manifest = job.missing_artifacts.remove(&digest).unwrap(); @@ -302,15 +305,13 @@ where &mut self.deps, digest.clone(), is_manifest, - jid, + *jid, job, &mut self.manifests_being_read, ); - if job.have_all_artifacts() { - ready.push(jid); - } - } - if let Some(ready) = NonEmpty::from_vec(ready) { + job.have_all_artifacts() + }); + if let Some(ready) = NonEmpty::collect(ready) { self.deps.send_jobs_ready_to_scheduler(ready); } } @@ -322,29 +323,11 @@ where manifest_digest: Sha256Digest, entry_digest: Sha256Digest, ) { - let jid = self - .manifests_being_read - .get(&manifest_digest) + self.manifests_being_read + .get_mut(&manifest_digest) .unwrap() - .current; - let Some(client) = self.clients.get_mut(&jid.cid) else { - // This indicates that the client isn't around anymore. Just ignore this message. When - // the client disconnected, we canceled all of the outstanding requests. Ideally, we - // would have a way to cancel the outstanding manifest read, but we don't currently - // have that. - return; - }; - let job = client.jobs.get_mut(&jid.cjid).unwrap(); - Self::start_artifact_acquisition_for_job( - &mut self.cache, - &mut client.sender, - &mut self.deps, - entry_digest, - IsManifest::NotManifest, - jid, - job, - &mut self.manifests_being_read, - ); + .entries + .insert(entry_digest); } pub fn receive_finished_reading_manifest( @@ -352,44 +335,39 @@ where digest: Sha256Digest, result: anyhow::Result<()>, ) { - let Entry::Occupied(mut occupied_entry) = self.manifests_being_read.entry(digest.clone()) - else { - panic!("expected entry in manifests_being_read for {digest}"); - }; - let entry = occupied_entry.get_mut(); + let manifest_being_read = self.manifests_being_read.remove(&digest).unwrap(); match result { Err(err) => { - self.deps - .send_job_failure_to_scheduler(entry.current, err.to_string()); + for jid in manifest_being_read.jobs { + self.deps + .send_job_failure_to_scheduler(jid, err.to_string()); + } } Ok(()) => { - if let Some(client) = self.clients.get_mut(&entry.current.cid) { - let job = client.jobs.get_mut(&entry.current.cjid).unwrap(); + let ready = manifest_being_read.jobs.into_iter().filter(|jid| { + let client = self.clients.get_mut(&jid.cid).unwrap(); + let job = client.jobs.get_mut(&jid.cjid).unwrap(); job.manifests_being_read.remove(&digest).assert_is_true(); - if job.have_all_artifacts() { - self.deps - .send_jobs_ready_to_scheduler(NonEmpty::new(entry.current)); + for digest in &manifest_being_read.entries { + Self::start_artifact_acquisition_for_job( + &mut self.cache, + &mut client.sender, + &mut self.deps, + digest.clone(), + IsManifest::NotManifest, + *jid, + job, + &mut self.manifests_being_read, + ); } - } else { - // This indicates that the client isn't around anymore. Just ignore this - // message. When the client disconnected, we canceled all of the outstanding - // requests. + job.have_all_artifacts() + }); + if let Some(ready) = NonEmpty::collect(ready) { + self.deps.send_jobs_ready_to_scheduler(ready); } } } - - if entry.waiting.is_empty() { - occupied_entry.remove(); - } else { - entry.current = entry.waiting.remove(0); - let manifest_stream = self.cache.read_artifact(&digest); - self.deps - .send_message_to_manifest_reader(ManifestReadRequest { - manifest_stream, - digest: digest.clone(), - }); - } } pub fn receive_job_completed(&mut self, jid: JobId) { @@ -622,7 +600,9 @@ mod tests { self.borrow_mut() .get_artifact .remove(&(jid, digest)) - .unwrap() + .expect(&format!( + "sending unexpected get_artifact to cache for {jid}" + )) } fn got_artifact( @@ -1345,26 +1325,20 @@ mod tests { .send_message_to_manifest_reader(3, 33) .when() .start_job((1, 2), [(3, Manifest)], StartJob::NotReady); - fixture - .expect() - .get_artifact((1, 2), 4, GetArtifact::Success) - .when() - .receive_manifest_entry(3, 4); fixture.receive_manifest_entry(3, 4); - fixture - .expect() - .get_artifact((1, 2), 5, GetArtifact::Wait) - .when() - .receive_manifest_entry(3, 5); + fixture.receive_manifest_entry(3, 4); + fixture.receive_manifest_entry(3, 5); fixture.receive_manifest_entry(3, 5); + fixture.receive_manifest_entry(3, 6); + fixture.receive_manifest_entry(3, 6); fixture .expect() + .get_artifact((1, 2), 4, GetArtifact::Success) + .get_artifact((1, 2), 5, GetArtifact::Wait) .get_artifact((1, 2), 6, GetArtifact::Get) .send_transfer_artifact_to_client(1, 6) .when() - .receive_manifest_entry(3, 6); - fixture.receive_manifest_entry(3, 6); - fixture.receive_finished_reading_manifest(3, Ok(())); + .receive_finished_reading_manifest(3, Ok(())); fixture .expect() .got_artifact_success(6, None, [(1, 2)]) @@ -1515,23 +1489,17 @@ mod tests { .when() .start_job((2, 2), [(4, Manifest), (4, Manifest)], StartJob::NotReady); + fixture.receive_manifest_entry(3, 30); + fixture.receive_manifest_entry(3, 31); + + // When we finish reading, we should continue with both jobs. fixture .expect() .get_artifact((1, 2), 30, GetArtifact::Success) - .when() - .receive_manifest_entry(3, 30); - fixture - .expect() .get_artifact((1, 2), 31, GetArtifact::Success) - .when() - .receive_manifest_entry(3, 31); - - // When we finish reading, we should kick off the next. - fixture - .expect() - .send_jobs_ready_to_scheduler([(1, 2)]) - .read_artifact(3, 203) - .send_message_to_manifest_reader(3, 203) + .get_artifact((2, 1), 30, GetArtifact::Success) + .get_artifact((2, 1), 31, GetArtifact::Success) + .send_jobs_ready_to_scheduler([(1, 2), (2, 1)]) .when() .receive_finished_reading_manifest(3, Ok(())); }