Skip to content

Commit

Permalink
feat(redis): use ClusterClient for redis cluster adapter creation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore authored Jan 28, 2025
1 parent 353bb80 commit 241e89b
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 37 deletions.
2 changes: 1 addition & 1 deletion crates/socketioxide-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fred = { version = "10", features = [
"subscriber-client",
"i-pubsub",
], default-features = false, optional = true }
redis = { version = "0.28.1", features = [
redis = { version = "0.28.2", features = [
"aio",
"tokio-comp",
"streams",
Expand Down
18 changes: 6 additions & 12 deletions crates/socketioxide-redis/src/drivers/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,14 @@ impl RedisDriver {
impl ClusterDriver {
/// Create a new redis driver from a redis cluster client.
#[cfg_attr(docsrs, doc(cfg(feature = "redis-cluster")))]
pub async fn new(
client_builder: redis::cluster::ClusterClientBuilder,
) -> Result<Self, redis::RedisError> {
pub async fn new(client: &redis::cluster::ClusterClient) -> Result<Self, redis::RedisError> {
let handlers = Arc::new(RwLock::new(HashMap::new()));
let handlers_clone = handlers.clone();
let conn = client_builder
.push_sender(move |msg| {
handle_msg(msg, handlers_clone.clone());
Ok::<(), std::convert::Infallible>(())
})
.build()
.unwrap()
.get_async_connection()
.await?;
let config = redis::cluster::ClusterConfig::new().set_push_sender(move |msg| {
handle_msg(msg, handlers_clone.clone());
Ok::<(), std::convert::Infallible>(())
});
let conn = client.get_async_connection_with_config(config).await?;

Ok(Self { conn, handlers })
}
Expand Down
14 changes: 6 additions & 8 deletions crates/socketioxide-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@
//! async fn on_event<A: Adapter>(socket: SocketRef<A>, Data(data): Data<String>) {}
//!
//! // single node cluster
//! let builder = redis::cluster::ClusterClient::builder(std::iter::once(
//! "redis://127.0.0.1:6379?protocol=resp3",
//! ));
//! let adapter = RedisAdapterCtr::new_with_cluster(builder).await?;
//! let client = redis::cluster::ClusterClient::new(["redis://127.0.0.1:6379?protocol=resp3"])?;
//! let adapter = RedisAdapterCtr::new_with_cluster(&client).await?;
//!
//! let (layer, io) = SocketIo::builder()
//! .with_adapter::<ClusterAdapter<_>>(adapter)
Expand Down Expand Up @@ -334,18 +332,18 @@ impl RedisAdapterCtr<drivers::redis::ClusterDriver> {
/// Create a new adapter constructor with the [`redis`] driver and a default config.
#[cfg_attr(docsrs, doc(cfg(feature = "redis-cluster")))]
pub async fn new_with_cluster(
builder: redis::cluster::ClusterClientBuilder,
client: &redis::cluster::ClusterClient,
) -> redis::RedisResult<Self> {
Self::new_with_cluster_config(builder, RedisAdapterConfig::default()).await
Self::new_with_cluster_config(client, RedisAdapterConfig::default()).await
}

/// Create a new adapter constructor with the [`redis`] driver and a default config.
#[cfg_attr(docsrs, doc(cfg(feature = "redis-cluster")))]
pub async fn new_with_cluster_config(
builder: redis::cluster::ClusterClientBuilder,
client: &redis::cluster::ClusterClient,
config: RedisAdapterConfig,
) -> redis::RedisResult<Self> {
let driver = drivers::redis::ClusterDriver::new(builder).await?;
let driver = drivers::redis::ClusterDriver::new(client).await?;
Ok(Self::new_with_driver(driver, config))
}
}
Expand Down
6 changes: 3 additions & 3 deletions e2e/adapter/src/bins/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.finish();

tracing::subscriber::set_global_default(subscriber)?;
let builder = redis::cluster::ClusterClient::builder([
let client = redis::cluster::ClusterClient::new([
"redis://127.0.0.1:7000?protocol=resp3",
"redis://127.0.0.1:7001?protocol=resp3",
"redis://127.0.0.1:7002?protocol=resp3",
"redis://127.0.0.1:7003?protocol=resp3",
"redis://127.0.0.1:7004?protocol=resp3",
"redis://127.0.0.1:7005?protocol=resp3",
]);
let adapter = RedisAdapterCtr::new_with_cluster(builder).await?;
])?;
let adapter = RedisAdapterCtr::new_with_cluster(&client).await?;
#[allow(unused_mut)]
let mut builder =
SocketIo::builder().with_adapter::<socketioxide_redis::ClusterAdapter<_>>(adapter);
Expand Down
13 changes: 4 additions & 9 deletions examples/chat/src/redis/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

info!("Starting server");

// We should not have to create two builders. This is a limitation of the current redis API.
// https://github.com/redis-rs/redis-rs/issues/1486
const CLUSTER: [&str; 6] = [
let client = redis::cluster::ClusterClient::new([
"redis://127.0.0.1:7000?protocol=resp3",
"redis://127.0.0.1:7001?protocol=resp3",
"redis://127.0.0.1:7002?protocol=resp3",
"redis://127.0.0.1:7003?protocol=resp3",
"redis://127.0.0.1:7004?protocol=resp3",
"redis://127.0.0.1:7005?protocol=resp3",
];
])?;

let client = redis::cluster::ClusterClient::builder(CLUSTER);

let adapter = RedisAdapterCtr::new_with_cluster(client).await?;
let client = redis::cluster::ClusterClient::builder(CLUSTER);
let conn = client.build().unwrap().get_async_connection().await?;
let adapter = RedisAdapterCtr::new_with_cluster(&client).await?;
let conn = client.get_async_connection().await?;

let (layer, io) = SocketIo::builder()
.with_state(RemoteUserCnt::new(conn))
Expand Down
6 changes: 2 additions & 4 deletions examples/redis-whiteboard/src/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

info!("connecting to redis");
// single node cluster. In a real world scenario, you would have multiple nodes.
let builder = redis::cluster::ClusterClient::builder(std::iter::once(
"redis://127.0.0.1:6379?protocol=resp3",
));
let adapter = RedisAdapterCtr::new_with_cluster(builder).await?;
let client = redis::cluster::ClusterClient::new(["redis://127.0.0.1:6379?protocol=resp3"])?;
let adapter = RedisAdapterCtr::new_with_cluster(&client).await?;
info!("starting server");

let (layer, io) = SocketIo::builder()
Expand Down

0 comments on commit 241e89b

Please sign in to comment.