Skip to content

Commit

Permalink
Let client accept artifacts other than tar
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Jan 22, 2024
1 parent 56693f8 commit 1fc77fb
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions crates/maelstrom-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,30 @@ use std::{
time::SystemTime,
};

fn push_one_artifact(broker_addr: BrokerAddr, path: PathBuf, digest: Sha256Digest) -> Result<()> {
let fs = Fs::new();
fn artifact_type_from_path(path: &Path) -> ArtifactType {
path.extension()
.and_then(|e| ArtifactType::try_from_extension(e.to_str().unwrap()))
.unwrap_or(ArtifactType::Binary)
}

fn push_one_artifact(
broker_addr: BrokerAddr,
path: PathBuf,
type_: ArtifactType,
digest: Sha256Digest,
) -> Result<()> {
let mut stream = TcpStream::connect(broker_addr.inner())?;
net::write_message_to_socket(&mut stream, Hello::ArtifactPusher)?;

let fs = Fs::new();
let file = fs.open_file(path)?;
let size = file.metadata()?.len();
let mut file = FixedSizeReader::new(file, size);
net::write_message_to_socket(&mut stream, Hello::ArtifactPusher)?;

net::write_message_to_socket(
&mut stream,
ArtifactPusherToBroker(ArtifactMetadata {
type_: ArtifactType::Tar,
type_,
digest,
size,
}),
Expand Down Expand Up @@ -65,14 +78,15 @@ fn calculate_digest(path: &Path) -> Result<(SystemTime, Sha256Digest)> {

enum DispatcherMessage {
BrokerToClient(BrokerToClient),
AddArtifact(PathBuf, Sha256Digest),
AddArtifact(PathBuf, Sha256Digest, ArtifactType),
AddJob(JobSpec, JobResponseHandler),
GetJobStateCounts(SyncSender<JobStateCounts>),
Stop,
}

struct ArtifactPushRequest {
path: PathBuf,
type_: ArtifactType,
digest: Sha256Digest,
}

Expand All @@ -98,7 +112,7 @@ impl ArtifactPusher {
if let Ok(msg) = self.receiver.recv() {
let broker_addr = self.broker_addr;
// N.B. We are ignoring this Result<_>
scope.spawn(move || push_one_artifact(broker_addr, msg.path, msg.digest));
scope.spawn(move || push_one_artifact(broker_addr, msg.path, msg.type_, msg.digest));
true
} else {
false
Expand All @@ -112,7 +126,7 @@ pub struct Dispatcher {
artifact_pusher: SyncSender<ArtifactPushRequest>,
stop_when_all_completed: bool,
next_client_job_id: u32,
artifacts: HashMap<Sha256Digest, PathBuf>,
artifacts: HashMap<Sha256Digest, (PathBuf, ArtifactType)>,
handlers: HashMap<ClientJobId, JobResponseHandler>,
stats_reqs: VecDeque<SyncSender<JobStateCounts>>,
}
Expand Down Expand Up @@ -156,18 +170,21 @@ impl Dispatcher {
}
}
DispatcherMessage::BrokerToClient(BrokerToClient::TransferArtifact(digest)) => {
let path = self.artifacts.get(&digest).unwrap().clone();
self.artifact_pusher
.send(ArtifactPushRequest { path, digest })?;
let (path, type_) = self.artifacts.get(&digest).unwrap().clone();
self.artifact_pusher.send(ArtifactPushRequest {
path,
type_,
digest,
})?;
}
DispatcherMessage::BrokerToClient(BrokerToClient::StatisticsResponse(_)) => {
unimplemented!("this client doesn't send statistics requests")
}
DispatcherMessage::BrokerToClient(BrokerToClient::JobStateCountsResponse(res)) => {
self.stats_reqs.pop_front().unwrap().send(res).ok();
}
DispatcherMessage::AddArtifact(path, digest) => {
self.artifacts.insert(digest, path);
DispatcherMessage::AddArtifact(path, digest, type_) => {
self.artifacts.insert(digest, (path, type_));
}
DispatcherMessage::AddJob(spec, handler) => {
let cjid = self.next_client_job_id.into();
Expand Down Expand Up @@ -425,6 +442,7 @@ impl Client {
pub fn add_artifact(&mut self, path: &Path) -> Result<Sha256Digest> {
let fs = Fs::new();
let path = fs.canonicalize(path)?;

let digest = if let Some(digest) = self.digest_repo.get(&path)? {
digest
} else {
Expand All @@ -433,8 +451,11 @@ impl Client {
digest
};
if !self.processed_artifact_paths.contains(&path) {
self.dispatcher_sender
.send(DispatcherMessage::AddArtifact(path.clone(), digest.clone()))?;
self.dispatcher_sender.send(DispatcherMessage::AddArtifact(
path.clone(),
digest.clone(),
artifact_type_from_path(&path),
))?;
self.processed_artifact_paths.insert(path);
}
Ok(digest)
Expand Down

0 comments on commit 1fc77fb

Please sign in to comment.