Browse Source

Switch to a single subscription and SSE endpoint.

tokio-broadcast
Jeb Rosen 2 months ago
parent
commit
c8f6ef8fc1
5 changed files with 86 additions and 54 deletions
  1. +20
    -14
      index.html
  2. +38
    -23
      src/bus.rs
  3. +1
    -1
      src/lib.rs
  4. +26
    -16
      src/main.rs
  5. +1
    -0
      src/sse.rs

+ 20
- 14
index.html View File

@@ -26,11 +26,11 @@ h1, h2, h3, h4, h5, h6 {
</p>
<p>
Messages can be posted over HTTP:
<code>curl -d 'from=name&amp;text=message' localhost:8000/room/left</code>
<code>curl -d 'room=left&amp;from=name&amp;text=message' localhost:8000/message</code>
</p>
<p>
Rooms can also be watched from the command line:
<code>curl localhost:8000/room/right</code>
Rooms can also be watched directly:
<code>curl localhost:8000/events</code>
</p>

<div class="rooms">
@@ -54,19 +54,25 @@ h1, h2, h3, h4, h5, h6 {
</div>

<script>
function subscribe(uri, dest) {
function subscribe(uri, dests) {
const events = new EventSource(uri);
events.onmessage = function(event) {
const p = document.createElement("p");
p.innerText = event.data;
dest.appendChild(p);
const nlpos = event.data.indexOf("\n");
if (nlpos !== -1) {
const room = event.data.substring(0, nlpos);
if (dests[room]) {
const message = event.data.substring(nlpos + 1);
const p = document.createElement("p");
p.innerText = message;
dests[room].appendChild(p);
}
}
}
}

subscribe("/room/left", document.getElementById("room1"));
subscribe("/room/right", document.getElementById("room2"));
subscribe("/events", { "left": document.getElementById("room1"), "right": document.getElementById("room2") });

function listen_submit(form, uri) {
function listen_submit(form, room) {
form.addEventListener("submit", function(e) {
e.preventDefault();

@@ -78,9 +84,9 @@ function listen_submit(form, uri) {
const from = document.getElementById("input-user").value || "guest";

// TODO: ugly
const body = "from=" + encodeURIComponent(from) + "&text=" + encodeURIComponent(message);
const body = "room=" + encodeURIComponent(room) + "&from=" + encodeURIComponent(from) + "&text=" + encodeURIComponent(message);

fetch(uri, {
fetch("/message", {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded"
@@ -92,8 +98,8 @@ function listen_submit(form, uri) {
});
}

listen_submit(document.querySelector("#room1 .new-message"), "/room/left");
listen_submit(document.querySelector("#room2 .new-message"), "/room/right");
listen_submit(document.querySelector("#room1 .new-message"), "left");
listen_submit(document.querySelector("#room2 .new-message"), "right");
</script>
</body>
</html>

src/rooms.rs → src/bus.rs View File

@@ -1,5 +1,4 @@
//! A collection of "rooms" (like chat rooms) that manages
//! subscriptions and broadcasting.
//! A simple message bus.

use std::sync::Arc;

@@ -51,24 +50,29 @@ impl<T> Ring<T> {
}
}

/// A manager for "chat room" style messages.
pub struct Rooms<M> {
// ring buffer of (index, room, message)
data: Arc<Mutex<Ring<(String, M)>>>,
/// A Bus manages sending and receiving of messages.
pub struct Bus<M> {
// ring buffer of messages
data: Arc<Mutex<Ring<M>>>,
watch: (watch::Sender<Option<Index>>, watch::Receiver<Option<Index>>),
}

/// A subscription to a single chat "room" in a Rooms.
/// A subscription to messages on a Bus.
pub struct Subscription<M> {
data: Arc<Mutex<Ring<(String, M)>>>,
data: Arc<Mutex<Ring<M>>>,
filter: Box<dyn Fn(&M) -> bool + Send>,
last_read: Option<Index>,
last_known: Option<Index>,
room: String,
watch_rx: watch::Receiver<Option<Index>>,
}

impl<M> Rooms<M> {
/// Create a new Rooms message queue.
/// An error that can occur when reading from the bus
pub enum Error {
Behind,
}

impl<M> Bus<M> {
/// Create a new Bus.
pub fn new() -> Self {
// TODO: parameterize capacity
Self {
@@ -78,11 +82,11 @@ impl<M> Rooms<M> {
}

/// Send a message to all subscribers.
pub async fn send(&self, room: String, message: M) {
pub async fn send(&self, message: M) {
let next;
{
let mut data_lock = self.data.lock().await;
next = data_lock.push((room, message));
next = data_lock.push(message);
}

if let Err(_) = self.watch.0.broadcast(Some(next)) {
@@ -90,13 +94,22 @@ impl<M> Rooms<M> {
}
}

/// Create a new subscription to a specific room.
pub async fn subscribe(&self, room: String) -> Subscription<M> {
/// Create a new subscription to the bus
pub async fn subscribe(&self) -> Subscription<M> {
let data = self.data.clone();
let watch_rx = self.watch.1.clone();
let current = *watch_rx.get_ref();

Subscription { data, last_read: None, last_known: current, room, watch_rx }
Subscription { data, filter: Box::new(|_| true), last_read: None, last_known: current, watch_rx }
}
}

impl<M> Subscription<M> {
pub fn with_filter(self, filter: impl Fn(&M) -> bool + Send + 'static) -> Self {
Self {
filter: Box::new(filter),
..self
}
}
}

@@ -105,8 +118,10 @@ impl<M: Clone> Subscription<M> {
/// subscription is polled too slowly, it may "miss" messages. In this
/// situation, it will silently skip ahead to the most recent message.
///
/// Returns 'None' if the underlying Rooms instance has been dropped
pub async fn next(&mut self) -> Option<M> {
/// Returns 'None' if the underlying Bus has been dropped.
/// If next() returns an error, further calls to next() will attempt
/// to resume at the latest message in the bus.
pub async fn next(&mut self) -> Option<Result<M, Error>> {
loop {
if self.last_known.is_some() {
while self.last_read != self.last_known {
@@ -117,19 +132,19 @@ impl<M: Clone> Subscription<M> {
};

match data_lock.get(try_next) {
Some((msg_room, msg)) => {
Some(msg) => {
// Whether or not self message is returned, it was "read"
self.last_read = Some(try_next);

if *msg_room == self.room {
if (self.filter)(msg) {
// Return this one!
return Some(msg.clone());
return Some(Ok(msg.clone()));
}
}
None => {
// Assume we fell too far behind. Clear last_known and continue
//log::warn!("Fell behind.");
// We fell too far behind. Clear last_known and raise an error
self.last_known = None;
return Some(Err(Error::Behind));
}
}
}

+ 1
- 1
src/lib.rs View File

@@ -1,3 +1,3 @@
pub mod bus;
pub mod io_channel;
pub mod rooms;
pub mod sse;

+ 26
- 16
src/main.rs View File

@@ -7,11 +7,12 @@ use rocket::request::{Form, State};
use rocket::response::NamedFile;

use rocket_rooms::sse;
use rocket_rooms::rooms::Rooms;
use rocket_rooms::bus;

#[derive(rocket::FromForm)]
#[derive(Clone)]
struct Message {
pub room: String,
pub from: String,
pub text: String,
}
@@ -21,22 +22,30 @@ fn index() -> NamedFile {
NamedFile::open("index.html").expect("index.html")
}

#[get("/room/<room>")]
async fn room_stream(room: String, rooms: State<'_, Rooms<Message>>) -> sse::SSE {
// Subscribe to the room. 'subscription' is a Stream of Messages.
let mut subscription = rooms.subscribe(room).await;
#[get("/events")]
async fn messages(bus: State<'_, bus::Bus<Message>>) -> sse::SSE {
// Subscribe to messages
let mut subscription = bus.subscribe().await;

// Create the SSE stream
sse::with_writer(|mut writer| async move {
loop {
// Asynchronously get the next message from the room
let message = subscription.next().await.expect("rooms can't 'close'");
// Asynchronously get the next message
let event = match subscription.next().await.expect("bus can't 'close'") {
Ok(message) => {
// Format messages as "room\n<user> message".
// TODO: We already checked that 'room' has no newlines or anything else weird
let formatted = format!("{}\n<{}> {}", message.room, message.from, message.text);

// Format messages as "<user> hi!"
let formatted = format!("<{}> {}", message.from, message.text);
sse::Event::new(Some("message".into()), Some(formatted), None)
}
Err(bus::Error::Behind) => {
sse::Event::new(Some("behind".into()), None, None)
}
};

// Send the message to the client
if let Err(_) = writer.send(sse::Event::data(formatted)).await {
if let Err(_) = writer.send(event).await {
// An error usually (TODO: always?) means the client has disconnected
break;
}
@@ -44,16 +53,17 @@ async fn room_stream(room: String, rooms: State<'_, Rooms<Message>>) -> sse::SSE
})
}

#[post("/room/<room>", data="<form>")]
async fn post_message(room: String, form: Form<Message>, rooms: State<'_, Rooms<Message>>) {
// Send the message to the requested room
rooms.send(room, form.into_inner()).await;
#[post("/message", data="<form>")]
async fn post_message(form: Form<Message>, bus: State<'_, bus::Bus<Message>>) {
// Send the message
// TODO: ensure room name is alphanum only
bus.send(form.into_inner()).await;
}

fn main() {
rocket::ignite()
.manage(Rooms::<Message>::new())
.mount("/", routes![index, room_stream, post_message])
.manage(bus::Bus::<Message>::new())
.mount("/", routes![index, messages, post_message])
.launch()
.expect("server quit unexpectedly")
}

+ 1
- 0
src/sse.rs View File

@@ -89,6 +89,7 @@ impl SSEWriter {
}
}

// TODO: Cache-Control header?
impl<'r> Responder<'r> for SSE {
fn respond_to(self, _req: &'r Request<'_>) -> ResultFuture<'r> {
Box::pin(async move {


Loading…
Cancel
Save