Skip to content

Commit 95c266f

Browse files
authored
hstream: trim log which stores changelog of query state after snapshotting (#1745)
1 parent 43b96cb commit 95c266f

File tree

2 files changed

+7
-0
lines changed
  • hstream/src/HStream/Server/Handler
  • hstream-processing/src/HStream/Processing/Processor

2 files changed

+7
-0
lines changed

hstream-processing/src/HStream/Processing/Processor/ChangeLog.hs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import qualified RIO.ByteString.Lazy as BL
1818
class ChangeLogger h where
1919
logChangelog :: h -> BL.ByteString -> IO ()
2020
getChangelogProgress :: h -> IO Word64 -- FIXME: use type variable i
21+
trimChangelog :: h -> Word64 -> IO () -- FIXME: use type variable i
2122

2223
data StateStoreChangelog k v ser
2324
= CLKSPut Text k v -- HS.table: K/V; HG.aggregate: K/V; HTW.aggregate: K/V

hstream/src/HStream/Server/Handler/Common.hs

+6
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,14 @@ data QueryRunner = QueryRunner {
309309
instance ChangeLogger () where
310310
logChangelog () _ = return ()
311311
getChangelogProgress () = return minBound
312+
trimChangelog () _ = return ()
312313

313314
-- use logdevice stream
314315
instance ChangeLogger (S.LDClient, S.C_LogID) where
315316
logChangelog (ldClient, logId) bs =
316317
void $ S.append ldClient logId (lazyByteStringToBytes bs) Nothing
317318
getChangelogProgress (ldClient, logId) = S.getTailLSN ldClient logId
319+
trimChangelog (ldClient, logId) lsn = S.trim ldClient logId lsn
318320

319321
---- store processing node states (snapshot)
320322
-- do nothing
@@ -325,6 +327,8 @@ instance Snapshotter () where
325327
instance Snapshotter RocksDB.DB where
326328
snapshot db = RocksDB.put db def
327329

330+
-- | Do snapshot for a task, then trim old changelogs for this task.
331+
-- May throw exceptions.
328332
doSnapshot :: (ChangeLogger h1, Snapshotter h2) => h1 -> h2 -> Task -> IO ()
329333
doSnapshot h1 h2 Task{..} = do
330334
changelogTail <- getChangelogProgress h1
@@ -350,6 +354,8 @@ doSnapshot h1 h2 Task{..} = do
350354
valueSer = BL.toStrict $ Aeson.encode value
351355
snapshot h2 keySer valueSer
352356
Log.debug $ "Query " <> Log.build taskName <> ": I have successfully done a snapshot!"
357+
trimChangelog h1 changelogTail
358+
Log.debug $ "Query " <> Log.build taskName <> ": I have successfully trimmed the old changelog!"
353359

354360
--------------------------------------------------------------------------------
355361

0 commit comments

Comments
 (0)