Skip to content

Commit

Permalink
Replace BrokerToClient::ArtifactTransferredResponse with BrokerToClie…
Browse files Browse the repository at this point in the history
…nt::GeneralError.

This new message lets the broker tell the client about an error, with the
assumption that the client will shut down when it receives the message. In
theory, we could log the message at a high level as well, but this type of
error will likely cause a whole slew of individual jobs to fail.
  • Loading branch information
nfachan committed Jan 30, 2025
1 parent 7cd1ba6 commit 2a18aa5
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 81 deletions.
2 changes: 1 addition & 1 deletion crates/maelstrom-base/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum BrokerToClient {
JobResponse(ClientJobId, JobOutcomeResult),
JobStatusUpdate(ClientJobId, JobBrokerStatus),
TransferArtifact(Sha256Digest),
ArtifactTransferredResponse(Sha256Digest, Result<(), String>),
GeneralError(String),
}

/// Message sent from a client to the broker. After sending the initial [`Hello`], a client will
Expand Down
9 changes: 2 additions & 7 deletions crates/maelstrom-broker/src/scheduler_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,8 @@ impl<TempFileT, ArtifactStreamT> ArtifactGathererDeps
let _ = sender.send(BrokerToClient::TransferArtifact(digest));
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
let _ = sender.send(BrokerToClient::ArtifactTransferredResponse(digest, result));
fn send_general_error_to_client(&mut self, sender: &mut Self::ClientSender, error: String) {
let _ = sender.send(BrokerToClient::GeneralError(error));
}

fn send_jobs_ready_to_scheduler(&mut self, jids: NonEmpty<JobId>) {
Expand Down
83 changes: 33 additions & 50 deletions crates/maelstrom-broker/src/scheduler_task/artifact_gatherer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ pub trait Deps {
sender: &mut Self::ClientSender,
digest: Sha256Digest,
);
fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
);
fn send_general_error_to_client(&mut self, sender: &mut Self::ClientSender, error: String);
fn send_jobs_ready_to_scheduler(&mut self, jobs: NonEmpty<JobId>);
fn send_jobs_failed_to_scheduler(&mut self, jobs: NonEmpty<JobId>, err: String);
}
Expand Down Expand Up @@ -329,10 +324,9 @@ where
match self.cache.got_artifact(&digest, file) {
Err((err, jobs)) => {
let client = self.clients.get_mut(&cid).unwrap();
self.deps.send_artifact_transferred_response_to_client(
self.deps.send_general_error_to_client(
&mut client.sender,
digest,
Err(err.to_string()),
format!("error incorporating artifact {digest} into cache: {err}"),
);
let jobs = jobs.into_iter().filter(|jid| {
// It's possible that the job failed for some other reason while we were
Expand All @@ -351,12 +345,6 @@ where
}
}
Ok(jobs) => {
let client = self.clients.get_mut(&cid).unwrap();
self.deps.send_artifact_transferred_response_to_client(
&mut client.sender,
digest.clone(),
Ok(()),
);
let ready = jobs.into_iter().filter(|jid| {
let client = self.clients.get_mut(&jid.cid).unwrap();
// It's possible that the job failed for some other reason while we were
Expand Down Expand Up @@ -512,8 +500,7 @@ mod tests {
send_message_to_manifest_reader: HashSet<ManifestReadRequest<i32>>,
send_message_to_worker_artifact_fetcher: HashSet<(i32, Option<(PathBuf, u64)>)>,
send_transfer_artifact_to_client: Vec<(ClientId, Sha256Digest)>,
send_artifact_transferred_response_to_client:
Vec<(ClientId, Sha256Digest, Result<(), String>)>,
send_general_error_to_client: Vec<(ClientId, String)>,
send_jobs_ready_to_scheduler: Vec<HashSet<JobId>>,
send_jobs_failed_to_scheduler: Vec<(HashSet<JobId>, String)>,
client_sender_dropped: HashSet<ClientId>,
Expand Down Expand Up @@ -544,9 +531,9 @@ mod tests {
self.send_transfer_artifact_to_client,
);
assert!(
self.send_artifact_transferred_response_to_client.is_empty(),
"unused mock entries for Deps::send_artifact_transferred_response_to_client: {:?}",
self.send_artifact_transferred_response_to_client,
self.send_general_error_to_client.is_empty(),
"unused mock entries for Deps::send_general_error_to_client: {:?}",
self.send_general_error_to_client,
);
assert!(
self.send_jobs_ready_to_scheduler.is_empty(),
Expand Down Expand Up @@ -653,20 +640,13 @@ mod tests {
vec.remove(index);
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut Self::ClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
let vec = &mut self
.borrow_mut()
.send_artifact_transferred_response_to_client;
fn send_general_error_to_client(&mut self, sender: &mut Self::ClientSender, error: String) {
let vec = &mut self.borrow_mut().send_general_error_to_client;
let index = vec
.iter()
.position(|e| e.0 == sender.cid && e.1 == digest && e.2 == result)
.position(|e| e.0 == sender.cid && e.1 == error)
.expect(&format!(
"sending unexpected artifact_transferred_response to client {cid}: {digest} {result:?}",
"sending unexpected general_error to client {cid}: {error}",
cid = sender.cid,
));
let _ = vec.remove(index);
Expand Down Expand Up @@ -915,17 +895,16 @@ mod tests {
self
}

fn send_artifact_transferred_response_to_client(
fn send_general_error_to_client(
self,
cid: impl Into<ClientId>,
digest: impl Into<Sha256Digest>,
result: Result<(), String>,
error: impl Into<String>,
) -> Self {
self.fixture
.mock
.borrow_mut()
.send_artifact_transferred_response_to_client
.push((cid.into(), digest.into(), result));
.send_general_error_to_client
.push((cid.into(), error.into()));
self
}

Expand All @@ -944,13 +923,13 @@ mod tests {
fn send_jobs_failed_to_scheduler(
self,
jobs: impl IntoIterator<Item = impl Into<JobId>>,
err: impl ToString,
err: impl Into<String>,
) -> Self {
self.fixture
.mock
.borrow_mut()
.send_jobs_failed_to_scheduler
.push((jobs.into_iter().map(Into::into).collect(), err.to_string()));
.push((jobs.into_iter().map(Into::into).collect(), err.into()));
self
}

Expand Down Expand Up @@ -991,7 +970,7 @@ mod tests {
self,
digest: impl Into<Sha256Digest>,
file: Option<&str>,
error: impl ToString,
error: impl Into<String>,
jobs: impl IntoIterator<Item = impl Into<JobId>>,
) -> Self {
self.fixture
Expand All @@ -1001,7 +980,7 @@ mod tests {
.insert(
(digest.into(), file.map(Into::into)),
Err((
anyhow!(error.to_string()),
anyhow!(error.into()),
jobs.into_iter().map(Into::into).collect(),
)),
)
Expand Down Expand Up @@ -1284,7 +1263,6 @@ mod tests {
fixture
.expect()
.got_artifact_success(2, None, [] as [JobId; 0])
.send_artifact_transferred_response_to_client(1, 2, Ok(()))
.when()
.receive_artifact_transferred(1, 2, ArtifactUploadLocation::Remote);
}
Expand All @@ -1310,20 +1288,17 @@ mod tests {
fixture
.expect()
.got_artifact_success(5, None, [(1, 2), (1, 3)])
.send_artifact_transferred_response_to_client(1, 5, Ok(()))
.send_jobs_ready_to_scheduler([(1, 2)])
.when()
.receive_artifact_transferred(1, 5, ArtifactUploadLocation::Remote);
fixture
.expect()
.got_artifact_success(6, None, [(1, 3)])
.send_artifact_transferred_response_to_client(1, 6, Ok(()))
.when()
.receive_artifact_transferred(1, 6, ArtifactUploadLocation::Remote);
fixture
.expect()
.got_artifact_success(7, None, [(1, 3)])
.send_artifact_transferred_response_to_client(1, 7, Ok(()))
.send_jobs_ready_to_scheduler([(1, 3)])
.when()
.receive_artifact_transferred(1, 7, ArtifactUploadLocation::Remote);
Expand All @@ -1335,7 +1310,13 @@ mod tests {
fixture
.expect()
.got_artifact_failure(2, None, "error", [] as [JobId; 0])
.send_artifact_transferred_response_to_client(1, 2, Err("error".into()))
.send_general_error_to_client(
1,
format!(
"error incorporating artifact {} into cache: error",
Sha256Digest::from(2)
),
)
.when()
.receive_artifact_transferred(1, 2, ArtifactUploadLocation::Remote);
}
Expand Down Expand Up @@ -1369,7 +1350,13 @@ mod tests {
fixture
.expect()
.got_artifact_failure(5, None, "error", [(1, 2), (1, 3), (2, 2)])
.send_artifact_transferred_response_to_client(1, 5, Err("error".into()))
.send_general_error_to_client(
1,
format!(
"error incorporating artifact {} into cache: error",
Sha256Digest::from(5)
),
)
.send_jobs_failed_to_scheduler([(1, 2), (1, 3), (2, 2)], "error")
.when()
.receive_artifact_transferred(1, 5, ArtifactUploadLocation::Remote);
Expand Down Expand Up @@ -1442,13 +1429,11 @@ mod tests {
fixture
.expect()
.got_artifact_success(6, None, [(1, 2)])
.send_artifact_transferred_response_to_client(1, 6, Ok(()))
.when()
.artifact_transferred(1, 6, ArtifactUploadLocation::Remote);
fixture
.expect()
.got_artifact_success(5, None, [(1, 2)])
.send_artifact_transferred_response_to_client(1, 5, Ok(()))
.send_jobs_ready_to_scheduler([(1, 2)])
.when()
.artifact_transferred(1, 5, ArtifactUploadLocation::Remote);
Expand All @@ -1471,7 +1456,6 @@ mod tests {
fixture
.expect()
.got_artifact_success(3, None, [(1, 2), (2, 2)])
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.send_jobs_ready_to_scheduler([(1, 2), (2, 2)])
.when()
.artifact_transferred(1, 3, ArtifactUploadLocation::Remote);
Expand All @@ -1489,7 +1473,6 @@ mod tests {
fixture
.expect()
.got_artifact_success(3, None, [(1, 2)])
.send_artifact_transferred_response_to_client(1, 3, Ok(()))
.read_artifact(3, 33)
.send_message_to_manifest_reader(3, 33)
.when()
Expand Down
14 changes: 4 additions & 10 deletions crates/maelstrom-broker/src/scheduler_task/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,16 +798,10 @@ mod tests {
.push(ToClient(sender.0, BrokerToClient::TransferArtifact(digest)));
}

fn send_artifact_transferred_response_to_client(
&mut self,
sender: &mut TestClientSender,
digest: Sha256Digest,
result: Result<(), String>,
) {
self.borrow_mut().messages.push(ToClient(
sender.0,
BrokerToClient::ArtifactTransferredResponse(digest, result),
));
fn send_general_error_to_client(&mut self, sender: &mut TestClientSender, error: String) {
self.borrow_mut()
.messages
.push(ToClient(sender.0, BrokerToClient::GeneralError(error)));
}

fn send_jobs_ready_to_scheduler(&mut self, jobs: NonEmpty<JobId>) {
Expand Down
18 changes: 5 additions & 13 deletions crates/maelstrom-client-process/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::artifact_pusher;
use anyhow::{anyhow, Error, Result};
use anyhow::{anyhow, bail, Error, Result};
use maelstrom_base::{
proto::{BrokerToClient, BrokerToWorker, ClientToBroker, WorkerToBroker},
ClientId, ClientJobId, JobId, JobOutcomeResult, JobSpec, Sha256Digest,
Expand Down Expand Up @@ -139,14 +139,8 @@ impl<DepsT: Deps> Router<DepsT> {
self.deps
.start_artifact_transfer_to_broker(digest, path.to_owned());
}
Message::Broker(BrokerToClient::ArtifactTransferredResponse(digest, result)) => {
assert!(!self.standalone);
let path = self.artifacts.get(&digest).ok_or_else(|| {
anyhow!("got request for unknown artifact with digest {digest}")
})?;
result.map_err(|err| {
anyhow!("error transferring artifact: {}: {err}", path.display())
})?;
Message::Broker(BrokerToClient::GeneralError(err)) => {
bail!("received error from broker: {err}");
}
Message::LocalWorker(WorkerToBroker::JobResponse(jid, result)) => {
self.receive_job_response(jid.cjid, result);
Expand Down Expand Up @@ -491,17 +485,16 @@ mod tests {
Broker(TransferArtifact(digest!(1))) => {
StartArtifactTransferToBroker(digest!(1), path_buf!("bar")),
};
Broker(ArtifactTransferredResponse(digest!(1), Ok(()))) => {}
}

script_test! {
broker_transfer_artifact_known_clustered_error,
Fixture::new(false, Some("error transferring artifact: bar: test error".into())),
Fixture::new(false, Some("received error from broker: test error".into())),
AddArtifact(path_buf!("bar"), digest!(1)) => {};
Broker(TransferArtifact(digest!(1))) => {
StartArtifactTransferToBroker(digest!(1), path_buf!("bar")),
};
Broker(ArtifactTransferredResponse(digest!(1), Err("test error".into()))) => {}
Broker(GeneralError("test error".into())) => {}
}

script_test! {
Expand All @@ -512,7 +505,6 @@ mod tests {
Broker(TransferArtifact(digest!(1))) => {
StartArtifactTransferToBroker(digest!(1), path_buf!("bar")),
};
Broker(ArtifactTransferredResponse(digest!(1), Ok(()))) => {}
}

script_test! {
Expand Down

0 comments on commit 2a18aa5

Please sign in to comment.