Skip to content

Commit b81084d

Browse files
authored
hstream-kafka: add an OffsetManager (#1602)
* add an OffsetManager * Fix SQL tests due to upgraded hashable
1 parent a9b3773 commit b81084d

File tree

10 files changed

+256
-59
lines changed

10 files changed

+256
-59
lines changed

cabal.project

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ package rocksdb-haskell-bindings
5656
extra-lib-dirs: /usr/local/lib
5757

5858
constraints:
59-
Z-Data == 2.0.0.1
59+
Z-Data == 2.0.0.2
6060
, zoovisitor == 0.2.6.1
6161
, blaze-textual == 0.2.1.0
6262
, entropy == 0.4.1.7

common/kafka/hstream-kafka.cabal

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ common shared-properties
4343
library
4444
import: shared-properties
4545
exposed-modules:
46+
Kafka.Common.OffsetManager
47+
Kafka.Common.RecordFormat
4648
Kafka.Protocol
4749
Kafka.Protocol.Encoding
4850
Kafka.Protocol.Error
@@ -61,11 +63,15 @@ library
6163
build-depends:
6264
, base >=4.11 && <5
6365
, bytestring
66+
, containers
6467
, digest
68+
, hashtables
6569
, hstream-common-base
70+
, hstream-store
6671
, network
6772
, text
6873
, vector
74+
, Z-Data
6975

7076
default-language: GHC2021
7177
default-extensions:
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
3+
module Kafka.Common.OffsetManager
4+
( OffsetManager
5+
, newOffsetManager
6+
, withOffset
7+
, withOffsetN
8+
, cleanOffsetCache
9+
, getOldestOffset
10+
, getLatestOffset
11+
) where
12+
13+
import Control.Concurrent
14+
import Control.Exception
15+
import Control.Monad
16+
import Data.ByteString (ByteString)
17+
import qualified Data.HashTable.IO as H
18+
import Data.Int
19+
import Data.Word
20+
import GHC.Stack (HasCallStack)
21+
22+
import qualified HStream.Store as S
23+
import qualified HStream.Store.Internal.LogDevice as S
24+
import Kafka.Common.RecordFormat
25+
import qualified Kafka.Protocol.Encoding as K
26+
27+
-------------------------------------------------------------------------------
28+
29+
type HashTable k v = H.BasicHashTable k v
30+
31+
data OffsetManager = OffsetManager
32+
{ offsets :: HashTable Word64 (MVar Int64)
33+
-- ^ Offsets cache
34+
--
35+
-- TODO:
36+
-- * use FastMutInt as value (?)
37+
, offsetsLock :: MVar ()
38+
, store :: S.LDClient
39+
, reader :: S.LDReader
40+
}
41+
42+
newOffsetManager :: S.LDClient -> Int -> IO OffsetManager
43+
newOffsetManager store maxLogs = do
44+
offsets <- H.new
45+
offsetsLock <- newMVar ()
46+
reader <- S.newLDReader store (fromIntegral maxLogs) Nothing
47+
pure OffsetManager{..}
48+
49+
withOffset :: OffsetManager -> Word64 -> (Int64 -> IO a) -> IO a
50+
withOffset m logid = withOffsetN m logid 0
51+
52+
-- thread safe version
53+
--
54+
-- NOTE: n must >= 1 and < (maxBound :: Int32)
55+
withOffsetN :: OffsetManager -> Word64 -> Int64 -> (Int64 -> IO a) -> IO a
56+
withOffsetN m@OffsetManager{..} logid n f = do
57+
m_offset <- H.lookup offsets logid
58+
case m_offset of
59+
Just offset -> modifyMVar offset $ \o -> do
60+
let !o' = o + n
61+
!a <- f o'
62+
pure (o', a)
63+
Nothing -> withMVar offsetsLock $ \_ -> do
64+
o' <- catch (do mo <- getLatestOffset m logid
65+
pure $ maybe (n - 1) (+ n) mo)
66+
(\(_ :: S.NOTFOUND) -> pure $ n - 1)
67+
H.insert offsets logid =<< newMVar o'
68+
f o'
69+
70+
cleanOffsetCache :: OffsetManager -> Word64 -> IO ()
71+
cleanOffsetCache OffsetManager{..} logid = H.delete offsets logid
72+
73+
getOldestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
74+
getOldestOffset OffsetManager{..} logid = do
75+
isEmpty <- S.isLogEmpty store logid
76+
if isEmpty
77+
then pure Nothing
78+
else do
79+
-- Actually, we only need the first lsn but there is no easy way to get
80+
Just . offset <$> readOneRecord reader logid S.LSN_MIN S.LSN_MAX
81+
82+
getLatestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
83+
getLatestOffset OffsetManager{..} logid = do
84+
-- FIXME: first check is empty log seems blocking.
85+
isEmpty <- S.isLogEmpty store logid
86+
if isEmpty
87+
then pure Nothing
88+
else do tailLsn <- S.getTailLSN store logid
89+
Just . offset <$> readOneRecord reader logid tailLsn tailLsn
90+
91+
-- TODO
92+
-- getOffsetByTime :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO Int64
93+
-- getOffsetByTime OffsetManager{..} logid timestamp = undefined
94+
95+
-------------------------------------------------------------------------------
96+
97+
-- Return the first read RecordFormat
98+
--
99+
-- FIXME: what happens when read an empty log?
100+
readOneRecord
101+
:: HasCallStack
102+
=> S.LDReader -> Word64 -> S.LSN -> S.LSN -> IO RecordFormat
103+
readOneRecord reader logid start end = finally acquire release
104+
where
105+
acquire = do
106+
S.readerSetTimeout reader 1000
107+
S.readerStartReading reader logid start end
108+
dataRecords <- S.readerRead @ByteString reader 1
109+
case dataRecords of
110+
[S.DataRecord{..}] -> K.runGet recordPayload
111+
xs -> -- TODO
112+
ioError $ userError $ "Invalid reader result " <> show xs
113+
release = do
114+
isReading <- S.readerIsReading reader logid
115+
when isReading $ S.readerStopReading reader logid
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Kafka.Common.RecordFormat
2+
( RecordFormat (..)
3+
) where
4+
5+
import Data.ByteString (ByteString)
6+
import Data.Int
7+
import GHC.Generics (Generic)
8+
9+
import qualified Kafka.Protocol.Encoding as K
10+
11+
-- Format to store in logdevice
12+
data RecordFormat = RecordFormat
13+
{ offset :: Int64
14+
, batchLength :: Int32
15+
, recordBytes :: ByteString
16+
} deriving (Generic, Show)
17+
18+
instance K.Serializable RecordFormat

hstream-admin/server/hstream-admin-server.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ library
8282
, deepseq
8383
, grpc-haskell
8484
, grpc-haskell-core
85-
, hashable >=1.2.7.0 && <1.4
85+
, hashable >=1.2.7.0 && <1.5
8686
, haskeline ^>=0.8.1
8787
, hstream-api-hs
8888
, hstream-common

hstream-diffflow/test/DiffFlow/LoopSpec.hs

Lines changed: 93 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ checkStep2 isDone reach_m reachSummary_m = describe "check reach summary out" $
157157
#if MIN_VERSION_aeson(2,0,0)
158158
--------------------------------------------------------------------------------
159159

160+
#if MIN_VERSION_hashable(1,4,0)
161+
dcbs1 :: [[DataChange Object Word32]]
162+
dcbs1 = dcbs1'
163+
164+
dcbs2 :: [[DataChange Object Word32]]
165+
dcbs2 = dcbs2'
166+
167+
dcbs3 :: [[DataChange Object Word32]]
168+
dcbs3 = dcbs3'
169+
170+
dcbs4 :: [[DataChange Object Word32]]
171+
dcbs4 = dcbs4'
172+
#else
160173
dcbs1 :: [[DataChange Object Word32]]
161174
dcbs1 = [ [DataChange (A.fromList [("v1", "c"), ("v2", "a")]) (Timestamp 0 []) 1]
162175
, [DataChange (A.fromList [("v1", "a"), ("v2", "b")]) (Timestamp 0 []) 1]
@@ -221,6 +234,7 @@ dcbs4 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1
221234
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1
222235
, DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)]
223236
]
237+
#endif
224238

225239
--------------------------------------------------------------------------------
226240
#else
@@ -244,53 +258,92 @@ dcbs1 = [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 0 []) 1
244258
]
245259

246260
dcbs2 :: [[DataChange Object Word32]]
247-
dcbs2 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 0 [1]) 1]
248-
, [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [1]) 1]
249-
, [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [1]) 1]
250-
251-
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 0 [2]) (-1)
252-
, DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 0 [2]) 1]
253-
, [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [2]) (-1)
254-
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 0 [2]) 1]
255-
, [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [2]) (-1)
256-
, DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [2]) 1]
257-
258-
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 0 [3]) (-1)
259-
, DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 0 [3]) 1]
260-
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 0 [3]) (-1)
261-
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 0 [3]) 1]
262-
, [DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [3]) (-1)
263-
, DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 0 [3]) 1]
264-
]
261+
dcbs2 = dcbs2'
265262

266263
dcbs3 :: [[DataChange Object Word32]]
267-
dcbs3 = [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)]
264+
dcbs3 =
265+
[ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)]
268266

269-
, [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)]
270-
, [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)]
267+
, [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)]
268+
, [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)]
271269

272-
, [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)]
273-
, [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)]
274-
, [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)]
275-
]
270+
, [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)]
271+
, [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)]
272+
, [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)]
273+
]
276274

277275
dcbs4 :: [[DataChange Object Word32]]
278-
dcbs4 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1 [1]) (-1)
279-
, DataChange (A.fromList [("v1", "b"), ("reduced", "d" )]) (Timestamp 1 [1]) 1]
280-
281-
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 1 [2]) 1
282-
, DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 1 [2]) (-1)]
283-
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 1 [2]) (-1)
284-
, DataChange (A.fromList [("v1", "a"), ("reduced", "bd" )]) (Timestamp 1 [2]) 1]
285-
286-
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1
287-
, DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)]
288-
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 1 [3]) 1
289-
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 1 [3]) (-1)]
290-
, [DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 1 [3]) (-1)
291-
, DataChange (A.fromList [("v1", "c"), ("reduced", "abd" )]) (Timestamp 1 [3]) 1]
292-
]
276+
dcbs4 = dcbs4'
293277

294278
--------------------------------------------------------------------------------
295279
#endif
296280
--------------------------------------------------------------------------------
281+
282+
dcbs1' :: [[DataChange Object Word32]]
283+
dcbs1' =
284+
[ [DataChange (A.fromList [("v1", "a"), ("v2", "b")]) (Timestamp 0 []) 1]
285+
, [DataChange (A.fromList [("v1", "b"), ("v2", "d")]) (Timestamp 0 []) 1]
286+
, [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 0 []) 1]
287+
, [DataChange (A.fromList [("v1", "c"), ("v2", "a")]) (Timestamp 0 []) 1]
288+
289+
, [DataChange (A.fromList [("v1", "a"), ("v2", "d")]) (Timestamp 0 []) 1]
290+
, [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 0 []) 1]
291+
, [DataChange (A.fromList [("v1", "c"), ("v2", "b")]) (Timestamp 0 []) 1]
292+
, [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 0 []) 1]
293+
294+
, [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 0 []) 1]
295+
, [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 0 []) 1]
296+
, [DataChange (A.fromList [("v1", "c"), ("v2", "d")]) (Timestamp 0 []) 1]
297+
, [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 0 []) 1]
298+
]
299+
300+
dcbs2' :: [[DataChange Object Word32]]
301+
dcbs2' =
302+
[ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 0 [1]) 1]
303+
, [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [1]) 1]
304+
, [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [1]) 1]
305+
306+
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 0 [2]) (-1)
307+
, DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 0 [2]) 1]
308+
, [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [2]) (-1)
309+
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 0 [2]) 1]
310+
, [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [2]) (-1)
311+
, DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [2]) 1]
312+
313+
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 0 [3]) (-1)
314+
, DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 0 [3]) 1]
315+
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 0 [3]) (-1)
316+
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 0 [3]) 1]
317+
, [DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [3]) (-1)
318+
, DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 0 [3]) 1]
319+
]
320+
321+
dcbs3' :: [[DataChange Object Word32]]
322+
dcbs3' =
323+
[ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)]
324+
325+
, [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)]
326+
, [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)]
327+
328+
, [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)]
329+
, [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)]
330+
, [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)]
331+
]
332+
333+
dcbs4' :: [[DataChange Object Word32]]
334+
dcbs4' =
335+
[ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1 [1]) (-1)
336+
, DataChange (A.fromList [("v1", "b"), ("reduced", "d" )]) (Timestamp 1 [1]) 1]
337+
338+
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 1 [2]) 1
339+
, DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 1 [2]) (-1)]
340+
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 1 [2]) (-1)
341+
, DataChange (A.fromList [("v1", "a"), ("reduced", "bd" )]) (Timestamp 1 [2]) 1]
342+
343+
, [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1
344+
, DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)]
345+
, [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 1 [3]) 1
346+
, DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 1 [3]) (-1)]
347+
, [DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 1 [3]) (-1)
348+
, DataChange (A.fromList [("v1", "c"), ("reduced", "abd" )]) (Timestamp 1 [3]) 1]
349+
]

hstream-sql/etc/syntax-test-cases.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,9 @@ testSuiteCases:
238238
- testCaseFail: null
239239
testCaseLabel: Create Connectors
240240
testCaseResult: RQCreate (RCreateConnector "SOURCE" "source01" "mysql" False (RConnectorOptions
241-
(fromList [("port",Number 3306.0),("host",String "mysql-s1"),("password",String
242-
"password"),("user",String "root"),("database",String "d1"),("table",String "person"),("stream",String
243-
"stream01")])))
241+
(fromList [("password",String "password"),("table",String "person"),("database",String
242+
"d1"),("port",Number 3306.0),("stream",String "stream01"),("host",String "mysql-s1"),("user",String
243+
"root")])))
244244
testCaseStmts:
245245
- create source connector source01 from mysql with ("host" = 'mysql-s1', "port"
246246
= 3306, "user" = 'root', "password" = 'password', "database" = 'd1', "table" =

hstream-store/HStream/Store/Internal/LogDevice/Reader.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,12 @@ startReadingFromCheckpoint reader logid untilSeq =
121121
E.throwStreamErrorIfNotOK . warnSlow $
122122
ld_checkpointed_reader_start_reading_from_ckp ptr logid untilSeq
123123

124-
-- Note: The exception NOTFOUND means the log is not being read, either because
125-
-- readerStartReading() was never called (or readerStopReading() was
126-
-- called), or because `until` LSN was reached
127-
readerStopReading :: LDReader -> C_LogID -> IO ()
124+
-- Exceptions:
125+
-- NOTFOUND the log is not being read, either because readerStartReading()
126+
-- was never called (or readerStopReading() was called), or because
127+
-- `until` LSN was reached
128+
-- Any exceptions from AsyncReader::stopReading
129+
readerStopReading :: HasCallStack => LDReader -> C_LogID -> IO ()
128130
readerStopReading reader logid =
129131
withForeignPtr reader $ \ptr -> void $
130132
E.throwStreamErrorIfNotOK $ c_ld_reader_stop_reading ptr logid

0 commit comments

Comments
 (0)