Skip to content

Commit b649d00

Browse files
committed
JS tests passing
1 parent 598d3ed commit b649d00

17 files changed

+495
-225
lines changed

io/js/src/main/scala/fs2/io/internal/facade/net.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private[io] object net {
5959
trait ServerAddress extends js.Object {
6060
def address: String = js.native
6161
def port: Int = js.native
62+
def path: String = js.native
6263
}
6364

6465
trait ServerOptions extends js.Object {
@@ -110,6 +111,8 @@ private[io] object net {
110111

111112
def setTimeout(timeout: Double): Socket = js.native
112113

114+
def timeout: Double = js.native
115+
113116
}
114117

115118
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package net
25+
26+
import cats.effect.{Async, Resource}
27+
import cats.effect.std.Dispatcher
28+
import cats.effect.syntax.all._
29+
import cats.syntax.all._
30+
import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress, UnixSocketAddress}
31+
import fs2.concurrent.Channel
32+
import fs2.io.internal.facade
33+
34+
import scala.scalajs.js
35+
36+
private[net] trait IpSocketsProviderCompanionPlatform { self: IpSocketsProvider.type =>
37+
38+
private[net] def forAsync[F[_]: Async]: IpSocketsProvider[F] =
39+
new AsyncIpSocketsProvider[F]()(implicitly, Dns.forAsync[F])
40+
41+
private[net] final class AsyncIpSocketsProvider[F[_]](implicit F: Async[F], F2: Dns[F])
42+
extends IpSocketsProvider[F] {
43+
44+
private def setSocketOptions(options: List[SocketOption])(socket: facade.net.Socket): F[Unit] =
45+
options.traverse_(option => option.key.set(socket, option.value))
46+
47+
override def connect(
48+
to: SocketAddress[Host],
49+
options: List[SocketOption]
50+
): Resource[F, Socket[F]] =
51+
(for {
52+
sock <- Resource
53+
.make(
54+
F.delay(
55+
new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true })
56+
)
57+
)(sock =>
58+
F.delay {
59+
if (!sock.destroyed)
60+
sock.destroy()
61+
}
62+
)
63+
.evalTap(setSocketOptions(options))
64+
socket <- Socket.forAsync(sock)
65+
_ <- F
66+
.async[Unit] { cb =>
67+
sock
68+
.registerOneTimeListener[F, js.Error]("error") { error =>
69+
cb(Left(js.JavaScriptException(error)))
70+
} <* F.delay {
71+
sock.connect(to.port.value, to.host.toString, () => cb(Right(())))
72+
}
73+
}
74+
.toResource
75+
} yield socket).adaptError { case IOException(ex) => ex }
76+
77+
override def bind(
78+
address: SocketAddress[Host],
79+
options: List[SocketOption]
80+
): Resource[F, ServerSocket[F]] =
81+
(for {
82+
dispatcher <- Dispatcher.sequential[F]
83+
channel <- Channel.unbounded[F, facade.net.Socket].toResource
84+
server <- Resource.make(
85+
F
86+
.delay(
87+
facade.net.createServer(
88+
new facade.net.ServerOptions {
89+
pauseOnConnect = true
90+
allowHalfOpen = true
91+
},
92+
sock => dispatcher.unsafeRunAndForget(channel.send(sock))
93+
)
94+
)
95+
)(server =>
96+
F.async[Unit] { cb =>
97+
if (server.listening)
98+
F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) *>
99+
channel.close.as(None)
100+
else
101+
F.delay(cb(Right(()))).as(None)
102+
}
103+
)
104+
ip <- Resource.eval(address.host.resolve[F])
105+
_ <- F
106+
.async[Unit] { cb =>
107+
server.registerOneTimeListener[F, js.Error]("error") { e =>
108+
cb(Left(js.JavaScriptException(e)))
109+
} <* F.delay {
110+
if (ip.isWildcard)
111+
server.listen(address.port.value, () => cb(Right(())))
112+
else
113+
server.listen(address.port.value, ip.toString, () => cb(Right(())))
114+
}
115+
}
116+
.toResource
117+
info = new SocketInfo[F] {
118+
def localAddressGen = F.delay {
119+
val address = server.address()
120+
if (address.port ne null)
121+
SocketAddress(IpAddress.fromString(address.address).get, Port.fromInt(address.port).get)
122+
else
123+
UnixSocketAddress(address.path)
124+
}
125+
126+
def getOption[A](key: SocketOption.Key[A]) =
127+
F.raiseError(new UnsupportedOperationException)
128+
def setOption[A](key: SocketOption.Key[A], value: A) =
129+
F.raiseError(new UnsupportedOperationException)
130+
def supportedOptions =
131+
F.raiseError(new UnsupportedOperationException)
132+
}
133+
sockets = channel.stream
134+
.evalTap(setSocketOptions(options))
135+
.flatMap(sock => Stream.resource(Socket.forAsync(sock)))
136+
} yield ServerSocket(info, sockets)).adaptError { case IOException(ex) => ex }
137+
}
138+
}

io/js/src/main/scala/fs2/io/net/NetworkPlatform.scala

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,8 @@ package fs2
2323
package io
2424
package net
2525

26-
import cats.effect.IO
27-
import cats.effect.LiftIO
28-
import cats.effect.kernel.Async
29-
import cats.effect.kernel.Resource
30-
import com.comcast.ip4s.Host
31-
import com.comcast.ip4s.IpAddress
32-
import com.comcast.ip4s.Port
33-
import com.comcast.ip4s.SocketAddress
26+
import cats.effect.{Async, IO, LiftIO, Resource}
27+
import com.comcast.ip4s.{GenSocketAddress, Host, Port, SocketAddress, UnixSocketAddress}
3428
import fs2.io.net.tls.TLSContext
3529

3630
private[net] trait NetworkPlatform[F[_]]
@@ -43,31 +37,37 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
4337
forAsync
4438
}
4539

40+
// TODO pull up
41+
import cats.ApplicativeThrow
42+
private def matchAddress[F[_]: ApplicativeThrow, A](address: GenSocketAddress, ifIp: SocketAddress[Host] => F[A], ifUnix: UnixSocketAddress => F[A]): F[A] =
43+
address match {
44+
case sa: SocketAddress[Host] => ifIp(sa)
45+
case ua: UnixSocketAddress => ifUnix(ua)
46+
case other => ApplicativeThrow[F].raiseError(new UnsupportedOperationException(s"Unsupported address type: $other"))
47+
}
48+
4649
def forAsync[F[_]](implicit F: Async[F]): Network[F] =
47-
new UnsealedNetwork[F] {
50+
new AsyncNetwork[F] {
4851

49-
private lazy val socketGroup = SocketGroup.forAsync[F]
52+
private lazy val ipSockets = IpSocketsProvider.forAsync[F]
53+
private lazy val unixSockets = UnixSocketsProvider.forAsync[F]
5054
private lazy val datagramSocketGroup = DatagramSocketGroup.forAsync[F]
5155

52-
override def client(
53-
to: SocketAddress[Host],
54-
options: List[SocketOption]
56+
override def connect(
57+
address: GenSocketAddress,
58+
options: List[SocketOption]
5559
): Resource[F, Socket[F]] =
56-
socketGroup.client(to, options)
60+
matchAddress(address,
61+
sa => ipSockets.connect(sa, options),
62+
ua => unixSockets.connect(ua, options))
5763

58-
override def server(
59-
address: Option[Host],
60-
port: Option[Port],
61-
options: List[SocketOption]
62-
): Stream[F, Socket[F]] =
63-
socketGroup.server(address, port, options)
64-
65-
override def serverResource(
66-
address: Option[Host],
67-
port: Option[Port],
68-
options: List[SocketOption]
69-
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] =
70-
socketGroup.serverResource(address, port, options)
64+
override def bind(
65+
address: GenSocketAddress,
66+
options: List[SocketOption]
67+
): Resource[F, ServerSocket[F]] =
68+
matchAddress(address,
69+
sa => ipSockets.bind(sa, options),
70+
ua => unixSockets.bind(ua, options))
7171

7272
override def openDatagramSocket(
7373
address: Option[Host],

io/jvm-native/src/main/scala/fs2/io/net/ServerSocketPlatform.scala renamed to io/js/src/main/scala/fs2/io/net/SocketInfoPlatform.scala

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,34 @@ package fs2
2323
package io
2424
package net
2525

26-
private[net] trait ServerSocketCompanionPlatform {
27-
private[net] def apply[F[_]](info: SocketInfo[F], accept: Stream[F, Socket[F]]): ServerSocket[F] = {
28-
val accept0 = accept
29-
new UnsealedServerSocket[F] {
30-
def accept: Stream[F, Socket[F]] = accept0
26+
import com.comcast.ip4s.{GenSocketAddress, SocketAddress}
27+
import cats.effect.Async
28+
import fs2.io.internal.facade
3129

32-
def getOption[A](key: SocketOption.Key[A]): F[Option[A]] = info.getOption(key)
33-
def setOption[A](key: SocketOption.Key[A], value: A) = info.setOption(key, value)
34-
def supportedOptions = info.supportedOptions
35-
36-
def localAddressGen = info.localAddressGen
30+
private[net] trait SocketInfoCompanionPlatform {
31+
private[net] def forAsync[F[_]](sock: facade.net.Socket)(implicit F: Async[F]): SocketInfo[F] = {
32+
val sock0 = sock
33+
new AsyncSocketInfo[F] {
34+
def asyncInstance = F
35+
def sock: facade.net.Socket = sock0
3736
}
3837
}
39-
}
4038

39+
40+
private[net] trait AsyncSocketInfo[F[_]] extends SocketInfo[F] {
41+
42+
implicit protected def asyncInstance: Async[F]
43+
44+
protected def sock: facade.net.Socket
45+
46+
override def localAddressGen: F[GenSocketAddress] = ???
47+
48+
override def supportedOptions: F[Set[SocketOption.Key[_]]] = ???
49+
50+
override def getOption[A](key: SocketOption.Key[A]): F[Option[A]] =
51+
key.get(sock)
52+
53+
override def setOption[A](key: SocketOption.Key[A], value: A): F[Unit] =
54+
key.set(sock, value)
55+
}
56+
}

io/js/src/main/scala/fs2/io/net/SocketOptionPlatform.scala

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ package fs2.io.net
2424
import cats.effect.kernel.Sync
2525
import fs2.io.internal.facade
2626

27-
import scala.concurrent.duration.FiniteDuration
27+
import scala.concurrent.duration._
2828

2929
private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
3030
sealed trait Key[A] {
3131
private[net] def set[F[_]: Sync](sock: facade.net.Socket, value: A): F[Unit]
32+
private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[A]]
3233
}
3334

3435
private object Encoding extends Key[String] {
@@ -37,6 +38,8 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
3738
sock.setEncoding(value)
3839
()
3940
}
41+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[String]] =
42+
Sync[F].raiseError(new UnsupportedOperationException)
4043
}
4144

4245
private object KeepAlive extends Key[Boolean] {
@@ -45,6 +48,8 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
4548
sock.setKeepAlive(value)
4649
()
4750
}
51+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[Boolean]] =
52+
Sync[F].raiseError(new UnsupportedOperationException)
4853
}
4954

5055
private object NoDelay extends Key[Boolean] {
@@ -53,6 +58,9 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
5358
sock.setNoDelay(value)
5459
()
5560
}
61+
62+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[Boolean]] =
63+
Sync[F].raiseError(new UnsupportedOperationException)
5664
}
5765

5866
private object Timeout extends Key[FiniteDuration] {
@@ -64,11 +72,36 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
6472
sock.setTimeout(value.toMillis.toDouble)
6573
()
6674
}
75+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[FiniteDuration]] =
76+
Sync[F].delay {
77+
Some(sock.timeout.toLong.millis)
78+
}
79+
}
80+
81+
object UnixServerSocketDeleteIfExists extends Key[Boolean] {
82+
override private[net] def set[F[_]: Sync](
83+
sock: facade.net.Socket,
84+
value: Boolean
85+
): F[Unit] = Sync[F].unit
86+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[Boolean]] =
87+
Sync[F].pure(None)
88+
}
89+
90+
object UnixServerSocketDeleteOnClose extends Key[Boolean] {
91+
override private[net] def set[F[_]: Sync](
92+
sock: facade.net.Socket,
93+
value: Boolean
94+
): F[Unit] = Sync[F].unit
95+
override private[net] def get[F[_]: Sync](sock: facade.net.Socket): F[Option[Boolean]] =
96+
Sync[F].pure(None)
6797
}
6898

6999
def encoding(value: String): SocketOption = apply(Encoding, value)
70100
def keepAlive(value: Boolean): SocketOption = apply(KeepAlive, value)
71101
def noDelay(value: Boolean): SocketOption = apply(NoDelay, value)
72102
def timeout(value: FiniteDuration): SocketOption = apply(Timeout, value)
73-
103+
def unixServerSocketDeleteIfExists(value: Boolean): SocketOption =
104+
apply(UnixServerSocketDeleteIfExists, value)
105+
def unixServerSocketDeleteOnClose(value: Boolean): SocketOption =
106+
apply(UnixServerSocketDeleteOnClose, value)
74107
}

0 commit comments

Comments
 (0)