|
1 | 1 | // Copyright 2018-2025 the Deno authors. MIT license. |
2 | | - |
3 | | -mod in_memory_broadcast_channel; |
4 | | - |
5 | | -use std::cell::RefCell; |
6 | | -use std::rc::Rc; |
7 | | -use std::sync::Arc; |
8 | | - |
9 | | -use async_trait::async_trait; |
10 | | -use deno_core::JsBuffer; |
11 | | -use deno_core::OpState; |
12 | | -use deno_core::Resource; |
13 | | -use deno_core::ResourceId; |
14 | | -use deno_core::op2; |
15 | | -use deno_error::JsErrorBox; |
16 | | -use deno_features::FeatureChecker; |
17 | | -pub use in_memory_broadcast_channel::InMemoryBroadcastChannel; |
18 | | -pub use in_memory_broadcast_channel::InMemoryBroadcastChannelResource; |
19 | | -use tokio::sync::broadcast::error::SendError as BroadcastSendError; |
20 | | -use tokio::sync::mpsc::error::SendError as MpscSendError; |
21 | | - |
22 | | -pub const UNSTABLE_FEATURE_NAME: &str = "broadcast-channel"; |
23 | | - |
24 | | -#[derive(Debug, thiserror::Error, deno_error::JsError)] |
25 | | -pub enum BroadcastChannelError { |
26 | | - #[class(inherit)] |
27 | | - #[error(transparent)] |
28 | | - Resource( |
29 | | - #[from] |
30 | | - #[inherit] |
31 | | - deno_core::error::ResourceError, |
32 | | - ), |
33 | | - #[class(generic)] |
34 | | - #[error(transparent)] |
35 | | - MPSCSendError(MpscSendError<Box<dyn std::fmt::Debug + Send + Sync>>), |
36 | | - #[class(generic)] |
37 | | - #[error(transparent)] |
38 | | - BroadcastSendError( |
39 | | - BroadcastSendError<Box<dyn std::fmt::Debug + Send + Sync>>, |
40 | | - ), |
41 | | - #[class(inherit)] |
42 | | - #[error(transparent)] |
43 | | - Other(#[inherit] JsErrorBox), |
44 | | -} |
45 | | - |
46 | | -impl<T: std::fmt::Debug + Send + Sync + 'static> From<MpscSendError<T>> |
47 | | - for BroadcastChannelError |
48 | | -{ |
49 | | - fn from(value: MpscSendError<T>) -> Self { |
50 | | - BroadcastChannelError::MPSCSendError(MpscSendError(Box::new(value.0))) |
51 | | - } |
52 | | -} |
53 | | -impl<T: std::fmt::Debug + Send + Sync + 'static> From<BroadcastSendError<T>> |
54 | | - for BroadcastChannelError |
55 | | -{ |
56 | | - fn from(value: BroadcastSendError<T>) -> Self { |
57 | | - BroadcastChannelError::BroadcastSendError(BroadcastSendError(Box::new( |
58 | | - value.0, |
59 | | - ))) |
60 | | - } |
61 | | -} |
62 | | - |
63 | | -#[async_trait] |
64 | | -pub trait BroadcastChannel: Clone { |
65 | | - type Resource: Resource; |
66 | | - |
67 | | - fn subscribe(&self) -> Result<Self::Resource, BroadcastChannelError>; |
68 | | - |
69 | | - fn unsubscribe( |
70 | | - &self, |
71 | | - resource: &Self::Resource, |
72 | | - ) -> Result<(), BroadcastChannelError>; |
73 | | - |
74 | | - async fn send( |
75 | | - &self, |
76 | | - resource: &Self::Resource, |
77 | | - name: String, |
78 | | - data: Vec<u8>, |
79 | | - ) -> Result<(), BroadcastChannelError>; |
80 | | - |
81 | | - async fn recv( |
82 | | - &self, |
83 | | - resource: &Self::Resource, |
84 | | - ) -> Result<Option<Message>, BroadcastChannelError>; |
85 | | -} |
86 | | - |
87 | | -pub type Message = (String, Vec<u8>); |
88 | | - |
89 | | -#[op2(fast)] |
90 | | -#[smi] |
91 | | -pub fn op_broadcast_subscribe<BC>( |
92 | | - state: &mut OpState, |
93 | | -) -> Result<ResourceId, BroadcastChannelError> |
94 | | -where |
95 | | - BC: BroadcastChannel + 'static, |
96 | | -{ |
97 | | - state |
98 | | - .borrow::<Arc<FeatureChecker>>() |
99 | | - .check_or_exit(UNSTABLE_FEATURE_NAME, "BroadcastChannel"); |
100 | | - let bc = state.borrow::<BC>(); |
101 | | - let resource = bc.subscribe()?; |
102 | | - Ok(state.resource_table.add(resource)) |
103 | | -} |
104 | | - |
105 | | -#[op2(fast)] |
106 | | -pub fn op_broadcast_unsubscribe<BC>( |
107 | | - state: &mut OpState, |
108 | | - #[smi] rid: ResourceId, |
109 | | -) -> Result<(), BroadcastChannelError> |
110 | | -where |
111 | | - BC: BroadcastChannel + 'static, |
112 | | -{ |
113 | | - let resource = state.resource_table.get::<BC::Resource>(rid)?; |
114 | | - let bc = state.borrow::<BC>(); |
115 | | - bc.unsubscribe(&resource) |
116 | | -} |
117 | | - |
118 | | -#[op2(async)] |
119 | | -pub async fn op_broadcast_send<BC>( |
120 | | - state: Rc<RefCell<OpState>>, |
121 | | - #[smi] rid: ResourceId, |
122 | | - #[string] name: String, |
123 | | - #[buffer] buf: JsBuffer, |
124 | | -) -> Result<(), BroadcastChannelError> |
125 | | -where |
126 | | - BC: BroadcastChannel + 'static, |
127 | | -{ |
128 | | - let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?; |
129 | | - let bc = state.borrow().borrow::<BC>().clone(); |
130 | | - bc.send(&resource, name, buf.to_vec()).await |
131 | | -} |
132 | | - |
133 | | -#[op2(async)] |
134 | | -#[serde] |
135 | | -pub async fn op_broadcast_recv<BC>( |
136 | | - state: Rc<RefCell<OpState>>, |
137 | | - #[smi] rid: ResourceId, |
138 | | -) -> Result<Option<Message>, BroadcastChannelError> |
139 | | -where |
140 | | - BC: BroadcastChannel + 'static, |
141 | | -{ |
142 | | - let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?; |
143 | | - let bc = state.borrow().borrow::<BC>().clone(); |
144 | | - bc.recv(&resource).await |
145 | | -} |
146 | | - |
147 | | -deno_core::extension!(deno_broadcast_channel, |
148 | | - deps = [ deno_webidl, deno_web ], |
149 | | - parameters = [BC: BroadcastChannel], |
150 | | - ops = [ |
151 | | - op_broadcast_subscribe<BC>, |
152 | | - op_broadcast_unsubscribe<BC>, |
153 | | - op_broadcast_send<BC>, |
154 | | - op_broadcast_recv<BC>, |
155 | | - ], |
156 | | - esm = [ "01_broadcast_channel.js" ], |
157 | | - options = { |
158 | | - bc: BC, |
159 | | - }, |
160 | | - state = |state, options| { |
161 | | - state.put(options.bc); |
162 | | - }, |
163 | | -); |
0 commit comments