Skip to content

Commit e897a33

Browse files
SuperFluffyerwanor
andauthored
tower-abci: add support for unix domain sockets (#35)
* feat: support unix domain sockets * don't debug print option, only contents of option * tower-abci(v037): refactor around `Server::run` * tower-abci(v034): backport uds support * tower-abci: remove redundant shadowing of stream/sink * cargo: bump version to `0.10.0` --------- Co-authored-by: Erwan <[email protected]>
1 parent d30102c commit e897a33

File tree

5 files changed

+126
-55
lines changed

5 files changed

+126
-55
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "tower-abci"
3-
version = "0.9.0"
3+
version = "0.10.0"
44
authors = ["Henry de Valence <[email protected]>"]
55
edition = "2021"
66
license = "MIT"

examples/kvstore_34/main.rs

+19-15
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@ use std::{
1010
use bytes::Bytes;
1111
use futures::future::FutureExt;
1212
use structopt::StructOpt;
13-
use tower::{Service, ServiceBuilder};
14-
1513
use tendermint::abci::{Event, EventAttributeIndexExt};
16-
17-
use tendermint::v0_34::abci::response;
18-
use tendermint::v0_34::abci::{Request, Response};
14+
use tendermint::v0_34::abci::{response, Request, Response};
15+
use tower::{Service, ServiceBuilder};
1916

2017
use tower_abci::{
2118
v034::{split, Server},
@@ -141,6 +138,10 @@ struct Opt {
141138
/// Bind the TCP server to this port.
142139
#[structopt(short, long, default_value = "26658")]
143140
port: u16,
141+
142+
/// Bind the UDS server to this path
143+
#[structopt(long)]
144+
uds: Option<String>,
144145
}
145146

146147
#[tokio::main]
@@ -157,7 +158,7 @@ async fn main() {
157158
// Hand those components to the ABCI server, but customize request behavior
158159
// for each category -- for instance, apply load-shedding only to mempool
159160
// and info requests, but not to consensus requests.
160-
let server = Server::builder()
161+
let server_builder = Server::builder()
161162
.consensus(consensus)
162163
.snapshot(snapshot)
163164
.mempool(
@@ -172,13 +173,16 @@ async fn main() {
172173
.buffer(100)
173174
.rate_limit(50, std::time::Duration::from_secs(1))
174175
.service(info),
175-
)
176-
.finish()
177-
.unwrap();
178-
179-
// Run the ABCI server.
180-
server
181-
.listen(format!("{}:{}", opt.host, opt.port))
182-
.await
183-
.unwrap();
176+
);
177+
178+
let server = server_builder.finish().unwrap();
179+
180+
if let Some(uds_path) = opt.uds {
181+
server.listen_unix(uds_path).await.unwrap();
182+
} else {
183+
server
184+
.listen_tcp(format!("{}:{}", opt.host, opt.port))
185+
.await
186+
.unwrap();
187+
}
184188
}

examples/kvstore_37/main.rs

+17-10
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ struct Opt {
145145
/// Bind the TCP server to this port.
146146
#[structopt(short, long, default_value = "26658")]
147147
port: u16,
148+
149+
/// Bind the UDS server to this path
150+
#[structopt(long)]
151+
uds: Option<String>,
148152
}
149153

150154
#[tokio::main]
@@ -161,7 +165,7 @@ async fn main() {
161165
// Hand those components to the ABCI server, but customize request behavior
162166
// for each category -- for instance, apply load-shedding only to mempool
163167
// and info requests, but not to consensus requests.
164-
let server = Server::builder()
168+
let server_builder = Server::builder()
165169
.consensus(consensus)
166170
.snapshot(snapshot)
167171
.mempool(
@@ -176,13 +180,16 @@ async fn main() {
176180
.buffer(100)
177181
.rate_limit(50, std::time::Duration::from_secs(1))
178182
.service(info),
179-
)
180-
.finish()
181-
.unwrap();
182-
183-
// Run the ABCI server.
184-
server
185-
.listen(format!("{}:{}", opt.host, opt.port))
186-
.await
187-
.unwrap();
183+
);
184+
185+
let server = server_builder.finish().unwrap();
186+
187+
if let Some(uds_path) = opt.uds {
188+
server.listen_unix(uds_path).await.unwrap();
189+
} else {
190+
server
191+
.listen_tcp(format!("{}:{}", opt.host, opt.port))
192+
.await
193+
.unwrap();
194+
}
188195
}

src/v034/server.rs

+45-14
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
use std::convert::{TryFrom, TryInto};
2+
use std::path::Path;
23

34
use futures::future::{FutureExt, TryFutureExt};
45
use futures::sink::SinkExt;
56
use futures::stream::{FuturesOrdered, StreamExt};
7+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
68
use tokio::{
7-
net::{TcpListener, TcpStream, ToSocketAddrs},
9+
net::{TcpListener, ToSocketAddrs, UnixListener},
810
select,
911
};
1012
use tokio_util::codec::{FramedRead, FramedWrite};
1113
use tower::{Service, ServiceExt};
1214

15+
use crate::BoxError;
1316
use tendermint::abci::MethodKind;
17+
1418
use tendermint::v0_34::abci::{
1519
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
1620
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
1721
};
1822

19-
use crate::BoxError;
2023
/// An ABCI server which listens for connections and forwards requests to four
2124
/// component ABCI [`Service`]s.
2225
pub struct Server<C, M, I, S> {
@@ -123,29 +126,54 @@ where
123126
ServerBuilder::default()
124127
}
125128

126-
pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
127-
tracing::info!(?addr, "starting ABCI server");
129+
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
130+
let listener = UnixListener::bind(path)?;
131+
let addr = listener.local_addr()?;
132+
tracing::info!(?addr, "ABCI server starting on uds");
133+
134+
loop {
135+
match listener.accept().await {
136+
Ok((socket, _addr)) => {
137+
tracing::debug!(?_addr, "accepted new connection");
138+
let conn = Connection {
139+
consensus: self.consensus.clone(),
140+
mempool: self.mempool.clone(),
141+
info: self.info.clone(),
142+
snapshot: self.snapshot.clone(),
143+
};
144+
let (read, write) = socket.into_split();
145+
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
146+
}
147+
Err(e) => {
148+
tracing::error!({ %e }, "error accepting new connection");
149+
}
150+
}
151+
}
152+
}
153+
154+
pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
155+
self,
156+
addr: A,
157+
) -> Result<(), BoxError> {
128158
let listener = TcpListener::bind(addr).await?;
129-
let local_addr = listener.local_addr()?;
130-
tracing::info!(?local_addr, "bound tcp listener");
159+
let addr = listener.local_addr()?;
160+
tracing::info!(?addr, "ABCI server starting on tcp socket");
131161

132162
loop {
133163
match listener.accept().await {
134164
Ok((socket, _addr)) => {
135-
// set parent: None for the connection span, as it should
136-
// exist independently of the listener's spans.
137-
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
165+
tracing::debug!(?_addr, "accepted new connection");
138166
let conn = Connection {
139167
consensus: self.consensus.clone(),
140168
mempool: self.mempool.clone(),
141169
info: self.info.clone(),
142170
snapshot: self.snapshot.clone(),
143171
};
144-
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
145-
tokio::spawn(async move { conn.run(socket).await.unwrap() });
172+
let (read, write) = socket.into_split();
173+
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
146174
}
147175
Err(e) => {
148-
tracing::warn!({ %e }, "error accepting new tcp connection");
176+
tracing::error!({ %e }, "error accepting new connection");
149177
}
150178
}
151179
}
@@ -172,14 +200,17 @@ where
172200
{
173201
// XXX handle errors gracefully
174202
// figure out how / if to return errors to tendermint
175-
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
203+
async fn run(
204+
mut self,
205+
read: impl AsyncReadExt + std::marker::Unpin,
206+
write: impl AsyncWriteExt + std::marker::Unpin,
207+
) -> Result<(), BoxError> {
176208
tracing::info!("listening for requests");
177209

178210
use tendermint_proto::v0_34::abci as pb;
179211

180212
let (mut request_stream, mut response_sink) = {
181213
use crate::v034::codec::{Decode, Encode};
182-
let (read, write) = socket.split();
183214
(
184215
FramedRead::new(read, Decode::<pb::Request>::default()),
185216
FramedWrite::new(write, Encode::<pb::Response>::default()),

src/v037/server.rs

+44-15
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use std::convert::{TryFrom, TryInto};
2+
use std::path::Path;
23

34
use futures::future::{FutureExt, TryFutureExt};
45
use futures::sink::SinkExt;
56
use futures::stream::{FuturesOrdered, StreamExt};
7+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
68
use tokio::{
7-
net::{TcpListener, TcpStream, ToSocketAddrs},
9+
net::{TcpListener, ToSocketAddrs, UnixListener},
810
select,
911
};
1012
use tokio_util::codec::{FramedRead, FramedWrite};
1113
use tower::{Service, ServiceExt};
1214

13-
use tendermint::abci::MethodKind;
14-
1515
use crate::BoxError;
16+
use tendermint::abci::MethodKind;
1617

1718
use tendermint::v0_37::abci::{
1819
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
@@ -125,29 +126,54 @@ where
125126
ServerBuilder::default()
126127
}
127128

128-
pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
129-
tracing::info!(?addr, "starting ABCI server");
129+
pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
130+
let listener = UnixListener::bind(path)?;
131+
let addr = listener.local_addr()?;
132+
tracing::info!(?addr, "ABCI server starting on uds");
133+
134+
loop {
135+
match listener.accept().await {
136+
Ok((socket, _addr)) => {
137+
tracing::debug!(?_addr, "accepted new connection");
138+
let conn = Connection {
139+
consensus: self.consensus.clone(),
140+
mempool: self.mempool.clone(),
141+
info: self.info.clone(),
142+
snapshot: self.snapshot.clone(),
143+
};
144+
let (read, write) = socket.into_split();
145+
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
146+
}
147+
Err(e) => {
148+
tracing::error!({ %e }, "error accepting new connection");
149+
}
150+
}
151+
}
152+
}
153+
154+
pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
155+
self,
156+
addr: A,
157+
) -> Result<(), BoxError> {
130158
let listener = TcpListener::bind(addr).await?;
131-
let local_addr = listener.local_addr()?;
132-
tracing::info!(?local_addr, "bound tcp listener");
159+
let addr = listener.local_addr()?;
160+
tracing::info!(?addr, "ABCI server starting on tcp socket");
133161

134162
loop {
135163
match listener.accept().await {
136164
Ok((socket, _addr)) => {
137-
// set parent: None for the connection span, as it should
138-
// exist independently of the listener's spans.
139-
//let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
165+
tracing::debug!(?_addr, "accepted new connection");
140166
let conn = Connection {
141167
consensus: self.consensus.clone(),
142168
mempool: self.mempool.clone(),
143169
info: self.info.clone(),
144170
snapshot: self.snapshot.clone(),
145171
};
146-
//tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
147-
tokio::spawn(async move { conn.run(socket).await.unwrap() });
172+
let (read, write) = socket.into_split();
173+
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
148174
}
149175
Err(e) => {
150-
tracing::warn!({ %e }, "error accepting new tcp connection");
176+
tracing::error!({ %e }, "error accepting new connection");
151177
}
152178
}
153179
}
@@ -174,14 +200,17 @@ where
174200
{
175201
// XXX handle errors gracefully
176202
// figure out how / if to return errors to tendermint
177-
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
203+
async fn run(
204+
mut self,
205+
read: impl AsyncReadExt + std::marker::Unpin,
206+
write: impl AsyncWriteExt + std::marker::Unpin,
207+
) -> Result<(), BoxError> {
178208
tracing::info!("listening for requests");
179209

180210
use tendermint_proto::v0_37::abci as pb;
181211

182212
let (mut request_stream, mut response_sink) = {
183213
use crate::v037::codec::{Decode, Encode};
184-
let (read, write) = socket.split();
185214
(
186215
FramedRead::new(read, Decode::<pb::Request>::default()),
187216
FramedWrite::new(write, Encode::<pb::Response>::default()),

0 commit comments

Comments
 (0)