Skip to content

Commit

Permalink
Split send_message_to_client in Deps out into two functions, one for …
Browse files Browse the repository at this point in the history
…each message type sent.
  • Loading branch information
nfachan committed Jan 25, 2025
1 parent eff87f6 commit b7780bc
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 65 deletions.
17 changes: 15 additions & 2 deletions crates/maelstrom-broker/src/scheduler_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,21 @@ impl<TempFileT, ArtifactStreamT> ArtifactGathererDeps
let _ = sender.send(message);
}

fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient) {
let _ = sender.send(message);
fn send_transfer_artifact_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
) {
let _ = sender.send(BrokerToClient::TransferArtifact(digest));
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
let _ = sender.send(BrokerToClient::ArtifactTransferredResponse(digest, result));
}

fn send_job_ready_to_scheduler(&mut self, jid: JobId) {
Expand Down
153 changes: 93 additions & 60 deletions crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{cache::SchedulerCache, scheduler_task::ManifestReadRequest};
use maelstrom_base::{
proto::{ArtifactUploadLocation, BrokerToClient},
ArtifactType, ClientId, ClientJobId, JobId, NonEmpty, Sha256Digest,
proto::ArtifactUploadLocation, ArtifactType, ClientId, ClientJobId, JobId, NonEmpty,
Sha256Digest,
};
use maelstrom_util::{
cache::GetArtifact,
Expand All @@ -25,7 +25,17 @@ pub trait Deps {
sender: &mut Self::WorkerArtifactFetcherSender,
message: Option<(PathBuf, u64)>,
);
fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient);
fn send_transfer_artifact_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
);
fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
);
fn send_job_ready_to_scheduler(&mut self, jid: JobId);
fn send_job_failure_to_scheduler(&mut self, jid: JobId, err: String);
}
Expand Down Expand Up @@ -156,10 +166,7 @@ where
job.missing_artifacts
.insert(digest.clone(), is_manifest)
.assert_is_none();
deps.send_message_to_client(
client_sender,
BrokerToClient::TransferArtifact(digest.clone()),
);
deps.send_transfer_artifact_to_client(client_sender, digest.clone());
}
}
}
Expand Down Expand Up @@ -233,16 +240,18 @@ where
let client = self.clients.get_mut(&cid).unwrap();
match self.cache.got_artifact(&digest, file) {
Err(err) => {
self.deps.send_message_to_client(
self.deps.send_artifact_transferred_response_to_client(
&mut client.sender,
BrokerToClient::ArtifactTransferredResponse(digest, Err(err.to_string())),
digest,
Err(err.to_string()),
);
Default::default()
}
Ok(jids) => {
self.deps.send_message_to_client(
self.deps.send_artifact_transferred_response_to_client(
&mut client.sender,
BrokerToClient::ArtifactTransferredResponse(digest.clone(), Ok(())),
digest.clone(),
Ok(()),
);
for jid in jids {
let client = self.clients.get_mut(&jid.cid).unwrap();
Expand Down Expand Up @@ -387,7 +396,6 @@ mod tests {
use super::*;
use anyhow::Result;
use hashbag::HashBag;
use maelstrom_base::digest;
use std::{
cell::{RefCell, RefMut},
rc::Rc,
Expand All @@ -398,7 +406,9 @@ mod tests {
struct TestStateInner {
send_message_to_manifest_reader_returns: HashSet<ManifestReadRequest<i32>>,
send_message_to_worker_artifact_fetcher_returns: HashSet<(i32, Option<(PathBuf, u64)>)>,
send_message_to_client_returns: Vec<(ClientId, BrokerToClient)>,
send_transfer_artifact_to_client_returns: Vec<(ClientId, Sha256Digest)>,
send_artifact_transferred_response_to_client_returns:
Vec<(ClientId, Sha256Digest, Result<(), String>)>,
send_job_ready_to_scheduler: HashSet<JobId>,
get_artifact_returns: HashMap<(JobId, Sha256Digest), GetArtifact>,
got_artifact_returns: HashMap<(Sha256Digest, Option<String>), Result<Vec<JobId>>>,
Expand All @@ -420,9 +430,14 @@ mod tests {
self.send_message_to_worker_artifact_fetcher_returns,
);
assert!(
self.send_message_to_client_returns.is_empty(),
"unused test fixture entries for Deps::send_message_to_client: {:?}",
self.send_message_to_client_returns,
self.send_transfer_artifact_to_client_returns.is_empty(),
"unused test fixture entries for Deps::send_transfer_artifact_to_client: {:?}",
self.send_transfer_artifact_to_client_returns,
);
assert!(
self.send_artifact_transferred_response_to_client_returns.is_empty(),
"unused test fixture entries for Deps::send_artifact_transferred_response_to_client: {:?}",
self.send_artifact_transferred_response_to_client_returns,
);
assert!(
self.send_job_ready_to_scheduler.is_empty(),
Expand Down Expand Up @@ -482,17 +497,32 @@ mod tests {
self
}

fn send_message_to_client(
fn send_transfer_artifact_to_client(
mut self,
cid: impl Into<ClientId>,
message: BrokerToClient,
digest: impl Into<Sha256Digest>,
) -> Self {
self.test_state
.inner
.as_mut()
.unwrap()
.send_message_to_client_returns
.push((cid.into(), message));
.send_transfer_artifact_to_client_returns
.push((cid.into(), digest.into()));
self
}

fn send_artifact_transferred_response_to_client(
mut self,
cid: impl Into<ClientId>,
digest: impl Into<Sha256Digest>,
result: Result<(), String>,
) -> Self {
self.test_state
.inner
.as_mut()
.unwrap()
.send_artifact_transferred_response_to_client_returns
.push((cid.into(), digest.into(), result));
self
}

Expand Down Expand Up @@ -617,24 +647,45 @@ mod tests {
.assert_is_true();
}

fn send_message_to_client(
fn send_transfer_artifact_to_client(
&mut self,
sender: &mut Self::ClientSender,
message: BrokerToClient,
digest: Sha256Digest,
) {
let mut borrow = self.borrow_mut();
let send_message_to_client = &mut borrow
let send_transfer_artifact_to_client = &mut borrow
.inner
.as_mut()
.unwrap()
.send_message_to_client_returns;
let index = send_message_to_client
.send_transfer_artifact_to_client_returns;
let index = send_transfer_artifact_to_client
.iter()
.position(|e| e.0 == *sender && e.1 == message)
.position(|e| e.0 == *sender && e.1 == digest)
.expect(&format!(
"sending unexpected message to client {sender}: {message:#?}"
"sending unexpected transfer_artifact to client {sender}: {digest}"
));
send_message_to_client.remove(index);
send_transfer_artifact_to_client.remove(index);
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
let mut borrow = self.borrow_mut();
let send_artifact_transferred_response_to_client = &mut borrow
.inner
.as_mut()
.unwrap()
.send_artifact_transferred_response_to_client_returns;
let index = send_artifact_transferred_response_to_client
.iter()
.position(|e| e.0 == *sender && e.1 == digest && e.2 == result)
.expect(&format!(
"sending unexpected artifact_transferred_response to client {sender}: {digest}"
));
let _ = send_artifact_transferred_response_to_client.remove(index);
}

fn send_job_ready_to_scheduler(&mut self, jid: JobId) {
Expand Down Expand Up @@ -839,16 +890,13 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Tar)], StartJob::NotReady);

fixture
.expect()
.got_artifact(3, None, Ok([(1, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(3), Ok(())),
)
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.send_job_ready_to_scheduler((1, 2));
fixture.artifact_transferred(1, 3, ArtifactUploadLocation::Remote);
}
Expand All @@ -861,16 +909,13 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Tar), (3, Tar)], StartJob::NotReady);

fixture
.expect()
.got_artifact(3, None, Ok([(1, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(3), Ok(())),
)
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.send_job_ready_to_scheduler((1, 2));
fixture.artifact_transferred(1, 3, ArtifactUploadLocation::Remote);
}
Expand Down Expand Up @@ -926,7 +971,7 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Manifest)], StartJob::NotReady);
}

Expand All @@ -937,7 +982,7 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Manifest), (3, Manifest)], StartJob::NotReady);
}

Expand Down Expand Up @@ -1026,7 +1071,7 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 6, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(6)));
.send_transfer_artifact_to_client(1, 6);
fixture.receive_manifest_entry(6, (1, 2));

fixture.receive_manifest_entry(6, (1, 2));
Expand All @@ -1036,19 +1081,13 @@ mod tests {
fixture
.expect()
.got_artifact(6, None, Ok([(1, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(6), Ok(())),
);
.send_artifact_transferred_response_to_client(1, 6, Ok(()));
fixture.artifact_transferred(1, 6, ArtifactUploadLocation::Remote);

fixture
.expect()
.got_artifact(5, None, Ok([(1, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(5), Ok(())),
)
.send_artifact_transferred_response_to_client(1, 5, Ok(()))
.send_job_ready_to_scheduler((1, 2));
fixture.artifact_transferred(1, 5, ArtifactUploadLocation::Remote);
}
Expand All @@ -1062,7 +1101,7 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Tar)], StartJob::NotReady);

fixture.expect().get_artifact((2, 2), 3, GetArtifact::Wait);
Expand All @@ -1071,10 +1110,7 @@ mod tests {
fixture
.expect()
.got_artifact(3, None, Ok([(1, 2), (2, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(3), Ok(())),
)
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.send_job_ready_to_scheduler((1, 2))
.send_job_ready_to_scheduler((2, 2));
fixture.artifact_transferred(1, 3, ArtifactUploadLocation::Remote);
Expand All @@ -1088,16 +1124,13 @@ mod tests {
fixture
.expect()
.get_artifact((1, 2), 3, GetArtifact::Get)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job((1, 2), [(3, Manifest)], StartJob::NotReady);

fixture
.expect()
.got_artifact(3, None, Ok([(1, 2)]))
.send_message_to_client(
1,
BrokerToClient::ArtifactTransferredResponse(digest!(3), Ok(())),
)
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.read_artifact(3, 33)
.send_message_to_manifest_reader((1, 2), 3, 33);
fixture.artifact_transferred(1, 3, ArtifactUploadLocation::Remote);
Expand Down Expand Up @@ -1188,7 +1221,7 @@ mod tests {
.get_artifact((1, 1), 4, GetArtifact::Success)
.read_artifact(4, 44)
.send_message_to_manifest_reader((1, 1), 4, 44)
.send_message_to_client(1, BrokerToClient::TransferArtifact(digest!(3)));
.send_transfer_artifact_to_client(1, 3);
fixture.start_job(
(1, 1),
[(1, Tar), (2, Tar), (3, Tar), (4, Manifest), (1, Tar)],
Expand Down
20 changes: 17 additions & 3 deletions crates/maelstrom-broker/src/scheduler_task/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,12 +776,26 @@ mod tests {
.push(ToWorkerArtifactFetcher(sender.0, message));
}

fn send_message_to_client(
fn send_transfer_artifact_to_client(
&mut self,
sender: &mut TestClientSender,
message: BrokerToClient,
digest: Sha256Digest,
) {
self.borrow_mut()
.messages
.push(ToClient(sender.0, BrokerToClient::TransferArtifact(digest)));
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut TestClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
self.borrow_mut().messages.push(ToClient(sender.0, message));
self.borrow_mut().messages.push(ToClient(
sender.0,
BrokerToClient::ArtifactTransferredResponse(digest, result),
));
}

fn send_job_ready_to_scheduler(&mut self, jid: JobId) {
Expand Down

0 comments on commit b7780bc

Please sign in to comment.