Skip to content

Commit ef2bc13

Browse files
committed
Merge branch 'dev' into experimental
2 parents 44d0ea4 + 9425b41 commit ef2bc13

File tree

3 files changed

+344
-100
lines changed

3 files changed

+344
-100
lines changed

app/Jobs/ProcessM3uImportSeries.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ public function importSeries(): void
8181
notify: false, // don't notify user for bulk syncs
8282
);
8383
}
84-
$jobs[] = new ProcessM3uImportSeriesComplete(
85-
playlist: $this->playlist,
86-
batchNo: $this->batchNo,
87-
);
8884
$playlist = $this->playlist;
8985
Bus::chain($jobs)
9086
->onConnection('redis') // force to use redis connection

app/Jobs/ProcessM3uImportSeriesEpisodes.php

Lines changed: 174 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,22 @@
99
use Filament\Notifications\Notification;
1010
use Illuminate\Contracts\Queue\ShouldQueue;
1111
use Illuminate\Foundation\Queue\Queueable;
12+
use Illuminate\Support\Facades\Log;
1213

1314
class ProcessM3uImportSeriesEpisodes implements ShouldQueue
1415
{
15-
use Queueable;
1616
use ProviderRequestDelay;
17+
use Queueable;
1718

1819
// Don't retry the job on failure
1920
public $tries = 1;
2021

22+
/**
23+
* Batch size for processing series.
24+
* Each batch is dispatched as a separate job to prevent timeouts.
25+
*/
26+
public const BATCH_SIZE = 100;
27+
2128
/**
2229
* Create a new job instance.
2330
*/
@@ -29,6 +36,9 @@ public function __construct(
2936
public bool $overwrite_existing = false,
3037
public ?int $user_id = null,
3138
public ?bool $sync_stream_files = true,
39+
public ?int $batchOffset = null, // For batch processing: starting offset
40+
public ?int $totalBatches = null, // For tracking progress
41+
public ?int $currentBatch = null, // Current batch number (1-indexed)
3242
) {}
3343

3444
/**
@@ -45,52 +55,181 @@ public function handle(GeneralSettings $settings): void
4555
];
4656

4757
if ($series) {
58+
// Single series processing
4859
$this->fetchMetadataForSeries($series, $global_sync_settings);
60+
} elseif ($this->batchOffset !== null) {
61+
// Batch processing mode - process a specific batch
62+
$this->processBatch($settings, $global_sync_settings);
4963
} else {
50-
// Disable notifications for bulk processing
51-
$this->notify = false;
52-
53-
// Process all series in chunks
54-
Series::query()
55-
->where([
56-
['enabled', true],
57-
['user_id', $this->user_id],
58-
])
59-
->when($this->playlist_id, function ($query) {
60-
$query->where('playlist_id', $this->playlist_id);
61-
})
64+
// Initial dispatch - calculate batches and dispatch them
65+
$this->dispatchBatches($settings);
66+
}
67+
}
68+
69+
/**
70+
* Calculate total series count and dispatch batch jobs.
71+
* This prevents one giant job from running for hours.
72+
*/
73+
private function dispatchBatches(GeneralSettings $settings): void
74+
{
75+
// Count total series to process
76+
$totalCount = Series::query()
77+
->where([
78+
['enabled', true],
79+
['user_id', $this->user_id],
80+
])
81+
->when($this->playlist_id, function ($query) {
82+
$query->where('playlist_id', $this->playlist_id);
83+
})
84+
->count();
85+
86+
if ($totalCount === 0) {
87+
Log::info('Series Sync: No series to process');
88+
89+
return;
90+
}
91+
92+
$totalBatches = (int) ceil($totalCount / self::BATCH_SIZE);
93+
94+
Log::info('Series Sync: Dispatching batch jobs', [
95+
'total_series' => $totalCount,
96+
'batch_size' => self::BATCH_SIZE,
97+
'total_batches' => $totalBatches,
98+
'user_id' => $this->user_id,
99+
'playlist_id' => $this->playlist_id,
100+
]);
101+
102+
// Dispatch batch jobs
103+
for ($batch = 0; $batch < $totalBatches; $batch++) {
104+
$offset = $batch * self::BATCH_SIZE;
105+
106+
dispatch(new self(
107+
playlistSeries: null,
108+
notify: false,
109+
all_playlists: $this->all_playlists,
110+
playlist_id: $this->playlist_id,
111+
overwrite_existing: $this->overwrite_existing,
112+
user_id: $this->user_id,
113+
sync_stream_files: $this->sync_stream_files,
114+
batchOffset: $offset,
115+
totalBatches: $totalBatches,
116+
currentBatch: $batch + 1,
117+
));
118+
}
119+
120+
// Dispatch the STRM sync job at the end (will be queued after all batch jobs)
121+
$global_sync_settings = [
122+
'enabled' => $settings->stream_file_sync_enabled ?? false,
123+
];
124+
125+
if ($global_sync_settings['enabled'] && $this->sync_stream_files) {
126+
dispatch(new SyncSeriesStrmFiles(
127+
series: null,
128+
notify: true,
129+
all_playlists: $this->all_playlists,
130+
playlist_id: $this->playlist_id,
131+
user_id: $this->user_id,
132+
));
133+
}
134+
135+
// Notify user that batches were dispatched
136+
if ($this->user_id) {
137+
$user = User::find($this->user_id);
138+
if ($user) {
139+
Notification::make()
140+
->info()
141+
->title('Series Sync Started')
142+
->body("Processing {$totalCount} series in {$totalBatches} batches...")
143+
->broadcast($user)
144+
->sendToDatabase($user);
145+
}
146+
}
147+
}
148+
149+
/**
150+
* Process a specific batch of series.
151+
*/
152+
private function processBatch(GeneralSettings $settings, array $global_sync_settings): void
153+
{
154+
$startTime = microtime(true);
155+
$processedCount = 0;
156+
157+
Log::info("Series Sync: Processing batch {$this->currentBatch}/{$this->totalBatches}", [
158+
'offset' => $this->batchOffset,
159+
'batch_size' => self::BATCH_SIZE,
160+
]);
161+
162+
// Get series IDs for this batch (using offset/limit instead of chunkById)
163+
$seriesIds = Series::query()
164+
->where([
165+
['enabled', true],
166+
['user_id', $this->user_id],
167+
])
168+
->when($this->playlist_id, function ($query) {
169+
$query->where('playlist_id', $this->playlist_id);
170+
})
171+
->orderBy('id')
172+
->skip($this->batchOffset)
173+
->take(self::BATCH_SIZE)
174+
->pluck('id')
175+
->toArray();
176+
177+
// Process in smaller chunks for memory management
178+
foreach (array_chunk($seriesIds, 10) as $chunkIds) {
179+
$seriesChunk = Series::query()
180+
->whereIn('id', $chunkIds)
62181
->with(['playlist'])
63-
->chunkById(100, function ($seriesChunk) use ($global_sync_settings) {
64-
foreach ($seriesChunk as $series) {
65-
$this->fetchMetadataForSeries($series, $global_sync_settings);
66-
}
67-
});
68-
69-
// Notify the user we're done!
70-
if ($this->user_id) {
71-
$user = User::find($this->user_id);
72-
if ($user) {
73-
Notification::make()
74-
->success()
75-
->title("Series Sync Completed")
76-
->body("Series sync completed successfully for all series.")
77-
->broadcast($user)
78-
->sendToDatabase($user);
79-
}
182+
->get();
183+
184+
foreach ($seriesChunk as $series) {
185+
// Pass dispatchSync: false to prevent per-series job dispatch
186+
$this->fetchMetadataForSeries($series, $global_sync_settings, dispatchSync: false);
187+
$processedCount++;
188+
}
189+
190+
// Clear memory after each mini-chunk
191+
unset($seriesChunk);
192+
gc_collect_cycles();
193+
}
194+
195+
$duration = round(microtime(true) - $startTime, 2);
196+
Log::info("Series Sync: Batch {$this->currentBatch}/{$this->totalBatches} completed", [
197+
'processed' => $processedCount,
198+
'duration_seconds' => $duration,
199+
]);
200+
201+
// On last batch, notify user
202+
if ($this->currentBatch === $this->totalBatches && $this->user_id) {
203+
$user = User::find($this->user_id);
204+
if ($user) {
205+
Notification::make()
206+
->success()
207+
->title('Series Metadata Sync Completed')
208+
->body("All {$this->totalBatches} batches processed successfully.")
209+
->broadcast($user)
210+
->sendToDatabase($user);
80211
}
81212
}
82213
}
83214

84-
private function fetchMetadataForSeries($series, $settings)
215+
/**
216+
* Fetch metadata for a single series.
217+
*
218+
* @param bool $dispatchSync Whether to dispatch sync job (false for bulk mode)
219+
*/
220+
private function fetchMetadataForSeries($series, $settings, bool $dispatchSync = true)
85221
{
86222
// Get the playlist
87223
$playlist = $series->playlist;
88224

225+
// In bulk mode (dispatchSync=false), don't trigger per-series sync
226+
$shouldSync = $dispatchSync && $this->sync_stream_files;
227+
89228
// Use provider throttling to limit concurrent requests and apply delay
90-
$results = $this->withProviderThrottling(function () use ($series) {
229+
$results = $this->withProviderThrottling(function () use ($series, $shouldSync) {
91230
return $series->fetchMetadata(
92231
refresh: $this->overwrite_existing,
93-
sync: $this->sync_stream_files
232+
sync: $shouldSync
94233
);
95234
});
96235

@@ -100,9 +239,9 @@ private function fetchMetadataForSeries($series, $settings)
100239
$syncStrmFiles = $settings['enabled'] ?? $sync_settings['enabled'] ?? false;
101240
$body = "Series sync completed successfully for \"{$series->name}\". Imported {$results} episodes.";
102241
if ($syncStrmFiles) {
103-
$body .= " .strm file sync is enabled, syncing now.";
242+
$body .= ' .strm file sync is enabled, syncing now.';
104243
} else {
105-
$body .= " .strm file sync is not enabled.";
244+
$body .= ' .strm file sync is not enabled.';
106245
}
107246
Notification::make()
108247
->success()

0 commit comments

Comments
 (0)