Skip to content

Commit

Permalink
Deal with receiving multiple writes at once in GitHub read queue
Browse files — browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Dec 31, 2024
1 parent c965f37 commit 6a730dd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/maelstrom-github/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ anyhow.workspace = true
azure_core.workspace = true
azure_storage_blobs.workspace = true
base64.workspace = true
bincode.workspace = true
chrono.workspace = true
futures.workspace = true
reqwest.workspace = true
strum.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
Expand Down
54 changes: 34 additions & 20 deletions crates/maelstrom-github/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ use crate::{two_hours_from_now, Artifact, BackendIds, GitHubClient};
use anyhow::{anyhow, Result};
use azure_storage_blobs::prelude::BlobClient;
use futures::stream::StreamExt as _;
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::FromRepr;

#[derive(FromRepr)]
#[repr(u8)]
#[derive(Serialize, Deserialize)]
enum MessageHeader {
KeepAlive,
Payload,
Payload { size: usize },
Shutdown,
}

Expand All @@ -24,6 +23,7 @@ pub struct GitHubReadQueue {
blob: BlobClient,
index: usize,
etag: Option<azure_core::Etag>,
pending: VecDeque<Option<Vec<u8>>>,
}

impl GitHubReadQueue {
Expand All @@ -33,6 +33,7 @@ impl GitHubReadQueue {
blob,
index: 0,
etag: None,
pending: Default::default(),
})
}

Expand Down Expand Up @@ -82,23 +83,36 @@ impl GitHubReadQueue {
}

pub async fn read_msg(&mut self) -> Result<Option<Vec<u8>>> {
if let Some(msg) = self.pending.pop_front() {
return Ok(msg);
}

let mut read_start = Instant::now();
loop {
if let Some(mut res) = self.maybe_read_msg().await? {
match MessageHeader::from_repr(res.remove(0))
.ok_or_else(|| anyhow!("malformed header"))?
{
MessageHeader::KeepAlive => {
read_start = Instant::now();
}
MessageHeader::Payload => {
return Ok(Some(res));
}
MessageHeader::Shutdown => {
return Ok(None);
if let Some(res) = self.maybe_read_msg().await? {
let mut r = &res[..];
while !r.is_empty() {
let header: MessageHeader = bincode::deserialize_from(&mut r)?;
match header {
MessageHeader::KeepAlive => {
read_start = Instant::now();
}
MessageHeader::Payload { size } => {
let payload = r[..size].to_vec();
r = &r[size..];
self.pending.push_back(Some(payload));
}
MessageHeader::Shutdown => {
self.pending.push_back(None);
}
}
}
}

if let Some(msg) = self.pending.pop_front() {
return Ok(msg);
}

if read_start.elapsed() > READ_TIMEOUT {
return Err(anyhow!("GitHub queue read timeout"));
}
Expand All @@ -110,7 +124,7 @@ async fn send_keep_alive(blob: Arc<BlobClient>) {
loop {
tokio::time::sleep(READ_TIMEOUT / 2).await;
let _ = blob
.append_block(&[MessageHeader::KeepAlive as u8][..])
.append_block(bincode::serialize(&MessageHeader::KeepAlive).unwrap())
.await;
}
}
Expand All @@ -130,7 +144,7 @@ impl GitHubWriteQueue {
}

pub async fn write_msg(&mut self, data: &[u8]) -> Result<()> {
let mut to_send = vec![MessageHeader::Payload as u8];
let mut to_send = bincode::serialize(&MessageHeader::Payload { size: data.len() }).unwrap();
to_send.extend(data);
self.blob.append_block(to_send).await?;

Expand All @@ -146,7 +160,7 @@ impl Drop for GitHubWriteQueue {
self.keep_alive.abort();
let blob = self.blob.clone();
tokio::task::spawn(async move {
blob.append_block(&[MessageHeader::Shutdown as u8][..]);
blob.append_block(bincode::serialize(&MessageHeader::Shutdown).unwrap());
});
}
}
Expand Down

0 comments on commit 6a730dd

Please sign in to comment.