Skip to content

Commit

Permalink
Remove the JobId from the ManifestReadRequest.
Browse files Browse the repository at this point in the history
Now, for a given manifest, there will only be one manifest read at a time.
  • Loading branch information
nfachan committed Jan 29, 2025
1 parent 42f6862 commit 91248ca
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 88 deletions.
31 changes: 19 additions & 12 deletions crates/maelstrom-broker/src/scheduler_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use tokio::{io::AsyncRead, sync::mpsc as tokio_mpsc, task::JoinSet};
pub struct ManifestReadRequest<ArtifactStreamT> {
manifest_stream: ArtifactStreamT,
digest: Sha256Digest,
jid: JobId,
}

#[derive(Debug)]
Expand Down Expand Up @@ -138,12 +137,17 @@ struct CacheManifestReader<ArtifactStreamT, TempFileT> {
async fn read_manifest<ArtifactStreamT: AsyncRead + Unpin, TempFileT>(
sender: SchedulerSender<TempFileT>,
stream: ArtifactStreamT,
job_id: JobId,
manifest: Sha256Digest,
) -> anyhow::Result<()> {
let mut reader = AsyncManifestReader::new(stream).await?;
while let Some(entry) = reader.next().await? {
if let ManifestEntryData::File(ManifestFileData::Digest(digest)) = entry.data {
sender.send(Message::GotManifestEntry(digest, job_id)).ok();
if let ManifestEntryData::File(ManifestFileData::Digest(entry_digest)) = entry.data {
sender
.send(Message::GotManifestEntry {
manifest_digest: manifest.clone(),
entry_digest,
})
.ok();
}
}
Ok(())
Expand All @@ -169,11 +173,10 @@ where
while let Some(req) = self.receiver.recv().await {
let sender = self.sender.clone();
self.tasks.spawn(async move {
let result = read_manifest(sender.clone(), req.manifest_stream, req.jid).await;
let result =
read_manifest(sender.clone(), req.manifest_stream, req.digest.clone()).await;
sender
.send(Message::FinishedReadingManifest(
req.digest, req.jid, result,
))
.send(Message::FinishedReadingManifest(req.digest, result))
.ok();
});
}
Expand Down Expand Up @@ -323,12 +326,16 @@ where
Message::StatisticsHeartbeat => self
.scheduler
.receive_statistics_heartbeat(&mut self.artifact_gatherer),
Message::GotManifestEntry(digest, jid) => {
self.artifact_gatherer.receive_manifest_entry(digest, jid)
Message::GotManifestEntry {
manifest_digest,
entry_digest,
} => {
self.artifact_gatherer
.receive_manifest_entry(manifest_digest, entry_digest);
}
Message::FinishedReadingManifest(digest, jid, result) => self
Message::FinishedReadingManifest(digest, result) => self
.artifact_gatherer
.receive_finished_reading_manifest(digest, jid, result),
.receive_finished_reading_manifest(digest, result),
Message::JobsReadyFromArtifactGatherer(ready) => {
self.scheduler
.receive_jobs_ready_from_artifact_gatherer(ready);
Expand Down
Loading

0 comments on commit 91248ca

Please sign in to comment.