Skip to content

Commit e22041b

Browse files
committed
Scalafmt
1 parent 4a00566 commit e22041b

32 files changed

+374
-242
lines changed

io/js/src/main/scala/fs2/io/net/AsyncSocketsProvider.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ private[net] abstract class AsyncSocketsProvider[F[_]](implicit F: Async[F]) {
6262
.registerOneTimeListener[F, js.Error]("error") { error =>
6363
cb(Left(js.JavaScriptException(error)))
6464
} <* F.delay {
65-
to match {
66-
case Left(addr) =>
67-
sock.connect(addr.port.value, addr.host.toString, () => cb(Right(())))
68-
case Right(addr) =>
69-
sock.connect(addr.path, () => cb(Right(())))
70-
}
65+
to match {
66+
case Left(addr) =>
67+
sock.connect(addr.port.value, addr.host.toString, () => cb(Right(())))
68+
case Right(addr) =>
69+
sock.connect(addr.path, () => cb(Right(())))
70+
}
7171
}
7272
}
7373
.toResource
@@ -112,7 +112,7 @@ private[net] abstract class AsyncSocketsProvider[F[_]](implicit F: Async[F]) {
112112
else
113113
server.listen(addr.port.value, addr.host.toString, () => cb(Right(())))
114114
case Right(addr) =>
115-
server.listen(addr.path, () => cb(Right(())))
115+
server.listen(addr.path, () => cb(Right(())))
116116
}
117117
}
118118
}

io/js/src/main/scala/fs2/io/net/IpSocketsProviderPlatform.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ private[net] trait IpSocketsProviderCompanionPlatform { self: IpSocketsProvider.
3434
private def forAsyncAndDns[F[_]: Async: Dns]: IpSocketsProvider[F] =
3535
new AsyncSocketsProvider[F] with IpSocketsProvider[F] {
3636

37-
override def connect(
38-
address: SocketAddress[Host],
39-
options: List[SocketOption]
40-
): Resource[F, Socket[F]] =
41-
Resource.eval(address.host.resolve[F]).flatMap { ip =>
42-
connectIpOrUnix(Left(SocketAddress(ip, address.port)), options)
43-
}
37+
override def connect(
38+
address: SocketAddress[Host],
39+
options: List[SocketOption]
40+
): Resource[F, Socket[F]] =
41+
Resource.eval(address.host.resolve[F]).flatMap { ip =>
42+
connectIpOrUnix(Left(SocketAddress(ip, address.port)), options)
43+
}
4444

45-
override def bind(
46-
address: SocketAddress[Host],
47-
options: List[SocketOption]
48-
): Resource[F, ServerSocket[F]] =
49-
Resource.eval(address.host.resolve[F]).flatMap { ip =>
50-
bindIpOrUnix(Left(SocketAddress(ip, address.port)), options)
51-
}
52-
}
45+
override def bind(
46+
address: SocketAddress[Host],
47+
options: List[SocketOption]
48+
): Resource[F, ServerSocket[F]] =
49+
Resource.eval(address.host.resolve[F]).flatMap { ip =>
50+
bindIpOrUnix(Left(SocketAddress(ip, address.port)), options)
51+
}
52+
}
5353
}

io/js/src/main/scala/fs2/io/net/NetworkPlatform.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,18 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
3939

4040
// TODO pull up
4141
import cats.ApplicativeThrow
42-
private def matchAddress[F[_]: ApplicativeThrow, A](address: GenSocketAddress, ifIp: SocketAddress[Host] => F[A], ifUnix: UnixSocketAddress => F[A]): F[A] =
42+
private def matchAddress[F[_]: ApplicativeThrow, A](
43+
address: GenSocketAddress,
44+
ifIp: SocketAddress[Host] => F[A],
45+
ifUnix: UnixSocketAddress => F[A]
46+
): F[A] =
4347
address match {
4448
case sa: SocketAddress[Host] => ifIp(sa)
45-
case ua: UnixSocketAddress => ifUnix(ua)
46-
case other => ApplicativeThrow[F].raiseError(new UnsupportedOperationException(s"Unsupported address type: $other"))
49+
case ua: UnixSocketAddress => ifUnix(ua)
50+
case other =>
51+
ApplicativeThrow[F].raiseError(
52+
new UnsupportedOperationException(s"Unsupported address type: $other")
53+
)
4754
}
4855

4956
def forAsync[F[_]](implicit F: Async[F]): Network[F] =
@@ -54,20 +61,24 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
5461
private lazy val datagramSocketGroup = DatagramSocketGroup.forAsync[F]
5562

5663
override def connect(
57-
address: GenSocketAddress,
58-
options: List[SocketOption]
64+
address: GenSocketAddress,
65+
options: List[SocketOption]
5966
): Resource[F, Socket[F]] =
60-
matchAddress(address,
67+
matchAddress(
68+
address,
6169
sa => ipSockets.connect(sa, options),
62-
ua => unixSockets.connect(ua, options))
70+
ua => unixSockets.connect(ua, options)
71+
)
6372

6473
override def bind(
65-
address: GenSocketAddress,
66-
options: List[SocketOption]
74+
address: GenSocketAddress,
75+
options: List[SocketOption]
6776
): Resource[F, ServerSocket[F]] =
68-
matchAddress(address,
77+
matchAddress(
78+
address,
6979
sa => ipSockets.bind(sa, options),
70-
ua => unixSockets.bind(ua, options))
80+
ua => unixSockets.bind(ua, options)
81+
)
7182

7283
override def openDatagramSocket(
7384
address: Option[Host],

io/js/src/main/scala/fs2/io/net/SocketInfoPlatform.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ private[net] trait SocketInfoCompanionPlatform {
3636
}
3737
}
3838

39-
4039
private[net] trait AsyncSocketInfo[F[_]] extends SocketInfo[F] {
4140

4241
implicit protected def asyncInstance: Async[F]
@@ -45,7 +44,7 @@ private[net] trait SocketInfoCompanionPlatform {
4544

4645
override def localAddressGen: F[GenSocketAddress] = ???
4746

48-
override def supportedOptions: F[Set[SocketOption.Key[_]]] = ???
47+
override def supportedOptions: F[Set[SocketOption.Key[?]]] = ???
4948

5049
override def getOption[A](key: SocketOption.Key[A]): F[Option[A]] =
5150
key.get(sock)

io/js/src/main/scala/fs2/io/net/SocketOptionPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
8686
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[Boolean]] =
8787
Sync[F].pure(None)
8888
}
89-
89+
9090
object UnixServerSocketDeleteOnClose extends Key[Boolean] {
9191
override private[net] def set[F[_]: Sync](
9292
sock: facade.net.Socket,

io/js/src/main/scala/fs2/io/net/SocketPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private[net] trait SocketCompanionPlatform {
100100
override def remoteAddressGen: F[GenSocketAddress] =
101101
???
102102

103-
override def supportedOptions: F[Set[SocketOption.Key[_]]] =
103+
override def supportedOptions: F[Set[SocketOption.Key[?]]] =
104104
???
105105

106106
override def getOption[A](key: SocketOption.Key[A]): F[Option[A]] =

io/js/src/main/scala/fs2/io/net/UnixSocketsProviderPlatform.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ private[net] trait UnixSocketsProviderCompanionPlatform {
3636
private def forAsyncAndFiles[F[_]: Async: Files]: UnixSocketsProvider[F] =
3737
new AsyncSocketsProvider[F] with UnixSocketsProvider[F] {
3838

39-
override def connect(address: UnixSocketAddress, options: List[SocketOption]): Resource[F, Socket[F]] =
39+
override def connect(
40+
address: UnixSocketAddress,
41+
options: List[SocketOption]
42+
): Resource[F, Socket[F]] =
4043
connectIpOrUnix(Right(address), options)
4144

4245
override def bind(
@@ -59,7 +62,9 @@ private[net] trait UnixSocketsProviderCompanionPlatform {
5962

6063
val delete = Resource.make(
6164
if (deleteIfExists) Files[F].deleteIfExists(Path(address.path)).void else Async[F].unit
62-
)(_ => if (deleteOnClose) Files[F].deleteIfExists(Path(address.path)).void else Async[F].unit)
65+
)(_ =>
66+
if (deleteOnClose) Files[F].deleteIfExists(Path(address.path)).void else Async[F].unit
67+
)
6368

6469
delete *> bindIpOrUnix(Right(address), filteredOptions)
6570
}

io/jvm-native/src/main/scala/fs2/io/net/AsynchronousChannelGroupIpSocketsProvider.scala

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ package fs2
2323
package io
2424
package net
2525

26-
2726
import java.net.InetSocketAddress
2827
import java.nio.channels.{
2928
AsynchronousCloseException,
@@ -38,14 +37,15 @@ import cats.effect.{Async, Resource}
3837
import com.comcast.ip4s.{Dns, Host, SocketAddress}
3938

4039
private[net] class AsynchronousChannelGroupIpSocketsProvider[F[_]] private (
41-
channelGroup: AsynchronousChannelGroup
42-
)(implicit F: Async[F], F2: Dns[F]) extends IpSocketsProvider[F] {
40+
channelGroup: AsynchronousChannelGroup
41+
)(implicit F: Async[F], F2: Dns[F])
42+
extends IpSocketsProvider[F] {
4343

4444
override def connect(
45-
address: SocketAddress[Host],
46-
options: List[SocketOption]
45+
address: SocketAddress[Host],
46+
options: List[SocketOption]
4747
): Resource[F, Socket[F]] = {
48-
48+
4949
def setup: Resource[F, AsynchronousSocketChannel] =
5050
Resource
5151
.make(
@@ -59,29 +59,27 @@ private[net] class AsynchronousChannelGroupIpSocketsProvider[F[_]] private (
5959
address.resolve[F].flatMap { ip =>
6060
F.async[AsynchronousSocketChannel] { cb =>
6161
F.delay {
62-
ch.connect(
63-
ip.toInetSocketAddress,
64-
null,
65-
new CompletionHandler[Void, Void] {
66-
def completed(result: Void, attachment: Void): Unit =
67-
cb(Right(ch))
68-
def failed(rsn: Throwable, attachment: Void): Unit =
69-
cb(Left(rsn))
70-
}
71-
)
72-
}
73-
.as(Some(F.delay(ch.close())))
62+
ch.connect(
63+
ip.toInetSocketAddress,
64+
null,
65+
new CompletionHandler[Void, Void] {
66+
def completed(result: Void, attachment: Void): Unit =
67+
cb(Right(ch))
68+
def failed(rsn: Throwable, attachment: Void): Unit =
69+
cb(Left(rsn))
70+
}
71+
)
72+
}.as(Some(F.delay(ch.close())))
7473
}
7574
}
7675

7776
setup.evalMap(ch => connect(ch) *> Socket.forAsync(ch))
78-
}
77+
}
7978

8079
override def bind(
81-
address: SocketAddress[Host],
82-
options: List[SocketOption]
80+
address: SocketAddress[Host],
81+
options: List[SocketOption]
8382
): Resource[F, ServerSocket[F]] = {
84-
8583

8684
val setup: Resource[F, AsynchronousServerSocketChannel] =
8785
Resource.eval(address.host.resolve[F]).flatMap { addr =>
@@ -111,17 +109,16 @@ private[net] class AsynchronousChannelGroupIpSocketsProvider[F[_]] private (
111109
poll {
112110
F.async[AsynchronousSocketChannel] { cb =>
113111
F.delay {
114-
sch.accept(
115-
null,
116-
new CompletionHandler[AsynchronousSocketChannel, Void] {
117-
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
118-
cb(Right(ch))
119-
def failed(rsn: Throwable, attachment: Void): Unit =
120-
cb(Left(rsn))
121-
}
122-
)
123-
}
124-
.as(Some(F.delay(sch.close())))
112+
sch.accept(
113+
null,
114+
new CompletionHandler[AsynchronousSocketChannel, Void] {
115+
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
116+
cb(Right(ch))
117+
def failed(rsn: Throwable, attachment: Void): Unit =
118+
cb(Left(rsn))
119+
}
120+
)
121+
}.as(Some(F.delay(sch.close())))
125122
}
126123
}
127124
}(ch => F.delay(if (ch.isOpen) ch.close else ()))

io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,30 @@ import com.comcast.ip4s.{Host, IpAddress, Ipv4Address, Port, SocketAddress}
2929

3030
private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
3131

32-
def fromIpSockets[F[_]: Async](ipSockets: IpSocketsProvider[F]): SocketGroup[F] = new SocketGroup[F] {
33-
def client(to: SocketAddress[Host], options: List[SocketOption]) =
34-
ipSockets.connect(to, options)
32+
def fromIpSockets[F[_]: Async](ipSockets: IpSocketsProvider[F]): SocketGroup[F] =
33+
new SocketGroup[F] {
34+
def client(to: SocketAddress[Host], options: List[SocketOption]) =
35+
ipSockets.connect(to, options)
3536

36-
def server(address: Option[Host], port: Option[Port], options: List[SocketOption]): Stream[F, Socket[F]] =
37-
Stream.resource(serverResource(address, port, options)).flatMap(_._2)
37+
def server(
38+
address: Option[Host],
39+
port: Option[Port],
40+
options: List[SocketOption]
41+
): Stream[F, Socket[F]] =
42+
Stream.resource(serverResource(address, port, options)).flatMap(_._2)
3843

39-
def serverResource(address: Option[Host], port: Option[Port], options: List[SocketOption]): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
40-
ipSockets.bind(SocketAddress(address.getOrElse(Ipv4Address.Wildcard), port.getOrElse(Port.Wildcard)), options).evalMap(b => b.localAddressGen.map(_.asInstanceOf[SocketAddress[IpAddress]]).tupleRight(b.accept))
41-
}
44+
def serverResource(
45+
address: Option[Host],
46+
port: Option[Port],
47+
options: List[SocketOption]
48+
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
49+
ipSockets
50+
.bind(
51+
SocketAddress(address.getOrElse(Ipv4Address.Wildcard), port.getOrElse(Port.Wildcard)),
52+
options
53+
)
54+
.evalMap(b =>
55+
b.localAddressGen.map(_.asInstanceOf[SocketAddress[IpAddress]]).tupleRight(b.accept)
56+
)
57+
}
4258
}

io/jvm-native/src/main/scala/fs2/io/net/SocketInfoPlatform.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,16 @@ private[net] trait SocketInfoCompanionPlatform {
5151
}
5252
)
5353

54-
override def supportedOptions: F[Set[SocketOption.Key[_]]] =
54+
override def supportedOptions: F[Set[SocketOption.Key[?]]] =
5555
asyncInstance.delay {
5656
channel.supportedOptions.asScala.toSet
5757
}
5858

5959
override def getOption[A](key: SocketOption.Key[A]): F[Option[A]] =
6060
asyncInstance.delay {
61-
try {
61+
try
6262
Some(channel.getOption(key))
63-
} catch {
63+
catch {
6464
case _: UnsupportedOperationException => None
6565
}
6666
}
@@ -73,4 +73,3 @@ private[net] trait SocketInfoCompanionPlatform {
7373
}
7474

7575
}
76-

io/jvm-native/src/main/scala/fs2/io/net/SocketPlatform.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ private[net] trait SocketCompanionPlatform {
108108
readMutex: Mutex[F],
109109
writeMutex: Mutex[F]
110110
)(implicit F: Async[F])
111-
extends BufferedReads[F](readMutex) with SocketInfo.AsyncSocketInfo[F] {
111+
extends BufferedReads[F](readMutex)
112+
with SocketInfo.AsyncSocketInfo[F] {
112113

113114
protected def asyncInstance = F
114115
protected def channel = ch

io/jvm/src/main/scala/fs2/io/net/JdkUnixSocketsProvider.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ private[net] class JdkUnixSocketsProvider[F[_]: Files](implicit F: Async[F])
7070
}
7171
.map { sch =>
7272
SocketInfo.forAsync(sch) ->
73-
Resource.makeFull[F, SocketChannel] { poll =>
74-
poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close())))
75-
}(ch => F.blocking(ch.close()))
73+
Resource
74+
.makeFull[F, SocketChannel] { poll =>
75+
poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close())))
76+
}(ch => F.blocking(ch.close()))
7677
.evalTap(ch => F.delay(options.foreach(o => ch.setOption(o.key, o.value))))
7778
}
7879
)

io/jvm/src/main/scala/fs2/io/net/JnrUnixSocketsProvider.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,23 @@ private[net] class JnrUnixSocketsProvider[F[_]](implicit F: Async[F], F2: Files[
7070
.cancelable(F.blocking(sch.close()))
7171
}
7272
.map { sch =>
73-
def raiseOptionError[A]: F[A] =
74-
F.raiseError(new UnsupportedOperationException("JNR unix server sockets do not support socket options"))
73+
def raiseOptionError[A]: F[A] =
74+
F.raiseError(
75+
new UnsupportedOperationException(
76+
"JNR unix server sockets do not support socket options"
77+
)
78+
)
7579
val info: SocketInfo[F] = new SocketInfo[F] {
7680
def supportedOptions = F.pure(Set.empty)
7781
def getOption[A](key: SocketOption.Key[A]) = raiseOptionError
7882
def setOption[A](key: SocketOption.Key[A], value: A) = raiseOptionError
7983
def localAddressGen = F.pure(address)
8084
}
8185
info ->
82-
Resource.makeFull[F, SocketChannel] { poll =>
83-
F.widen(poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close()))))
84-
}(ch => F.blocking(ch.close()))
86+
Resource
87+
.makeFull[F, SocketChannel] { poll =>
88+
F.widen(poll(F.blocking(sch.accept).cancelable(F.blocking(sch.close()))))
89+
}(ch => F.blocking(ch.close()))
8590
.evalTap(ch => F.delay(options.foreach(o => ch.setOption(o.key, o.value))))
8691
}
8792
}

0 commit comments

Comments
 (0)