@@ -207,10 +207,8 @@ impl CatalogController {
207
207
}
208
208
} ) ;
209
209
210
- let ( source_fragments, removed_actors, removed_fragments) =
211
- resolve_source_register_info_for_jobs ( & txn, to_drop_streaming_jobs. clone ( ) ) . await ?;
212
-
213
- let fragment_ids = get_fragment_ids_by_jobs ( & txn, to_drop_streaming_jobs. clone ( ) ) . await ?;
210
+ let ( removed_source_fragments, removed_actors, removed_fragments) =
211
+ get_fragments_for_jobs ( & txn, to_drop_streaming_jobs. clone ( ) ) . await ?;
214
212
215
213
// Find affect users with privileges on all this objects.
216
214
let to_update_user_ids: Vec < UserId > = UserPrivilege :: find ( )
@@ -245,10 +243,10 @@ impl CatalogController {
245
243
. notify_frontend ( NotificationOperation :: Delete , relation_group)
246
244
. await ;
247
245
248
- let fragment_mappings = fragment_ids
249
- . into_iter ( )
246
+ let fragment_mappings = removed_fragments
247
+ . iter ( )
250
248
. map ( |fragment_id| PbFragmentWorkerSlotMapping {
251
- fragment_id : fragment_id as _ ,
249
+ fragment_id : * fragment_id as _ ,
252
250
mapping : None ,
253
251
} )
254
252
. collect ( ) ;
@@ -259,11 +257,10 @@ impl CatalogController {
259
257
Ok ( (
260
258
ReleaseContext {
261
259
database_id,
262
- streaming_job_ids : to_drop_streaming_jobs,
263
- state_table_ids : to_drop_state_table_ids,
264
- source_ids : to_drop_source_ids,
265
- connections : vec ! [ ] ,
266
- source_fragments,
260
+ removed_streaming_job_ids : to_drop_streaming_jobs,
261
+ removed_state_table_ids : to_drop_state_table_ids,
262
+ removed_source_ids : to_drop_source_ids,
263
+ removed_source_fragments,
267
264
removed_actors,
268
265
removed_fragments,
269
266
} ,
@@ -418,8 +415,8 @@ impl CatalogController {
418
415
. all ( & txn)
419
416
. await ?;
420
417
421
- let ( source_fragments , removed_actors, removed_fragments) =
422
- resolve_source_register_info_for_jobs ( & txn, streaming_jobs. clone ( ) ) . await ?;
418
+ let ( removed_source_fragments , removed_actors, removed_fragments) =
419
+ get_fragments_for_jobs ( & txn, streaming_jobs. clone ( ) ) . await ?;
423
420
424
421
let state_table_ids: Vec < TableId > = Table :: find ( )
425
422
. select_only ( )
@@ -445,15 +442,6 @@ impl CatalogController {
445
442
. all ( & txn)
446
443
. await ?;
447
444
448
- let connections = Connection :: find ( )
449
- . inner_join ( Object )
450
- . filter ( object:: Column :: DatabaseId . eq ( Some ( database_id) ) )
451
- . all ( & txn)
452
- . await ?
453
- . into_iter ( )
454
- . map ( |conn| conn. connection_id )
455
- . collect_vec ( ) ;
456
-
457
445
// Find affect users with privileges on the database and the objects in the database.
458
446
let to_update_user_ids: Vec < UserId > = UserPrivilege :: find ( )
459
447
. select_only ( )
@@ -503,11 +491,10 @@ impl CatalogController {
503
491
Ok ( (
504
492
ReleaseContext {
505
493
database_id,
506
- streaming_job_ids : streaming_jobs,
507
- state_table_ids,
508
- source_ids,
509
- connections,
510
- source_fragments,
494
+ removed_streaming_job_ids : streaming_jobs,
495
+ removed_state_table_ids : state_table_ids,
496
+ removed_source_ids : source_ids,
497
+ removed_source_fragments,
511
498
removed_actors,
512
499
removed_fragments,
513
500
} ,
0 commit comments