@@ -47,9 +47,8 @@ object RedisConnection{
47
47
) extends RedisConnection [F ]{
48
48
def runRequest (inputs : Chunk [NonEmptyList [ByteVector ]], key : Option [ByteVector ]): F [Chunk [Resp ]] = {
49
49
val chunk = Chunk .from(inputs.toList.map(Resp .renderRequest))
50
- def withSocket (socket : Socket [F ]): F [Chunk [Resp ]] = explicitPipelineRequest[F ](socket, chunk, Defaults .maxBytes, redisRequestTimeout)
51
50
pool.take(()).use{
52
- m => withSocket (m.value).timeout(redisRequestTimeout).attempt.flatTap{
51
+ m => explicitPipelineRequest[ F ] (m.value, chunk, Defaults .maxBytes, redisRequestTimeout, m.canBeReused.set( Reusable . DontReuse ) ).timeout(redisRequestTimeout).attempt.flatTap{
53
52
case Left (_) => m.canBeReused.set(Reusable .DontReuse )
54
53
case _ => Applicative [F ].unit
55
54
}
@@ -60,7 +59,7 @@ object RedisConnection{
60
59
private [rediculous] case class DirectConnection [F [_]: Temporal ](socket : Socket [F ], commandTimeout : Duration , redisRequestTimeout : Duration ) extends RedisConnection [F ]{
61
60
def runRequest (inputs : Chunk [NonEmptyList [ByteVector ]], key : Option [ByteVector ]): F [Chunk [Resp ]] = {
62
61
val chunk = Chunk .from(inputs.toList.map(Resp .renderRequest))
63
- def withSocket (socket : Socket [F ]): F [Chunk [Resp ]] = explicitPipelineRequest[F ](socket, chunk, Defaults .maxBytes, redisRequestTimeout)
62
+ def withSocket (socket : Socket [F ]): F [Chunk [Resp ]] = explicitPipelineRequest[F ](socket, chunk, Defaults .maxBytes, redisRequestTimeout, socket.endOfOutput )
64
63
withSocket(socket)
65
64
}.timeoutTo(commandTimeout, Defer [F ].defer(Temporal [F ].raiseError[Chunk [Resp ]](RedisError .CommandTimeoutException (commandTimeout))))
66
65
}
@@ -78,14 +77,18 @@ object RedisConnection{
78
77
79
78
// Guarantees With Socket That Each Call Receives a Response
80
79
// Chunk must be non-empty but to do so incurs a penalty
81
- private [rediculous] def explicitPipelineRequest [F [_]: Temporal ](socket : Socket [F ], calls : Chunk [Resp ], maxBytes : Int , redisRequestTimeout : Duration ): F [Chunk [Resp ]] = {
80
+ private [rediculous] def explicitPipelineRequest [F [_]: Temporal ](socket : Socket [F ], calls : Chunk [Resp ], maxBytes : Int , redisRequestTimeout : Duration , removeConnection : F [ Unit ] ): F [Chunk [Resp ]] = {
82
81
val out = calls.flatMap(resp =>
83
82
Resp .CodecUtils .codec.encode(resp).toEither.traverse(bits => Chunk .byteVector(bits.bytes))
84
83
).sequence.leftMap(err => new Throwable (s " Failed To Encode Response $err" )).liftTo[F ]
85
84
out.flatMap{bytes =>
86
85
87
86
val request = socket.write(bytes) >>
88
87
Stream .eval(socket.read(maxBytes))
88
+ .evalTap{
89
+ case None => removeConnection
90
+ case _ => Applicative [F ].unit
91
+ }
89
92
.repeat
90
93
.unNoneTerminate
91
94
.unchunks
@@ -95,6 +98,7 @@ object RedisConnection{
95
98
.to(Chunk )
96
99
97
100
request.timeoutTo(redisRequestTimeout, Defer [F ].defer(Temporal [F ].raiseError[Chunk [Resp ]](RedisError .RedisRequestTimeoutException (redisRequestTimeout))))
101
+ .onError{ case _ => removeConnection }
98
102
}
99
103
}
100
104
@@ -497,7 +501,7 @@ object RedisConnection{
497
501
keypool.take(()).attempt.use{
498
502
case Right (m) =>
499
503
val out = chunk.map(_._2)
500
- explicitPipelineRequest(m.value, out, Defaults .maxBytes, redisRequestTimeout)
504
+ explicitPipelineRequest(m.value, out, Defaults .maxBytes, redisRequestTimeout, m.canBeReused.set( Reusable . DontReuse ) )
501
505
.attempt
502
506
.timeout(redisRequestTimeout) // Apply Timeout To Call to Redis, this is independent of the timeout on individual calls
503
507
.flatTap{// Currently Guarantee Chunk.size === returnSize
@@ -718,7 +722,7 @@ object RedisConnection{
718
722
keypool.take(server).attempt.use{
719
723
case Right (m) =>
720
724
val out = Chunk .from(rest.map(_._5))
721
- explicitPipelineRequest(m.value, out, Defaults .maxBytes, redisRequestTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
725
+ explicitPipelineRequest(m.value, out, Defaults .maxBytes, redisRequestTimeout, m.canBeReused.set( Reusable . DontReuse ) ).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
722
726
case Left (_) => m.canBeReused.set(Reusable .DontReuse )
723
727
case _ => Applicative [F ].unit
724
728
}
0 commit comments