diff --git a/.github/changelog/1521-from-description b/.github/changelog/1521-from-description new file mode 100644 index 0000000000..a91fb249eb --- /dev/null +++ b/.github/changelog/1521-from-description @@ -0,0 +1,4 @@ +Significance: minor +Type: changed + +Batch processing jobs can now be scheduled with individual hooks. diff --git a/includes/class-dispatcher.php b/includes/class-dispatcher.php index b76f00d1c5..cbd46add0b 100644 --- a/includes/class-dispatcher.php +++ b/includes/class-dispatcher.php @@ -27,13 +27,6 @@ class Dispatcher { */ public static $batch_size = ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE; - /** - * Callback for the async batch processing. - * - * @var array - */ - public static $callback = array( self::class, 'send_to_followers' ); - /** * Error codes that qualify for a retry. * @@ -103,8 +96,8 @@ public static function process_outbox( $id ) { self::send_to_additional_inboxes( $activity, $actor->get__id(), $outbox_item ); if ( self::should_send_to_followers( $activity, $actor, $outbox_item ) ) { - Scheduler::async_batch( - self::$callback, + \do_action( + 'activitypub_send_activity', $outbox_item->ID, self::$batch_size, \get_post_meta( $outbox_item->ID, '_activitypub_outbox_offset', true ) ?: 0 // phpcs:ignore @@ -253,13 +246,8 @@ private static function schedule_retry( $retries, $outbox_item_id, $attempt = 1 \wp_schedule_single_event( \time() + ( $attempt * $attempt * HOUR_IN_SECONDS ), - 'activitypub_async_batch', - array( - array( self::class, 'retry_send_to_followers' ), - $transient_key, - $outbox_item_id, - $attempt, - ) + 'activitypub_retry_activity', + array( $transient_key, $outbox_item_id, $attempt ) ); } diff --git a/includes/class-scheduler.php b/includes/class-scheduler.php index 2e2ca070a5..c15c89ecef 100644 --- a/includes/class-scheduler.php +++ b/includes/class-scheduler.php @@ -15,7 +15,6 @@ use Activitypub\Collection\Actors; use Activitypub\Collection\Outbox; use Activitypub\Collection\Followers; -use Activitypub\Transformer\Factory; /** * Scheduler class. @@ -38,8 +37,8 @@ public static function init() { self::register_schedulers(); self::$batch_callbacks = array( - Dispatcher::$callback, - array( Dispatcher::class, 'retry_send_to_followers' ), + 'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ), + 'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ), ); // Follower Cleanups. @@ -48,6 +47,8 @@ public static function init() { // Event callbacks. \add_action( 'activitypub_async_batch', array( self::class, 'async_batch' ), 10, 99 ); + \add_action( 'activitypub_send_activity', array( self::class, 'async_batch' ), 10, 3 ); + \add_action( 'activitypub_retry_activity', array( self::class, 'async_batch' ), 10, 3 ); \add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) ); \add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) ); @@ -199,17 +200,6 @@ public static function schedule_outbox_activity_for_federation( $id, $offset = 0 * Reprocess the outbox. */ public static function reprocess_outbox() { - // Bail if there is a pending batch. - if ( self::next_scheduled_hook( 'activitypub_async_batch' ) ) { - return; - } - - // Bail if there is a batch in progress. - $key = \md5( \serialize( Dispatcher::$callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize - if ( self::is_locked( $key ) ) { - return; - } - $ids = \get_posts( array( 'post_type' => Outbox::POST_TYPE, @@ -220,6 +210,18 @@ public static function reprocess_outbox() { ); foreach ( $ids as $id ) { + // Bail if there is a pending batch. + $offset = \get_post_meta( $id, '_activitypub_outbox_offset', true ) ?: 0; // phpcs:ignore + if ( \wp_next_scheduled( 'activitypub_send_activity', array( $id, ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE, $offset ) ) ) { + return; + } + + // Bail if there is a batch in progress. + $key = \md5( \serialize( $id ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize + if ( self::is_locked( $key ) ) { + return; + } + self::schedule_outbox_activity_for_federation( $id ); } } @@ -278,42 +280,39 @@ public static function handle_outbox_purge_days_update( $old_value, $value ) { * The batching part is optional and only comes into play if the callback returns anything. * Beyond that it's a helper to run a callback asynchronously with locking to prevent simultaneous processing. * - * @param callable $callback Callable processing routine. - * @params mixed ...$args Optional. Parameters that get passed to the callback. + * @params mixed ...$args Optional. Parameters that get passed to the callback. */ - public static function async_batch( $callback ) { - if ( ! in_array( $callback, self::$batch_callbacks, true ) || ! \is_callable( $callback ) ) { - _doing_it_wrong( __METHOD__, 'The first argument must be a valid callback.', '5.2.0' ); + public static function async_batch() { + $args = \func_get_args(); // phpcs:ignore PHPCompatibility.FunctionUse.ArgumentFunctionsReportCurrentValue + $callback = self::$batch_callbacks[ \current_action() ] ?? $args[0] ?? null; + if ( ! \is_callable( $callback ) ) { + \_doing_it_wrong( __METHOD__, 'There must be a valid callback associated with the current action.', '5.2.0' ); return; } - $args = \func_get_args(); // phpcs:ignore PHPCompatibility.FunctionUse.ArgumentFunctionsReportCurrentValue - $key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize + $key = \md5( \serialize( $args[0] ?? $args ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize // Bail if the existing lock is still valid. if ( self::is_locked( $key ) ) { - \wp_schedule_single_event( time() + MINUTE_IN_SECONDS, 'activitypub_async_batch', $args ); + \wp_schedule_single_event( \time() + MINUTE_IN_SECONDS, \current_action(), $args ); return; } self::lock( $key ); - $callback = array_shift( $args ); // Remove $callback from arguments. - $next = \call_user_func_array( $callback, $args ); + if ( \is_callable( $args[0] ) ) { + $callback = \array_shift( $args ); // Remove $callback from arguments. + } + $next = \call_user_func_array( $callback, $args ); self::unlock( $key ); if ( ! empty( $next ) ) { // Schedule the next run, adding the result to the arguments. - \wp_schedule_single_event( - \time() + 30, - 'activitypub_async_batch', - \array_merge( array( $callback ), \array_values( $next ) ) - ); + \wp_schedule_single_event( \time() + 30, \current_action(), \array_values( $next ) ); } } - /** * Locks the async batch process for individual callbacks to prevent simultaneous processing. * @@ -365,37 +364,13 @@ public static function is_locked( $key ) { return true; } - /** - * Get the next scheduled hook. - * - * @param string $hook The hook name. - * @return int|bool The timestamp of the next scheduled hook, or false if none found. - */ - private static function next_scheduled_hook( $hook ) { - $crons = _get_cron_array(); - if ( empty( $crons ) ) { - return false; - } - - // Get next event. - $next = false; - foreach ( $crons as $timestamp => $cron ) { - if ( isset( $cron[ $hook ] ) ) { - $next = $timestamp; - break; - } - } - - return $next; - } - /** * Send announces. * - * @param int $outbox_activity_id The outbox activity ID. - * @param \Activitypub\Activity\Activity $activity The activity object. - * @param int $actor_id The actor ID. - * @param int $content_visibility The content visibility. + * @param int $outbox_activity_id The outbox activity ID. + * @param Activity $activity The activity object. + * @param int $actor_id The actor ID. + * @param int $content_visibility The content visibility. */ public static function schedule_announce_activity( $outbox_activity_id, $activity, $actor_id, $content_visibility ) { // Only if we're in both Blog and User modes. diff --git a/includes/collection/class-outbox.php b/includes/collection/class-outbox.php index cf1dff89fd..409c78be13 100644 --- a/includes/collection/class-outbox.php +++ b/includes/collection/class-outbox.php @@ -141,14 +141,13 @@ private static function invalidate_existing_items( $object_id, $activity_type, $ foreach ( $existing_items as $existing_item_id ) { $event_args = array( - Dispatcher::$callback, $existing_item_id, Dispatcher::$batch_size, \get_post_meta( $existing_item_id, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore ); - $timestamp = \wp_next_scheduled( 'activitypub_async_batch', $event_args ); - \wp_unschedule_event( $timestamp, 'activitypub_async_batch', $event_args ); + $timestamp = \wp_next_scheduled( 'activitypub_send_activity', $event_args ); + \wp_unschedule_event( $timestamp, 'activitypub_send_activity', $event_args ); $timestamp = \wp_next_scheduled( 'activitypub_process_outbox', array( $existing_item_id ) ); \wp_unschedule_event( $timestamp, 'activitypub_process_outbox', array( $existing_item_id ) ); diff --git a/tests/includes/class-test-scheduler.php b/tests/includes/class-test-scheduler.php index 0b9ebbb3c7..b00e68319b 100644 --- a/tests/includes/class-test-scheduler.php +++ b/tests/includes/class-test-scheduler.php @@ -295,8 +295,8 @@ public function test_async_batch_with_invalid_callback() { $mock_class->expects( $this->never() ) ->method( 'callback' ); - // Run async_batch with invalid callback. - Scheduler::async_batch( array( $mock_class, 'callback' ) ); + // Run async_batch without registered callback. + Scheduler::async_batch(); } /**