Skip to content

feat(EmailWorkers): Implement email worker functionality #715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions worker-build/src/js/shim.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import wasmModule from "./index.wasm";
import { WorkerEntrypoint } from "cloudflare:workers";

const instance = new WebAssembly.Instance(wasmModule, {
"./index_bg.js": imports,
"./index_bg.js": imports,
});

imports.__wbg_set_wasm(instance.exports);
Expand All @@ -15,39 +15,44 @@ instance.exports.__wbindgen_start?.();
export { wasmModule };

class Entrypoint extends WorkerEntrypoint {
async fetch(request) {
let response = imports.fetch(request, this.env, this.ctx);
$WAIT_UNTIL_RESPONSE;
return await response;
}

async queue(batch) {
return await imports.queue(batch, this.env, this.ctx);
}

async scheduled(event) {
return await imports.scheduled(event, this.env, this.ctx);
}
async fetch(request) {
let response = imports.fetch(request, this.env, this.ctx);
$WAIT_UNTIL_RESPONSE;
return await response;
}

async queue(batch) {
return await imports.queue(batch, this.env, this.ctx);
}

async scheduled(event) {
return await imports.scheduled(event, this.env, this.ctx);
}

async email(message) {
return await imports.email(message, this.env, this.ctx);
}
}

const EXCLUDE_EXPORT = [
"IntoUnderlyingByteSource",
"IntoUnderlyingSink",
"IntoUnderlyingSource",
"MinifyConfig",
"PolishConfig",
"R2Range",
"RequestRedirect",
"fetch",
"queue",
"scheduled",
"getMemory",
"IntoUnderlyingByteSource",
"IntoUnderlyingSink",
"IntoUnderlyingSource",
"MinifyConfig",
"PolishConfig",
"R2Range",
"RequestRedirect",
"fetch",
"queue",
"scheduled",
"email",
"getMemory",
];

Object.keys(imports).map((k) => {
if (!(EXCLUDE_EXPORT.includes(k) | k.startsWith("__"))) {
Entrypoint.prototype[k] = imports[k];
}
if (!(EXCLUDE_EXPORT.includes(k) | k.startsWith("__"))) {
Entrypoint.prototype[k] = imports[k];
}
});

export default Entrypoint;
1 change: 1 addition & 0 deletions worker-build/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ fn bundle(esbuild_path: &Path) -> Result<()> {
let mut command = Command::new(esbuild_path);
command.args([
"--external:./index.wasm",
"--external:cloudflare:email",
"--external:cloudflare:sockets",
"--external:cloudflare:workers",
"--format=esm",
Expand Down
41 changes: 40 additions & 1 deletion worker-macros/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream {
Start,
#[cfg(feature = "queue")]
Queue,
Email,
}
use HandlerType::*;

Expand All @@ -25,14 +26,15 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream {
"start" => handler_type = Some(Start),
#[cfg(feature = "queue")]
"queue" => handler_type = Some(Queue),
"email" => handler_type = Some(Email),
"respond_with_errors" => {
respond_with_errors = true;
}
_ => panic!("Invalid attribute: {}", attr),
}
}
let handler_type = handler_type.expect(
"must have either 'fetch', 'scheduled', 'queue' or 'start' attribute, e.g. #[event(fetch)]",
"must have either 'fetch', 'scheduled', 'queue', 'email', or 'start' attribute, e.g. #[event(fetch)]",
);

// create new var using syn item of the attributed fn
Expand Down Expand Up @@ -215,5 +217,42 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream {

TokenStream::from(output)
}
Email => {
let input_fn_ident = Ident::new(
&(input_fn.sig.ident.to_string() + "_email_glue"),
input_fn.sig.ident.span(),
);
let wrapper_fn_ident = Ident::new("email", input_fn.sig.ident.span());
// rename the original attributed fn
input_fn.sig.ident = input_fn_ident.clone();

let wrapper_fn = quote! {
pub async fn #wrapper_fn_ident(message: ::worker::worker_sys::EmailMessage, env: ::worker::Env, ctx: ::worker::worker_sys::Context) {
// call the original fn
let ctx = worker::Context::new(ctx);
match #input_fn_ident(::worker::EmailMessage { inner: message }, env, ctx).await {
Ok(()) => {},
Err(e) => {
::worker::console_log!("{}", &e);
panic!("{}", e);
}
}
}
};
let wasm_bindgen_code =
wasm_bindgen_macro_support::expand(TokenStream::new().into(), wrapper_fn)
.expect("wasm_bindgen macro failed to expand");

let output = quote! {
#input_fn

mod _worker_email {
use ::worker::{wasm_bindgen, wasm_bindgen_futures};
use super::#input_fn_ident;
#wasm_bindgen_code
}
};
TokenStream::from(output)
}
}
}
2 changes: 2 additions & 0 deletions worker-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod crypto;
mod d1;
mod durable_object;
mod dynamic_dispatcher;
mod email;
mod fetcher;
mod fixed_length_stream;
mod hyperdrive;
Expand All @@ -28,6 +29,7 @@ pub use crypto::*;
pub use d1::*;
pub use durable_object::*;
pub use dynamic_dispatcher::*;
pub use email::*;
pub use fetcher::*;
pub use fixed_length_stream::*;
pub use hyperdrive::*;
Expand Down
48 changes: 48 additions & 0 deletions worker-sys/src/types/email.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use wasm_bindgen::prelude::*;
use web_sys::ReadableStream;

#[wasm_bindgen(module = "cloudflare:email")]
extern "C" {
#[wasm_bindgen(extends=js_sys::Object)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub type EmailMessage;

#[wasm_bindgen(constructor, catch)]
pub fn new(from: &str, to: &str, raw: &str) -> Result<EmailMessage, JsValue>;

#[wasm_bindgen(constructor, catch)]
pub fn new_from_stream(
from: &str,
to: &str,
raw: &ReadableStream,
) -> Result<EmailMessage, JsValue>;

#[wasm_bindgen(method, catch, getter)]
pub fn from(this: &EmailMessage) -> Result<js_sys::JsString, JsValue>;

#[wasm_bindgen(method, catch, getter)]
pub fn to(this: &EmailMessage) -> Result<js_sys::JsString, JsValue>;

#[wasm_bindgen(method, catch, getter)]
pub fn headers(this: &EmailMessage) -> Result<web_sys::Headers, JsValue>;

#[wasm_bindgen(method, catch, getter)]
pub fn raw(this: &EmailMessage) -> Result<web_sys::ReadableStream, JsValue>;

#[wasm_bindgen(method, catch, getter, js_name=rawSize)]
pub fn raw_size(this: &EmailMessage) -> Result<js_sys::Number, JsValue>;

#[wasm_bindgen(method, catch, js_name=setReject)]
pub fn set_reject(this: &EmailMessage, reason: js_sys::JsString) -> Result<(), JsValue>;

#[wasm_bindgen(method, catch)]
pub fn forward(
this: &EmailMessage,
recipient: js_sys::JsString,
headers: Option<web_sys::Headers>,
) -> Result<js_sys::Promise, JsValue>;

#[wasm_bindgen(method, catch)]
pub fn reply(this: &EmailMessage, message: EmailMessage) -> Result<js_sys::Promise, JsValue>;

}
85 changes: 85 additions & 0 deletions worker/src/email.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use futures_util::TryStreamExt;
use wasm_bindgen_futures::JsFuture;
use web_sys::ReadableStream;
use worker_sys::EmailMessage as EmailMessageSys;

use crate::{send::SendFuture, ByteStream, Headers, Result};

pub struct EmailMessage {
pub inner: EmailMessageSys,
}

impl EmailMessage {
/// construct a new email message
pub fn new(from: &str, to: &str, raw: &str) -> Result<Self> {
Ok(EmailMessage {
inner: EmailMessageSys::new(from, to, raw)?,
})
}

/// construct a new email message for a ReadableStream
pub fn new_from_stream(from: &str, to: &str, raw: &ReadableStream) -> Result<Self> {
Ok(EmailMessage {
inner: EmailMessageSys::new_from_stream(from, to, raw)?,
})
}

/// the from field of the email message
pub fn from_email(&self) -> String {
self.inner.from().unwrap().into()
}

/// the to field of the email message
pub fn to_email(&self) -> String {
self.inner.to().unwrap().into()
}

/// the headers field of the email message
pub fn headers(&self) -> Headers {
Headers(self.inner.headers().unwrap())
}

/// the raw email message
pub fn raw(&self) -> Result<ByteStream> {
self.inner.raw().map_err(Into::into).map(|rs| ByteStream {
inner: wasm_streams::ReadableStream::from_raw(rs).into_stream(),
})
}

pub async fn raw_bytes(&self) -> Result<Vec<u8>> {
self.raw()?
.try_fold(Vec::new(), |mut bytes, mut chunk| async move {
bytes.append(&mut chunk);
Ok(bytes)
})
.await
}

/// the raw size of the message
pub fn raw_size(&self) -> f64 {
self.inner.raw_size().unwrap().into()
}

/// reject message with reason
pub fn reject(&self, reason: String) {
self.inner.set_reject(reason.into()).unwrap()
}

/// forward message to recipient
pub async fn forward(&self, recipient: String, headers: Option<Headers>) -> Result<()> {
let promise = self.inner.forward(recipient.into(), headers.map(|h| h.0))?;

let fut = SendFuture::new(JsFuture::from(promise));
fut.await?;
Ok(())
}

/// reply with email message to recipient
pub async fn reply(&self, message: EmailMessage) -> Result<()> {
let promise = self.inner.reply(message.inner)?;

let fut = SendFuture::new(JsFuture::from(promise));
fut.await?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub use crate::date::{Date, DateInit};
pub use crate::delay::Delay;
pub use crate::durable::*;
pub use crate::dynamic_dispatch::*;
pub use crate::email::*;
pub use crate::env::{Env, EnvBinding, Secret, Var};
pub use crate::error::Error;
pub use crate::fetcher::Fetcher;
Expand Down Expand Up @@ -212,6 +213,7 @@ mod date;
mod delay;
pub mod durable;
mod dynamic_dispatch;
mod email;
mod env;
mod error;
mod fetcher;
Expand Down