Skip to content
RAprogramm edited this page Jan 7, 2026 · 2 revisions

使用 Postgres LISTEN/NOTIFY 实时订阅实体变更。流支持实时仪表板、即时通知、缓存失效和事件驱动架构。

快速开始

#[derive(Entity, Serialize, Deserialize)]
#[entity(table = "orders", events, streams)]
pub struct Order {
    #[id]
    pub id: Uuid,

    #[field(create, update, response)]
    pub status: String,

    #[field(create, response)]
    pub customer_id: Uuid,
}

要求:

  • 实体必须派生 SerializeDeserialize(用于 JSON 载荷)
  • 需要同时使用 eventsstreams 属性
  • 在 Cargo.toml 中启用 streams 特性
[dependencies]
entity-derive = { version = "0.3", features = ["postgres", "streams"] }
serde = { version = "1", features = ["derive"] }

生成的代码

streams 属性生成:

频道常量

impl Order {
    /// Postgres NOTIFY 频道名称。
    pub const CHANNEL: &'static str = "entity_orders";
}

订阅者结构体

/// Order 实时变更订阅者。
pub struct OrderSubscriber {
    listener: PgListener,
}

impl OrderSubscriber {
    /// 连接并订阅频道。
    pub async fn new(pool: &PgPool) -> Result<Self, sqlx::Error>;

    /// 等待下一个事件(阻塞)。
    pub async fn recv(&mut self) -> Result<OrderEvent, StreamError<sqlx::Error>>;

    /// 非阻塞检查事件。
    pub async fn try_recv(&mut self) -> Result<Option<OrderEvent>, StreamError<sqlx::Error>>;
}

自动通知

CRUD 操作自动发出事件:

// 在生成的 create() 方法中:
async fn create(&self, dto: CreateOrderRequest) -> Result<Order, Self::Error> {
    let order = /* insert */;

    // 自动生成的通知
    let event = OrderEvent::created(order.clone());
    let payload = serde_json::to_string(&event)?;
    sqlx::query("SELECT pg_notify($1, $2)")
        .bind(Order::CHANNEL)
        .bind(&payload)
        .execute(self)
        .await?;

    Ok(order)
}

使用示例

基本订阅

use entity_derive::StreamError;

async fn watch_orders(pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
    let mut subscriber = OrderSubscriber::new(pool).await?;

    loop {
        match subscriber.recv().await {
            Ok(event) => {
                match event {
                    OrderEvent::Created(order) => {
                        println!("新订单: {}", order.id);
                    }
                    OrderEvent::Updated { old, new } => {
                        println!("订单 {} 已更新: {} -> {}", new.id, old.status, new.status);
                    }
                    OrderEvent::HardDeleted { id } => {
                        println!("订单 {} 已删除", id);
                    }
                    _ => {}
                }
            }
            Err(StreamError::Database(e)) => {
                eprintln!("数据库错误: {}", e);
                break;
            }
            Err(StreamError::Deserialize(e)) => {
                eprintln!("无效的事件载荷: {}", e);
            }
        }
    }

    Ok(())
}

实时仪表板(Axum WebSocket)

use axum::{
    extract::{State, WebSocketUpgrade, ws::{Message, WebSocket}},
    response::IntoResponse,
};

async fn ws_handler(
    ws: WebSocketUpgrade,
    State(pool): State<PgPool>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, pool))
}

async fn handle_socket(mut socket: WebSocket, pool: PgPool) {
    let mut subscriber = match OrderSubscriber::new(&pool).await {
        Ok(s) => s,
        Err(_) => return,
    };

    loop {
        match subscriber.recv().await {
            Ok(event) => {
                let json = serde_json::to_string(&event).unwrap();
                if socket.send(Message::Text(json)).await.is_err() {
                    break;
                }
            }
            Err(_) => break,
        }
    }
}

缓存失效

struct CacheInvalidator {
    cache: Redis,
    pool: PgPool,
}

impl CacheInvalidator {
    async fn run(&self) -> Result<(), StreamError<sqlx::Error>> {
        let mut subscriber = OrderSubscriber::new(&self.pool).await
            .map_err(StreamError::Database)?;

        loop {
            let event = subscriber.recv().await?;
            let key = format!("order:{}", event.entity_id());

            match event {
                OrderEvent::Created(_) | OrderEvent::Updated { .. } => {
                    self.cache.del(&key).await.ok();
                }
                OrderEvent::HardDeleted { id } | OrderEvent::SoftDeleted { id } => {
                    self.cache.del(&format!("order:{}", id)).await.ok();
                }
                _ => {}
            }
        }
    }
}

带优雅关闭的后台工作器

use tokio::sync::watch;

async fn notification_worker(
    pool: PgPool,
    mut shutdown: watch::Receiver<bool>,
) {
    let mut subscriber = OrderSubscriber::new(&pool).await.unwrap();

    loop {
        tokio::select! {
            result = subscriber.recv() => {
                match result {
                    Ok(event) => process_event(event).await,
                    Err(e) => {
                        eprintln!("流错误: {:?}", e);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                }
            }
            _ = shutdown.changed() => {
                println!("正在关闭通知工作器");
                break;
            }
        }
    }
}

错误处理

use entity_derive::StreamError;

match subscriber.recv().await {
    Ok(event) => { /* 处理 */ }
    Err(StreamError::Database(sqlx_error)) => {
        // 连接丢失、查询失败等
        // 订阅者将在下次 recv() 时自动重连
    }
    Err(StreamError::Deserialize(message)) => {
        // 无效的 JSON 载荷
        // 记录日志并继续 - 不要中断循环
    }
}

架构

CRUD 操作 (create/update/delete)
         │
         ▼
    pg_notify(channel, event_json)
         │
         ▼
    Postgres NOTIFY
         │
    ┌────┴────┐
    ▼         ▼
订阅者     订阅者  (多个监听器)
    │         │
    ▼         ▼
 WebSocket   缓存
 仪表板     失效器

最佳实践

  1. 重连 — PgListener 自动重连;设计循环以处理临时故障
  2. 幂等性 — 事件可能多次传递;处理程序应该是幂等的
  3. 载荷大小 — 保持实体小巧;大载荷可能超出 Postgres 限制
  4. 独立连接池 — 为监听器使用专用连接池
  5. 监控 — 记录流错误并跟踪事件处理延迟
  6. 优雅关闭 — 使用 select! 配合关闭信号来清理资源

配合软删除

启用 soft_delete 时,可使用额外事件:

#[derive(Entity, Serialize, Deserialize)]
#[entity(table = "documents", events, streams, soft_delete)]
pub struct Document {
    #[id]
    pub id: Uuid,

    #[field(create, response)]
    pub title: String,

    #[field(skip)]
    pub deleted_at: Option<DateTime<Utc>>,
}

// 事件包括:
// - DocumentEvent::SoftDeleted { id }
// - DocumentEvent::Restored { id }
// - DocumentEvent::HardDeleted { id }

另请参阅

  • 事件 — 无实时流的事件枚举
  • 钩子 — 在生命周期事件上执行自定义逻辑
  • 最佳实践 — 生产环境技巧

Clone this wiki locally