Browse Source

Reorganize and rename the SSE implementation to "v1".

tokio-broadcast
Jeb Rosen 2 months ago
parent
commit
589188db48
3 changed files with 146 additions and 134 deletions
  1. +0
    -134
      src/sse.rs
  2. +77
    -0
      src/sse/mod.rs
  3. +69
    -0
      src/sse/v1.rs

+ 0
- 134
src/sse.rs View File

@@ -1,134 +0,0 @@
//! An SSE Responder.
//!
//! This module might be suitable for inclusion in rocket_contrib.

use std::future::Future;

use rocket::request::Request;
use rocket::response::{Responder, Response, ResultFuture};
use tokio::io::{BufWriter, AsyncWrite, AsyncWriteExt};

use super::io_channel::{io_channel, IoChannelReader, IoChannelWriter};

// TODO: Comprehensive support for all possible message types and fields:
// * comments
// * 'retry' field
// * custom fields (ignored by EventSource API, but worth considering)
/// A single SSE message, with optional `event`, `data`, and `id` fields.
#[derive(Clone)]
pub struct Event {
event: Option<String>,
id: Option<String>,
data: Option<String>,
}

impl Event {
/// Create a new Event with only the data field specified
pub fn data<S: Into<String>>(data: S) -> Self {
Self { event: None, id: None, data: Some(data.into()) }
}

// TODO: Result instead of panic!
/// Create a new Event with event, data, and id all (optionally) specified
///
/// # Panics
///
/// Panics if either `event` or `id` contain newlines
pub fn new(event: Option<String>, data: Option<String>, id: Option<String>) -> Self {
if event.as_ref().map_or(false, |e| e.find(|b| b == '\r' || b == '\n').is_some()) {
panic!("event cannot contain newlines");
}

if id.as_ref().map_or(false, |i| i.find(|b| b == '\r' || b == '\n').is_some()) {
panic!("id cannot contain newlines");
}

Self { event, id, data }
}

/// Writes this event to a `writer` according in the EventStream
/// format
//TODO: Remove Unpin bound?
pub async fn write_to<W: AsyncWrite + Unpin>(self, mut writer: W) -> Result<(), std::io::Error> {
if let Some(event) = self.event {
writer.write_all(b"event: ").await?;
writer.write_all(event.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
if let Some(id) = self.id {
writer.write_all(b"id: ").await?;
writer.write_all(id.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
if let Some(data) = self.data {
for line in data.lines() {
writer.write_all(b"data: ").await?;
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
}
writer.write_all(b"\n").await?;
Ok(())
}
}

/// The 'read half' of an SSE stream. This type implements `Responder`; see the
/// [`with_writer`] function for a usage example.
pub struct SSE(IoChannelReader);

/// The 'send half' of an SSE stream. You can use the [`SSEWriter::send`] method
/// to send events to the stream
pub struct SSEWriter(BufWriter<IoChannelWriter>);

impl SSEWriter {
/// Sends the `event` to the connected client
pub async fn send(&mut self, event: Event) -> Result<(), std::io::Error> {
event.write_to(&mut self.0).await?;
self.0.flush().await?;
Ok(())
}
}

// TODO: Cache-Control header?
impl<'r> Responder<'r> for SSE {
fn respond_to(self, _req: &'r Request<'_>) -> ResultFuture<'r> {
Box::pin(async move {
Response::build()
.raw_header("Content-Type", "text/event-stream")
.streamed_body(self.0)
.ok()
})
}
}

/// Creates an SSE stream based on an [`SSEWriter`].
///
/// Typical usage:
///
/// ```rust
/// # use rocket::get;
/// #
///
/// use rocket_rooms::sse::{self, Event, SSE};
/// #[get("/stream")]
/// fn stream() -> SSE {
/// sse::with_writer(|mut writer| async move {
/// writer.send(Event::data("data1")).await.unwrap();
/// writer.send(Event::data("data2")).await.unwrap();
/// writer.send(Event::data("data3")).await.unwrap();
/// })
/// }
/// ```
pub fn with_writer<F, Fut>(func: F) -> SSE
where
F: FnOnce(SSEWriter) -> Fut,
Fut: Future<Output=()> + Send + 'static,
{
let (tx, rx) = io_channel();
tokio::spawn(func(SSEWriter(BufWriter::new(tx))));
SSE(rx)
}

// TODO: Consider an SSEStream that wraps an Stream<Item=Event>.
// Users would probably need to use something like async_stream, and the
// AsyncRead impl would probably have to be a pretty complex state machine

+ 77
- 0
src/sse/mod.rs View File

@@ -0,0 +1,77 @@
//! An SSE Responder.
//!
//! This module might be suitable for inclusion in rocket_contrib.

use tokio::io::{AsyncWrite, AsyncWriteExt};

mod v1;
pub use v1::{SSE, SSEWriter, with_writer};

// TODO: Comprehensive support for all possible message types and fields:
// * comments
// * 'retry' field
// * custom fields (ignored by EventSource API, but worth considering)
/// A single SSE message, with optional `event`, `data`, and `id` fields.
#[derive(Clone)]
pub struct Event {
event: Option<String>,
id: Option<String>,
data: Option<String>,
}

impl Event {
/// Create a new Event with only the data field specified
pub fn data<S: Into<String>>(data: S) -> Self {
Self { event: None, id: None, data: Some(data.into()) }
}

// TODO: Result instead of panic!
/// Create a new Event with event, data, and id all (optionally) specified
///
/// # Panics
///
/// Panics if either `event` or `id` contain newlines
pub fn new(event: Option<String>, data: Option<String>, id: Option<String>) -> Self {
if event.as_ref().map_or(false, |e| e.find(|b| b == '\r' || b == '\n').is_some()) {
panic!("event cannot contain newlines");
}

if id.as_ref().map_or(false, |i| i.find(|b| b == '\r' || b == '\n').is_some()) {
panic!("id cannot contain newlines");
}

Self { event, id, data }
}

/// Writes this event to a `writer` according in the EventStream
/// format
//TODO: Remove Unpin bound?
pub async fn write_to<W: AsyncWrite + Unpin>(self, mut writer: W) -> Result<(), std::io::Error> {
writer.write_all(&self.serialize()).await
}

pub fn serialize(self) -> Vec<u8> {
let mut vec = vec![];

if let Some(event) = self.event {
vec.extend(b"event: ");
vec.extend(event.into_bytes());
vec.extend(b"\n");
}
if let Some(id) = self.id {
vec.extend(b"id: ");
vec.extend(id.into_bytes());
vec.extend(b"\n");
}
if let Some(data) = self.data {
for line in data.lines() {
vec.extend(b"data: ");
vec.extend(line.as_bytes());
vec.extend(b"\n");
}
}
vec.extend(b"\n");

vec
}
}

+ 69
- 0
src/sse/v1.rs View File

@@ -0,0 +1,69 @@
//! The V1 implementation using io_channel

use std::future::Future;

use rocket::request::Request;
use rocket::response::{Responder, Response, ResultFuture};
use tokio::io::{BufWriter, AsyncWriteExt};

use crate::io_channel::{io_channel, IoChannelReader, IoChannelWriter};
use super::Event;


/// The 'read half' of an SSE stream. This type implements `Responder`; see the
/// [`with_writer`] function for a usage example.
pub struct SSE(IoChannelReader);

/// The 'send half' of an SSE stream. You can use the [`SSEWriter::send`] method
/// to send events to the stream
pub struct SSEWriter(BufWriter<IoChannelWriter>);

impl SSEWriter {
/// Sends the `event` to the connected client
pub async fn send(&mut self, event: Event) -> Result<(), std::io::Error> {
event.write_to(&mut self.0).await?;
self.0.flush().await?;
Ok(())
}
}

// TODO: Cache-Control header?
impl<'r> Responder<'r> for SSE {
fn respond_to(self, _req: &'r Request<'_>) -> ResultFuture<'r> {
Box::pin(async move {
Response::build()
.raw_header("Content-Type", "text/event-stream")
.streamed_body(self.0)
.ok()
})
}
}

/// Creates an SSE stream based on an [`SSEWriter`].
///
/// Typical usage:
///
/// ```rust
/// # use rocket::get;
/// #
///
/// use rocket_rooms::sse::{self, Event, SSE};
/// #[get("/stream")]
/// fn stream() -> SSE {
/// sse::with_writer(|mut writer| async move {
/// writer.send(Event::data("data1")).await.unwrap();
/// writer.send(Event::data("data2")).await.unwrap();
/// writer.send(Event::data("data3")).await.unwrap();
/// })
/// }
/// ```
pub fn with_writer<F, Fut>(func: F) -> SSE
where
F: FnOnce(SSEWriter) -> Fut,
Fut: Future<Output=()> + Send + 'static,
{
let (tx, rx) = io_channel();
tokio::spawn(func(SSEWriter(BufWriter::new(tx))));
SSE(rx)
}


Loading…
Cancel
Save