@@ -23,11 +23,17 @@ module HStream.Server.Core.Stream
2323 , trimShards
2424 ) where
2525
26- import Control.Exception (catch , throwIO )
27- import Control.Monad (forM , forM_ , unless , when )
26+ import Control.Concurrent (getNumCapabilities )
27+ import Control.Concurrent.Async (mapConcurrently )
28+ import Control.Concurrent.QSem (QSem , newQSem , signalQSem ,
29+ waitQSem )
30+ import Control.Exception (bracket_ , catch , throwIO )
31+ import Control.Monad (forM , forM_ , unless , void ,
32+ when )
2833import qualified Data.Attoparsec.Text as AP
2934import qualified Data.ByteString as BS
3035import qualified Data.ByteString.Lazy as BSL
36+ import Data.Either (partitionEithers )
3137import Data.Functor ((<&>) )
3238import qualified Data.List as L
3339import qualified Data.Map.Strict as M
@@ -38,11 +44,6 @@ import qualified Data.Vector as V
3844import Data.Word (Word32 , Word64 )
3945import GHC.Stack (HasCallStack )
4046import Google.Protobuf.Timestamp (Timestamp )
41- import qualified Proto3.Suite as PT
42- import qualified Z.Data.CBytes as CB
43- import qualified ZooKeeper.Exception as ZK
44-
45- import Data.Either (partitionEithers )
4647import HStream.Base.Time (getSystemNsTimestamp )
4748import HStream.Common.Types
4849import qualified HStream.Common.ZookeeperSlotAlloc as Slot
@@ -59,6 +60,9 @@ import HStream.Server.Types (ServerContext (..),
5960import qualified HStream.Stats as Stats
6061import qualified HStream.Store as S
6162import HStream.Utils
63+ import qualified Proto3.Suite as PT
64+ import qualified Z.Data.CBytes as CB
65+ import qualified ZooKeeper.Exception as ZK
6266
6367-------------------------------------------------------------------------------
6468
@@ -213,8 +217,8 @@ trimStream ServerContext{..} streamName trimPoint = do
213217 Log. info $ " trimStream failed because stream " <> Log. build streamName <> " is not found."
214218 throwIO $ HE. StreamNotFound $ " stream " <> T. pack (show streamName) <> " is not found."
215219 shards <- M. elems <$> S. listStreamPartitions scLDClient streamId
216- forM_ shards $ \ shardId -> do
217- getTrimLSN scLDClient shardId trimPoint >>= S. trim scLDClient shardId
220+ concurrentCap <- getNumCapabilities
221+ void $ limitedMapConcuurently ( min 8 concurrentCap) ( \ shardId -> getTrimLSN scLDClient shardId trimPoint >>= S. trim scLDClient shardId) shards
218222 where
219223 streamId = transToStreamName streamName
220224
@@ -267,15 +271,18 @@ trimShards ServerContext{..} streamName recordIds = do
267271
268272 let streamId = transToStreamName streamName
269273 shards <- M. elems <$> S. listStreamPartitions scLDClient streamId
270- res <- forM points $ \ r@ Rid {.. } -> do
271- unless (rShardId `elem` shards) $
272- throwIO . HE. ShardNotExists $ " shard " <> show rShardId <> " doesn't belong to stream " <> show streamName
273- S. trim scLDClient rShardId (rBatchId - 1 )
274- Log. info $ " trim to " <> Log. build (show $ rBatchId - 1 )
275- <> " for shard " <> Log. build (show rShardId)
276- <> " , stream " <> Log. build streamName
277- return (rShardId, T. pack . show $ r)
274+ concurrentCap <- getNumCapabilities
275+ res <- limitedMapConcuurently (min 8 concurrentCap) (trim shards) points
278276 return $ M. fromList res
277+ where
278+ trim shards r@ Rid {.. } = do
279+ unless (rShardId `elem` shards) $
280+ throwIO . HE. ShardNotExists $ " shard " <> show rShardId <> " doesn't belong to stream " <> show streamName
281+ S. trim scLDClient rShardId (rBatchId - 1 )
282+ Log. info $ " trim to " <> Log. build (show $ rBatchId - 1 )
283+ <> " for shard " <> Log. build (show rShardId)
284+ <> " , stream " <> Log. build streamName
285+ return (rShardId, T. pack . show $ r)
279286
280287getStreamInfo :: ServerContext -> S. StreamId -> IO API. Stream
281288getStreamInfo ServerContext {.. } stream = do
@@ -479,3 +486,11 @@ getTrimLSN client shardId trimPoint = do
479486 OffsetTimestamp API. TimestampOffset {.. } -> do
480487 let accuracy = if timestampOffsetStrictAccuracy then S. FindKeyStrict else S. FindKeyApproximate
481488 S. findTime scLDClient logId timestampOffsetTimestampInMs accuracy
489+
490+ limitedMapConcuurently :: Int -> (a -> IO b ) -> [a ] -> IO [b ]
491+ limitedMapConcuurently maxConcurrency f inputs = do
492+ sem <- newQSem maxConcurrency
493+ mapConcurrently (limited sem . f) inputs
494+ where
495+ limited :: QSem -> IO c -> IO c
496+ limited sem = bracket_ (waitQSem sem) (signalQSem sem)
0 commit comments