Skip to content

Commit 6bf2d2a

Browse files
committed
artifact cache cancellation mechanism - use in dbgi searching, since this gets huge (& is often not needed quickly after querying)
1 parent 0d8590a commit 6bf2d2a

File tree

3 files changed

+96
-18
lines changed

3 files changed

+96
-18
lines changed

src/artifact_cache/artifact_cache.c

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ ac_init(void)
1818
ac_shared->req_batches[idx].mutex = mutex_alloc();
1919
ac_shared->req_batches[idx].arena = arena_alloc();
2020
}
21+
ac_shared->cancel_thread = thread_launch(ac_cancel_thread_entry_point, 0);
2122
}
2223

2324
////////////////////////////////
@@ -161,8 +162,8 @@ ac_artifact_from_key_(Access *access, String8 key, AC_ArtifactParams *params, U6
161162
}
162163
n->v.key = str8_copy(req_batch->arena, key);
163164
n->v.gen = params->gen;
165+
n->v.cancel_signal = &node->cancelled;
164166
n->v.create = params->create;
165-
n->v.cancel_signal = params->cancel_signal;
166167
}
167168
cond_var_broadcast(async_tick_start_cond_var);
168169
ins_atomic_u32_eval_assign(&async_loop_again, 1);
@@ -576,3 +577,60 @@ ac_async_tick(void)
576577

577578
scratch_end(scratch);
578579
}
580+
581+
////////////////////////////////
582+
//~ rjf: Cancel Thread
583+
584+
internal void
585+
ac_cancel_thread_entry_point(void *p)
586+
{
587+
for(;;)
588+
{
589+
os_sleep_milliseconds(100);
590+
591+
//- rjf: scan in-flight nodes for expiration
592+
for EachIndex(cache_slot_idx, ac_shared->cache_slots_count)
593+
{
594+
Stripe *cache_stripe = stripe_from_slot_idx(&ac_shared->cache_stripes, cache_slot_idx);
595+
RWMutexScope(cache_stripe->rw_mutex, 0)
596+
{
597+
for EachNode(cache, AC_Cache, ac_shared->cache_slots[cache_slot_idx])
598+
{
599+
Rng1U64 slot_range = lane_range(cache->slots_count);
600+
for EachInRange(slot_idx, slot_range)
601+
{
602+
AC_Slot *slot = &cache->slots[slot_idx];
603+
Stripe *stripe = stripe_from_slot_idx(&cache->stripes, slot_idx);
604+
for(B32 write_mode = 0; write_mode <= 1; write_mode += 1)
605+
{
606+
B32 slot_has_work = 0;
607+
RWMutexScope(stripe->rw_mutex, write_mode)
608+
{
609+
for(AC_Node *n = slot->first, *next = 0; n != 0; n = next)
610+
{
611+
next = n->next;
612+
if(access_pt_is_expired(&n->access_pt, .time = n->evict_threshold_us) && ins_atomic_u64_eval(&n->working_count) > 0)
613+
{
614+
slot_has_work = 1;
615+
if(!write_mode)
616+
{
617+
break;
618+
}
619+
else
620+
{
621+
n->cancelled = 1;
622+
}
623+
}
624+
}
625+
}
626+
if(!slot_has_work)
627+
{
628+
break;
629+
}
630+
}
631+
}
632+
}
633+
}
634+
}
635+
}
636+
}

src/artifact_cache/artifact_cache.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ struct AC_ArtifactParams
3737
U64 gen;
3838
U64 evict_threshold_us;
3939
B32 *stale_out;
40-
B32 *cancel_signal;
4140
AC_Flags flags;
4241
};
4342

@@ -127,6 +126,9 @@ struct AC_Shared
127126

128127
// rjf: requests
129128
AC_RequestBatch req_batches[2]; // 0: high priority, 1: low priority
129+
130+
// rjf: cancel thread
131+
Thread cancel_thread;
130132
};
131133

132134
////////////////////////////////
@@ -139,11 +141,6 @@ global AC_Shared *ac_shared = 0;
139141

140142
internal void ac_init(void);
141143

142-
////////////////////////////////
143-
//~ rjf: Helpers
144-
145-
internal B32 ac_cancelled(void);
146-
147144
////////////////////////////////
148145
//~ rjf: Cache Lookups
149146

@@ -155,4 +152,9 @@ internal AC_Artifact ac_artifact_from_key_(Access *access, String8 key, AC_Artif
155152

156153
internal void ac_async_tick(void);
157154

155+
////////////////////////////////
156+
//~ rjf: Cancel Thread
157+
158+
internal void ac_cancel_thread_entry_point(void *p);
159+
158160
#endif // ARTIFACT_CACHE_H

src/dbg_info/dbg_info.c

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,9 +1142,10 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
11421142
Rng1U64 range = lane_range(element_count);
11431143
for EachInRange(idx, range)
11441144
{
1145-
//- rjf: every so often, check if we need to cancel, and cancel
1145+
//- rjf: every so often, check if we need to cancel, and cancel
1146+
if(idx%10000 == 0 && !!ins_atomic_u32_eval(cancel_signal))
11461147
{
1147-
// TODO(rjf)
1148+
break;
11481149
}
11491150

11501151
//- rjf: get element, map to string; if empty, continue to next element
@@ -1244,7 +1245,15 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
12441245
}
12451246
}
12461247
lane_sync();
1247-
1248+
1249+
//- rjf: decide if we cancelled
1250+
B32 cancelled = 0;
1251+
if(lane_idx() == 0 && !!ins_atomic_u32_eval(cancel_signal))
1252+
{
1253+
cancelled = 1;
1254+
}
1255+
lane_sync_u64(&cancelled, 0);
1256+
12481257
//- rjf: produce sort records
12491258
typedef struct SortRecord SortRecord;
12501259
struct SortRecord
@@ -1255,7 +1264,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
12551264
U64 sort_records_count = all_items->total_count;
12561265
SortRecord *sort_records = 0;
12571266
SortRecord *sort_records__swap = 0;
1258-
ProfScope("produce sort records")
1267+
if(!cancelled) ProfScope("produce sort records")
12591268
{
12601269
if(lane_idx() == 0)
12611270
{
@@ -1283,7 +1292,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
12831292
lane_sync();
12841293

12851294
//- rjf: sort records
1286-
ProfScope("sort records")
1295+
if(!cancelled) ProfScope("sort records")
12871296
{
12881297
//- rjf: set up common data
12891298
U64 bits_per_digit = 8;
@@ -1382,7 +1391,7 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
13821391

13831392
//- rjf: produce final array
13841393
DI_SearchItemArray items = {0};
1385-
ProfScope("produce final array")
1394+
if(!cancelled) ProfScope("produce final array")
13861395
{
13871396
if(lane_idx() == 0)
13881397
{
@@ -1401,11 +1410,20 @@ di_search_artifact_create(String8 key, B32 *cancel_signal, B32 *retry_out, U64 *
14011410
}
14021411
lane_sync();
14031412

1404-
//- rjf: bundle as artifact
1405-
artifact.u64[0] = (U64)arenas;
1406-
artifact.u64[1] = arenas_count;
1407-
artifact.u64[2] = (U64)items.v;
1408-
artifact.u64[3] = items.count;
1413+
//- rjf: bundle as artifact
1414+
if(!cancelled)
1415+
{
1416+
artifact.u64[0] = (U64)arenas;
1417+
artifact.u64[1] = arenas_count;
1418+
artifact.u64[2] = (U64)items.v;
1419+
artifact.u64[3] = items.count;
1420+
}
1421+
1422+
//- rjf: release results on cancel
1423+
else
1424+
{
1425+
arena_release(arena);
1426+
}
14091427
}
14101428
scratch_end(scratch);
14111429
access_close(access);

0 commit comments

Comments
 (0)