 from .instance import DflyInstance, DflyInstanceFactory


+def extract_fragmentation_waste(memory_arena):
+    """
+    Extracts the fragmentation waste from the memory arena info.
+    """
+    match = re.search(r"fragmentation waste:\s*([0-9.]+)%", memory_arena)
+    assert match is not None
+    return float(match.group(1))
+
+
 @pytest.mark.slow
 @pytest.mark.opt_only
 @pytest.mark.parametrize(
@@ -176,49 +185,218 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
     assert rss_before_eval * 1.01 > info["used_memory_rss"]


-@pytest.mark.skip("rss eviction disabled")
 @pytest.mark.asyncio
-@dfly_args(
-    {
-        "proactor_threads": 1,
-        "cache_mode": "true",
-        "maxmemory": "5gb",
-        "rss_oom_deny_ratio": 0.8,
-        "max_eviction_per_heartbeat": 100,
-    }
+@pytest.mark.parametrize(
+    "proactor_threads_param, maxmemory_param",
+    [(1, 6 * (1024**3)), (4, 6 * (1024**3))],
 )
-async def test_cache_eviction_with_rss_deny_oom(
-    async_client: aioredis.Redis,
+async def test_cache_eviction_with_rss_deny_oom_simple_case(
+    df_factory: DflyInstanceFactory,
+    proactor_threads_param,
+    maxmemory_param,
 ):
     """
     Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
     """
+    df_server = df_factory.create(
+        proactor_threads=proactor_threads_param,
+        cache_mode="true",
+        maxmemory=maxmemory_param,
+        rss_oom_deny_ratio=0.8,
+    )
+    df_server.start()

-    max_memory = 5 * 1024 * 1024 * 1024  # 5G
-    rss_max_memory = int(max_memory * 0.8)
+    async_client = df_server.client()

-    data_fill_size = int(0.9 * rss_max_memory)  # 95% of rss_max_memory
+    max_memory = maxmemory_param
+    rss_oom_deny_ratio = 0.8
+    eviction_memory_budget_threshold = 0.1  # 10% of max_memory
+
+    rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)
+
+    data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory)  # 85% of max_memory

     val_size = 1024 * 5  # 5 kb
     num_keys = data_fill_size // val_size

     await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
-    # Test that used memory is less than 90% of max memory
+
+    # Test that used memory is less than 90% of max memory so that eviction is not triggered based on used_memory
     memory_info = await async_client.info("memory")
     assert (
         memory_info["used_memory"] < max_memory * 0.9
-    ), "Used memory should be less than 90% of max memory."
| 228 | + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." |
     assert (
-        memory_info["used_memory_rss"] > rss_max_memory * 0.9
-    ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
+        memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
+    ), "Used RSS memory should exceed max_memory * rss_oom_deny_ratio (80% of max memory) to start eviction based on rss memory usage."

-    # Get RSS memory after creating new connections
     memory_info = await async_client.info("memory")
-    while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
+    prev_evicted_keys = 0
+    evicted_keys_repeat_count = 0
+    while True:
+        # Wait for some time
         await asyncio.sleep(1)
+
         memory_info = await async_client.info("memory")
         logging.info(
-            f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
+            f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
         )
+
         stats_info = await async_client.info("stats")
         logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
+
+        # Check whether the number of evicted keys has stopped increasing
+        if prev_evicted_keys == stats_info["evicted_keys"]:
+            evicted_keys_repeat_count += 1
+        else:
+            prev_evicted_keys = stats_info["evicted_keys"]
+            evicted_keys_repeat_count = 1
+
+        if evicted_keys_repeat_count > 2:
+            break
+
+    # Wait for some time
+    await asyncio.sleep(2)
+
+    memory_arena = await async_client.execute_command("MEMORY", "ARENA")
+    fragmentation_waste = extract_fragmentation_waste(memory_arena)
+    logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
+    assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."
+
+    # Assert that no more keys are evicted
+    memory_info = await async_client.info("memory")
+    stats_info = await async_client.info("stats")
+
+    assert memory_info["used_memory"] > max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.12
+    ), "We should not evict all items."
+    assert memory_info["used_memory"] < max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold
+    ), "Used memory should be smaller than the threshold."
+    assert memory_info["used_memory_rss"] > max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.12
+    ), "We should not evict all items."
+
+    evicted_keys = stats_info["evicted_keys"]
+    # We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage
+    assert (
+        evicted_keys > 0
+        and evicted_keys >= prev_evicted_keys
+        and evicted_keys <= prev_evicted_keys * 1.0015
+    ), "We should not evict more items."
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "proactor_threads_param, maxmemory_param",
+    [(1, 6 * (1024**3)), (4, 6 * (1024**3))],
+)
+async def test_cache_eviction_with_rss_deny_oom_two_waves(
+    df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
+):
| 297 | + """ |
| 298 | + Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit |
| 299 | + It is similar to the test_cache_eviction_with_rss_deny_oom_simple_case but here we have two waves of data filling: |
| 300 | + 1. First wave fills the instance to 85% of max memory, which is above rss_oom_deny_ratio. |
| 301 | + 2. Then we wait for eviction to happen based on rss memory usage. After eviction we should have 70% of max memory used. |
| 302 | + 3. Second wave fills the instance to 90% of max memory, which is above rss_oom_deny_ratio. |
| 303 | + 4. Second time eviction should happen |
| 304 | + """ |
+    df_server = df_factory.create(
+        proactor_threads=proactor_threads_param,
+        cache_mode="true",
+        maxmemory=maxmemory_param,
+        rss_oom_deny_ratio=0.8,
+    )
+    df_server.start()
+
+    async_client = df_server.client()
+
+    max_memory = maxmemory_param
+    rss_oom_deny_ratio = 0.8
+    eviction_memory_budget_threshold = 0.1  # 10% of max_memory
+
+    rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)
+
+    # first wave fills 85% of max memory
+    # second wave fills 20% of max memory
+    data_fill_size = [
+        int((rss_oom_deny_ratio + 0.05) * max_memory),
+        int((1 - rss_oom_deny_ratio) * max_memory),
+    ]
+
+    val_size = 1024 * 5  # 5 kb
+
+    for i in range(2):
+        if i > 0:
+            await asyncio.sleep(2)
+
+        num_keys = data_fill_size[i] // val_size
+        logging.info(
+            f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}."
+        )
+        await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size)
+
+        # Test that used memory is less than 90% of max memory so that eviction is not triggered based on used_memory
+        memory_info = await async_client.info("memory")
+        assert (
+            memory_info["used_memory"] < max_memory * 0.9
| 344 | + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." |
+        assert (
+            memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
| 347 | + ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage." |
+
+        memory_info = await async_client.info("memory")
+        prev_evicted_keys = 0
+        evicted_keys_repeat_count = 0
+        while True:
+            # Wait for some time
+            await asyncio.sleep(1)
+
+            memory_info = await async_client.info("memory")
+            logging.info(
+                f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
+            )
+
+            stats_info = await async_client.info("stats")
+            logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
+
+            # Check whether the number of evicted keys has stopped increasing
+            if prev_evicted_keys == stats_info["evicted_keys"]:
+                evicted_keys_repeat_count += 1
+            else:
+                prev_evicted_keys = stats_info["evicted_keys"]
+                evicted_keys_repeat_count = 1
+
+            if evicted_keys_repeat_count > 2:
+                break
+
+        # Wait for some time
+        await asyncio.sleep(2)
+
+        memory_arena = await async_client.execute_command("MEMORY", "ARENA")
+        fragmentation_waste = extract_fragmentation_waste(memory_arena)
+        logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
+        assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."
+
+        # Assert that no more keys are evicted
+        memory_info = await async_client.info("memory")
+        stats_info = await async_client.info("stats")
+
+        assert memory_info["used_memory"] > max_memory * (
+            rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.12
+        ), "We should not evict all items."
+        assert memory_info["used_memory"] < max_memory * (
+            rss_oom_deny_ratio - eviction_memory_budget_threshold
+        ), "Used memory should be smaller than the threshold."
+        assert memory_info["used_memory_rss"] > max_memory * (
+            rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.12
+        ), "We should not evict all items."
+
+        evicted_keys = stats_info["evicted_keys"]
+        # We may evict slightly more than prev_evicted_keys due to gaps in RSS memory usage
+        assert (
+            evicted_keys > 0
+            and evicted_keys >= prev_evicted_keys
+            and evicted_keys <= prev_evicted_keys * 1.0015
+        ), "We should not evict more items."