Skip to content

Commit

Permalink
Pass mutable reference to github queue reader / writer functions
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Jan 1, 2025
1 parent ef8658b commit 189ec4a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
7 changes: 4 additions & 3 deletions crates/maelstrom-broker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,16 +298,17 @@ async fn unassigned_github_connection_main<TempFileT>(
SchedulerMessage::WorkerDisconnected,
|scheduler_sender| async move {
let _ = net::github_queue_reader(
read_queue,
&mut read_queue,
scheduler_sender,
|msg| SchedulerMessage::FromWorker(id, msg),
&log_clone,
)
.await;
},
|scheduler_receiver| async move {
let _ = net::github_queue_writer(scheduler_receiver, write_queue, &log_clone2)
.await;
let _ =
net::github_queue_writer(scheduler_receiver, &mut write_queue, &log_clone2)
.await;
},
)
.await;
Expand Down
8 changes: 4 additions & 4 deletions crates/maelstrom-util/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ where
}

pub async fn github_queue_reader<MessageT, TransformedT>(
mut queue: GitHubReadQueue,
queue: &mut GitHubReadQueue,
channel: UnboundedSender<TransformedT>,
transform: impl Fn(MessageT) -> TransformedT,
log: &Logger,
) -> Result<()>
where
MessageT: Debug + DeserializeOwned,
{
while let Some(msg) = read_message_from_github_queue(&mut queue, log).await? {
while let Some(msg) = read_message_from_github_queue(queue, log).await? {
if channel.send(transform(msg)).is_err() {
break;
}
Expand Down Expand Up @@ -221,7 +221,7 @@ where

pub async fn github_queue_writer<MessageT>(
mut channel: UnboundedReceiver<MessageT>,
mut queue: GitHubWriteQueue,
queue: &mut GitHubWriteQueue,
log: &Logger,
) -> Result<()>
where
Expand All @@ -233,7 +233,7 @@ where
match tokio::time::timeout_at(next, channel.recv()).await {
Err(_) => {
if !to_send.is_empty() {
write_many_messages_to_github_queue(&mut queue, &to_send, log).await?;
write_many_messages_to_github_queue(queue, &to_send, log).await?;
to_send.clear();
}
next = Instant::now() + Duration::from_millis(10);
Expand Down
12 changes: 8 additions & 4 deletions crates/maelstrom-worker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,24 @@ impl BrokerConnection for GitHubQueue {
}

impl BrokerReadConnection for GitHubReadQueue {
async fn read_messages(self, dispatcher_sender: DispatcherSender, log: Logger) -> Result<()> {
net::github_queue_reader(self, dispatcher_sender, Message::Broker, &log)
async fn read_messages(
mut self,
dispatcher_sender: DispatcherSender,
log: Logger,
) -> Result<()> {
net::github_queue_reader(&mut self, dispatcher_sender, Message::Broker, &log)
.await
.context("error communicating with broker")
}
}

impl BrokerWriteConnection for GitHubWriteQueue {
async fn write_messages(
self,
mut self,
broker_socket_outgoing_receiver: BrokerSocketOutgoingReceiver,
log: Logger,
) -> Result<()> {
net::github_queue_writer(broker_socket_outgoing_receiver, self, &log)
net::github_queue_writer(broker_socket_outgoing_receiver, &mut self, &log)
.await
.context("error communicating with broker")
}
Expand Down

0 comments on commit 189ec4a

Please sign in to comment.