Skip to content

Commit

Permalink
Create a new ArtifactGatherer trait to use in the Scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
nfachan committed Jan 22, 2025
1 parent d670e53 commit 68ceb29
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 118 deletions.
50 changes: 28 additions & 22 deletions crates/maelstrom-broker/src/scheduler_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ mod artifact_gatherer;
mod scheduler;

use crate::cache::SchedulerCache;
use artifact_gatherer::{ArtifactGatherer, Deps as ArtifactGathererDeps};
use maelstrom_base::{
manifest::{ManifestEntryData, ManifestFileData},
proto::{BrokerToClient, BrokerToMonitor, BrokerToWorker},
JobId, Sha256Digest,
};
use maelstrom_util::{manifest::AsyncManifestReader, sync};
use scheduler::{Message, Scheduler, SchedulerDeps};
use std::marker::PhantomData;
use std::{path::PathBuf, sync::mpsc as std_mpsc};
use tokio::io::AsyncRead;
use tokio::sync::mpsc as tokio_mpsc;
use tokio::task::JoinSet;
use tokio::{io::AsyncRead, sync::mpsc as tokio_mpsc, task::JoinSet};

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ManifestReadRequest<ArtifactStreamT> {
Expand All @@ -23,49 +21,51 @@ pub struct ManifestReadRequest<ArtifactStreamT> {
}

#[derive(Debug)]
pub struct PassThroughDeps<ArtifactStreamT>(PhantomData<ArtifactStreamT>);
pub struct PassThroughSchedulerDeps;

/// The production implementation of [SchedulerDeps]. This implementation just hands the
/// message to the provided sender.
impl<ArtifactStreamT> SchedulerDeps for PassThroughDeps<ArtifactStreamT> {
impl SchedulerDeps for PassThroughSchedulerDeps {
type ClientSender = tokio_mpsc::UnboundedSender<BrokerToClient>;
type WorkerSender = tokio_mpsc::UnboundedSender<BrokerToWorker>;
type MonitorSender = tokio_mpsc::UnboundedSender<BrokerToMonitor>;
type WorkerArtifactFetcherSender = std_mpsc::Sender<Option<(PathBuf, u64)>>;
type ArtifactStream = ArtifactStreamT;
type ManifestReaderSender =
tokio_mpsc::UnboundedSender<ManifestReadRequest<Self::ArtifactStream>>;

fn send_message_to_client(&mut self, sender: &mut Self::ClientSender, message: BrokerToClient) {
sender.send(message).ok();
let _ = sender.send(message);
}

fn send_message_to_worker(&mut self, sender: &mut Self::WorkerSender, message: BrokerToWorker) {
sender.send(message).ok();
let _ = sender.send(message);
}

fn send_message_to_monitor(
&mut self,
sender: &mut Self::MonitorSender,
message: BrokerToMonitor,
) {
sender.send(message).ok();
let _ = sender.send(message);
}

fn send_message_to_worker_artifact_fetcher(
&mut self,
sender: &mut Self::WorkerArtifactFetcherSender,
message: Option<(PathBuf, u64)>,
) {
sender.send(message).ok();
let _ = sender.send(message);
}
}

fn send_message_to_manifest_reader(
&mut self,
sender: &mut Self::ManifestReaderSender,
req: ManifestReadRequest<Self::ArtifactStream>,
) {
sender.send(req).ok();
#[derive(Debug)]
pub struct PassThroughArtifactGathererDeps<ArtifactStreamT>(
tokio_mpsc::UnboundedSender<ManifestReadRequest<ArtifactStreamT>>,
);

impl<ArtifactStreamT> ArtifactGathererDeps for PassThroughArtifactGathererDeps<ArtifactStreamT> {
type ArtifactStream = ArtifactStreamT;

fn send_message_to_manifest_reader(&mut self, req: ManifestReadRequest<Self::ArtifactStream>) {
let _ = self.0.send(req);
}
}

Expand Down Expand Up @@ -134,7 +134,10 @@ pub type SchedulerMessage<TempFileT> = Message<
pub type SchedulerSender<TempFileT> = tokio_mpsc::UnboundedSender<SchedulerMessage<TempFileT>>;

pub struct SchedulerTask<CacheT: SchedulerCache> {
scheduler: Scheduler<CacheT, PassThroughDeps<CacheT::ArtifactStream>>,
scheduler: Scheduler<
ArtifactGatherer<CacheT, PassThroughArtifactGathererDeps<CacheT::ArtifactStream>>,
PassThroughSchedulerDeps,
>,
sender: SchedulerSender<CacheT::TempFile>,
receiver: tokio_mpsc::UnboundedReceiver<SchedulerMessage<CacheT::TempFile>>,
}
Expand All @@ -153,7 +156,10 @@ where
tokio::task::spawn(async move { manifest_reader.run().await });

SchedulerTask {
scheduler: Scheduler::new(cache, manifest_reader_sender),
scheduler: Scheduler::new(ArtifactGatherer::new(
cache,
PassThroughArtifactGathererDeps(manifest_reader_sender),
)),
sender,
receiver,
}
Expand All @@ -175,7 +181,7 @@ where
/// ignore the error in that case. Besides, the [scheduler::SchedulerDeps] interface doesn't
/// give us a way to return an error, for precisely this reason.
pub async fn run(mut self) {
let mut deps = PassThroughDeps(PhantomData);
let mut deps = PassThroughSchedulerDeps;
sync::channel_reader(self.receiver, |msg| {
self.scheduler.receive_message(&mut deps, msg);
})
Expand Down
Loading

0 comments on commit 68ceb29

Please sign in to comment.