Skip to content

Commit 37bf5f6

Browse files
committed
fix: Implement Bus::chain() recursive pattern for series import
Split Series import into recursive chunks to prevent overloading queue with too many jobs 🔥 - Initial dispatch: Creates a chain of 10 processing jobs + 1 checker job - Chain executes: Jobs run sequentially (guaranteed order) - Checker runs last: Calculates if more series remain - If more work: Dispatches a new chain with next 10 jobs + checker - If done: Triggers STRM sync or sends completion notification ✅ Simple & Robust: No coordinator complexity, just recursive chains ✅ Low Redis Memory: Only 10-11 jobs queued at any time ✅ Self-Healing: If queue is cleared, re-running sync continues from where it left off ✅ Progress Tracking: Logs show exact progress through series ✅ Guaranteed Order: Bus::chain() ensures checker runs after processing jobs CheckSeriesImportProgress.php - Recursively dispatches metadata sync chains CheckSeriesStrmProgress.php - Recursively dispatches STRM sync chains ProcessM3uImportSeriesEpisodes.php - Updated to start first chain SyncSeriesStrmFiles.php - Updated to start first chain Resolves #589
1 parent 1969dcf commit 37bf5f6

File tree

5 files changed

+363
-92
lines changed

5 files changed

+363
-92
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
3+
namespace App\Jobs;
4+
5+
use App\Models\Series;
6+
use App\Models\User;
7+
use App\Settings\GeneralSettings;
8+
use Filament\Notifications\Notification;
9+
use Illuminate\Contracts\Queue\ShouldQueue;
10+
use Illuminate\Foundation\Queue\Queueable;
11+
use Illuminate\Support\Facades\Bus;
12+
use Illuminate\Support\Facades\Log;
13+
14+
/**
15+
* Checks if there are more series to process and dispatches the next chain if needed.
16+
* This runs as the last job in a chain, keeping Redis memory usage low.
17+
*/
18+
class CheckSeriesImportProgress implements ShouldQueue
19+
{
20+
use Queueable;
21+
22+
public $tries = 1;
23+
24+
/**
25+
* How many batch jobs to include in each chain.
26+
* Lower = less Redis memory, more chains
27+
* Higher = fewer chains, more Redis memory
28+
*/
29+
public const JOBS_PER_CHAIN = 10;
30+
31+
public function __construct(
32+
public int $currentOffset,
33+
public int $totalSeries,
34+
public bool $notify = true,
35+
public bool $all_playlists = false,
36+
public ?int $playlist_id = null,
37+
public bool $overwrite_existing = false,
38+
public ?int $user_id = null,
39+
public ?bool $sync_stream_files = true,
40+
public ?float $startedAt = null,
41+
) {
42+
// Track start time on first checker
43+
if ($this->startedAt === null) {
44+
$this->startedAt = microtime(true);
45+
}
46+
}
47+
48+
public function handle(GeneralSettings $settings): void
49+
{
50+
$batchSize = ProcessM3uImportSeriesEpisodes::BATCH_SIZE;
51+
$seriesProcessed = $this->currentOffset;
52+
$seriesRemaining = $this->totalSeries - $seriesProcessed;
53+
54+
Log::info('Series Import: Progress check', [
55+
'processed' => $seriesProcessed,
56+
'total' => $this->totalSeries,
57+
'remaining' => $seriesRemaining,
58+
'progress_pct' => round(($seriesProcessed / $this->totalSeries) * 100, 1),
59+
]);
60+
61+
if ($seriesRemaining <= 0) {
62+
// All done! Trigger STRM sync if needed
63+
Log::info('Series Import: All metadata batches complete', [
64+
'total_series' => $this->totalSeries,
65+
]);
66+
67+
if ($this->sync_stream_files && $settings->stream_file_sync_enabled) {
68+
Log::info('Series Import: Dispatching STRM sync');
69+
dispatch(new SyncSeriesStrmFiles(
70+
series: null,
71+
notify: true,
72+
all_playlists: $this->all_playlists,
73+
playlist_id: $this->playlist_id,
74+
user_id: $this->user_id,
75+
));
76+
}
77+
78+
// Send completion notification
79+
if ($this->notify && $this->user_id) {
80+
$user = User::find($this->user_id);
81+
if ($user) {
82+
$duration = round(microtime(true) - $this->startedAt, 2);
83+
$minutes = floor($duration / 60);
84+
$seconds = $duration % 60;
85+
$timeStr = $minutes > 0
86+
? "{$minutes} minute(s) and " . round($seconds, 0) . " second(s)"
87+
: round($seconds, 1) . " second(s)";
88+
89+
Notification::make()
90+
->success()
91+
->title('Series Metadata Sync Complete')
92+
->body("Successfully processed {$this->totalSeries} series in {$timeStr}.")
93+
->broadcast($user)
94+
->sendToDatabase($user);
95+
}
96+
}
97+
98+
return;
99+
}
100+
101+
// More series to process - dispatch next chain
102+
$jobs = [];
103+
$jobsInThisChain = min(self::JOBS_PER_CHAIN, (int) ceil($seriesRemaining / $batchSize));
104+
105+
for ($i = 0; $i < $jobsInThisChain; $i++) {
106+
$offset = $seriesProcessed + ($i * $batchSize);
107+
$batchNumber = (int) floor($offset / $batchSize) + 1;
108+
$totalBatches = (int) ceil($this->totalSeries / $batchSize);
109+
110+
$jobs[] = new ProcessM3uImportSeriesEpisodes(
111+
playlistSeries: null,
112+
notify: false,
113+
all_playlists: $this->all_playlists,
114+
playlist_id: $this->playlist_id,
115+
overwrite_existing: $this->overwrite_existing,
116+
user_id: $this->user_id,
117+
sync_stream_files: false, // Don't trigger per-job STRM sync
118+
batchOffset: $offset,
119+
totalBatches: $totalBatches,
120+
currentBatch: $batchNumber,
121+
);
122+
}
123+
124+
// Add checker as last job in chain
125+
$nextOffset = $seriesProcessed + ($jobsInThisChain * $batchSize);
126+
$jobs[] = new self(
127+
currentOffset: $nextOffset,
128+
totalSeries: $this->totalSeries,
129+
notify: $this->notify,
130+
all_playlists: $this->all_playlists,
131+
playlist_id: $this->playlist_id,
132+
overwrite_existing: $this->overwrite_existing,
133+
user_id: $this->user_id,
134+
sync_stream_files: $this->sync_stream_files,
135+
startedAt: $this->startedAt,
136+
);
137+
138+
Log::info('Series Import: Dispatching next chain', [
139+
'jobs_in_chain' => $jobsInThisChain,
140+
'next_offset' => $nextOffset,
141+
'series_processed_after_chain' => $nextOffset,
142+
]);
143+
144+
Bus::chain($jobs)->dispatch();
145+
}
146+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<?php
2+
3+
namespace App\Jobs;
4+
5+
use App\Models\Series;
6+
use App\Models\User;
7+
use Filament\Notifications\Notification;
8+
use Illuminate\Contracts\Queue\ShouldQueue;
9+
use Illuminate\Foundation\Queue\Queueable;
10+
use Illuminate\Support\Facades\Bus;
11+
use Illuminate\Support\Facades\Log;
12+
13+
/**
14+
* Checks if there are more series STRM files to sync and dispatches the next chain if needed.
15+
*/
16+
class CheckSeriesStrmProgress implements ShouldQueue
17+
{
18+
use Queueable;
19+
20+
public $tries = 1;
21+
22+
public const JOBS_PER_CHAIN = 10;
23+
24+
public function __construct(
25+
public int $currentOffset,
26+
public int $totalSeries,
27+
public bool $notify = true,
28+
public bool $all_playlists = false,
29+
public ?int $playlist_id = null,
30+
public ?int $user_id = null,
31+
public bool $needsCleanup = false,
32+
) {
33+
$this->onQueue('file_sync');
34+
}
35+
36+
public function handle(): void
37+
{
38+
$batchSize = SyncSeriesStrmFiles::BATCH_SIZE;
39+
$seriesProcessed = $this->currentOffset;
40+
$seriesRemaining = $this->totalSeries - $seriesProcessed;
41+
42+
Log::info('STRM Sync: Progress check', [
43+
'processed' => $seriesProcessed,
44+
'total' => $this->totalSeries,
45+
'remaining' => $seriesRemaining,
46+
'progress_pct' => round(($seriesProcessed / $this->totalSeries) * 100, 1),
47+
]);
48+
49+
if ($seriesRemaining <= 0) {
50+
// All done! Run cleanup if needed
51+
Log::info('STRM Sync: All batches complete', [
52+
'total_series' => $this->totalSeries,
53+
]);
54+
55+
if ($this->needsCleanup) {
56+
Log::info('STRM Sync: Dispatching cleanup job');
57+
dispatch(new SyncSeriesStrmFiles(
58+
series: null,
59+
notify: $this->notify,
60+
all_playlists: $this->all_playlists,
61+
playlist_id: $this->playlist_id,
62+
user_id: $this->user_id,
63+
isCleanupJob: true,
64+
));
65+
} else {
66+
// Send completion notification
67+
if ($this->notify && $this->user_id) {
68+
$user = User::find($this->user_id);
69+
if ($user) {
70+
Notification::make()
71+
->success()
72+
->title('STRM File Sync Complete')
73+
->body("Successfully synced {$this->totalSeries} series.")
74+
->broadcast($user)
75+
->sendToDatabase($user);
76+
}
77+
}
78+
}
79+
80+
return;
81+
}
82+
83+
// More series to process - dispatch next chain
84+
$jobs = [];
85+
$jobsInThisChain = min(self::JOBS_PER_CHAIN, (int) ceil($seriesRemaining / $batchSize));
86+
87+
for ($i = 0; $i < $jobsInThisChain; $i++) {
88+
$offset = $seriesProcessed + ($i * $batchSize);
89+
$batchNumber = (int) floor($offset / $batchSize) + 1;
90+
$totalBatches = (int) ceil($this->totalSeries / $batchSize);
91+
92+
$jobs[] = new SyncSeriesStrmFiles(
93+
series: null,
94+
notify: false,
95+
all_playlists: $this->all_playlists,
96+
playlist_id: $this->playlist_id,
97+
user_id: $this->user_id,
98+
batchOffset: $offset,
99+
totalBatches: $totalBatches,
100+
currentBatch: $batchNumber,
101+
);
102+
}
103+
104+
// Add checker as last job in chain
105+
$nextOffset = $seriesProcessed + ($jobsInThisChain * $batchSize);
106+
$jobs[] = new self(
107+
currentOffset: $nextOffset,
108+
totalSeries: $this->totalSeries,
109+
notify: $this->notify,
110+
all_playlists: $this->all_playlists,
111+
playlist_id: $this->playlist_id,
112+
user_id: $this->user_id,
113+
needsCleanup: $this->needsCleanup,
114+
);
115+
116+
Log::info('STRM Sync: Dispatching next chain', [
117+
'jobs_in_chain' => $jobsInThisChain,
118+
'next_offset' => $nextOffset,
119+
]);
120+
121+
Bus::chain($jobs)->dispatch();
122+
}
123+
}

app/Jobs/ProcessM3uImportSeries.php

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -73,45 +73,22 @@ public function handle(): void
7373
public function importSeries(): void
7474
{
7575
try {
76-
$jobs = [];
77-
$series = $this->playlist->series()->where('enabled', true)->cursor();
78-
foreach ($series as $seriesItem) {
79-
$jobs[] = new ProcessM3uImportSeriesEpisodes(
80-
playlistSeries: $seriesItem,
81-
notify: false, // don't notify user for bulk syncs
82-
);
83-
}
84-
$playlist = $this->playlist;
85-
Bus::chain($jobs)
86-
->onConnection('redis') // force to use redis connection
87-
->onQueue('import')
88-
->catch(function (Throwable $e) use ($playlist) {
89-
$error = "Error processing series sync on \"{$playlist->name}\": {$e->getMessage()}";
90-
Log::error($error);
91-
Notification::make()
92-
->danger()
93-
->title("Error processing series sync on \"{$playlist->name}\"")
94-
->body('Please view your notifications for details.')
95-
->broadcast($playlist->user);
96-
Notification::make()
97-
->danger()
98-
->title("Error processing series sync on \"{$playlist->name}\"")
99-
->body($error)
100-
->sendToDatabase($playlist->user);
101-
$playlist->update([
102-
'status' => Status::Failed,
103-
'synced' => now(),
104-
'errors' => $error,
105-
'series_progress' => 100,
106-
'processing' => [
107-
...$playlist->processing ?? [],
108-
'series_processing' => false,
109-
],
110-
]);
76+
// Use the new bulk dispatcher pattern instead of creating individual jobs
77+
// This prevents flooding Redis with thousands of jobs
78+
dispatch(new ProcessM3uImportSeriesEpisodes(
79+
playlistSeries: null, // null triggers bulk mode
80+
notify: true,
81+
all_playlists: false,
82+
playlist_id: $this->playlist->id,
83+
overwrite_existing: false,
84+
user_id: $this->playlist->user_id,
85+
sync_stream_files: true,
86+
));
11187

112-
// Fire the playlist synced event
113-
event(new SyncCompleted($playlist));
114-
})->dispatch();
88+
Log::info('ProcessM3uImportSeries: Dispatched bulk series sync', [
89+
'playlist_id' => $this->playlist->id,
90+
'playlist_name' => $this->playlist->name,
91+
]);
11592
} catch (Exception $e) {
11693
// Update the playlist status to error
11794
$error = Str::limit($e->getMessage(), 255);

0 commit comments

Comments
 (0)