Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/changelog/1521-from-description
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Significance: minor
Type: changed

Batch processing jobs can now be scheduled with individual hooks.
20 changes: 4 additions & 16 deletions includes/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ class Dispatcher {
*/
public static $batch_size = ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE;

/**
* Callback for the async batch processing.
*
* @var array
*/
public static $callback = array( self::class, 'send_to_followers' );

/**
* Error codes that qualify for a retry.
*
Expand Down Expand Up @@ -103,8 +96,8 @@ public static function process_outbox( $id ) {
self::send_to_additional_inboxes( $activity, $actor->get__id(), $outbox_item );

if ( self::should_send_to_followers( $activity, $actor, $outbox_item ) ) {
Scheduler::async_batch(
self::$callback,
\do_action(
'activitypub_send_activity',
$outbox_item->ID,
self::$batch_size,
\get_post_meta( $outbox_item->ID, '_activitypub_outbox_offset', true ) ?: 0 // phpcs:ignore
Expand Down Expand Up @@ -253,13 +246,8 @@ private static function schedule_retry( $retries, $outbox_item_id, $attempt = 1

\wp_schedule_single_event(
\time() + ( $attempt * $attempt * HOUR_IN_SECONDS ),
'activitypub_async_batch',
array(
array( self::class, 'retry_send_to_followers' ),
$transient_key,
$outbox_item_id,
$attempt,
)
'activitypub_retry_activity',
array( $transient_key, $outbox_item_id, $attempt )
);
}

Expand Down
91 changes: 33 additions & 58 deletions includes/class-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use Activitypub\Collection\Actors;
use Activitypub\Collection\Outbox;
use Activitypub\Collection\Followers;
use Activitypub\Transformer\Factory;

/**
* Scheduler class.
Expand All @@ -38,8 +37,8 @@ public static function init() {
self::register_schedulers();

self::$batch_callbacks = array(
Dispatcher::$callback,
array( Dispatcher::class, 'retry_send_to_followers' ),
'activitypub_send_activity' => array( Dispatcher::class, 'send_to_followers' ),
'activitypub_retry_activity' => array( Dispatcher::class, 'retry_send_to_followers' ),
);

// Follower Cleanups.
Expand All @@ -48,6 +47,8 @@ public static function init() {

// Event callbacks.
\add_action( 'activitypub_async_batch', array( self::class, 'async_batch' ), 10, 99 );
\add_action( 'activitypub_send_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_retry_activity', array( self::class, 'async_batch' ), 10, 3 );
\add_action( 'activitypub_reprocess_outbox', array( self::class, 'reprocess_outbox' ) );
\add_action( 'activitypub_outbox_purge', array( self::class, 'purge_outbox' ) );

Expand Down Expand Up @@ -199,17 +200,6 @@ public static function schedule_outbox_activity_for_federation( $id, $offset = 0
* Reprocess the outbox.
*/
public static function reprocess_outbox() {
// Bail if there is a pending batch.
if ( self::next_scheduled_hook( 'activitypub_async_batch' ) ) {
return;
}

// Bail if there is a batch in progress.
$key = \md5( \serialize( Dispatcher::$callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
if ( self::is_locked( $key ) ) {
return;
}

$ids = \get_posts(
array(
'post_type' => Outbox::POST_TYPE,
Expand All @@ -220,6 +210,18 @@ public static function reprocess_outbox() {
);

foreach ( $ids as $id ) {
// Bail if there is a pending batch.
$offset = \get_post_meta( $id, '_activitypub_outbox_offset', true ) ?: 0; // phpcs:ignore
if ( \wp_next_scheduled( 'activitypub_send_activity', array( $id, ACTIVITYPUB_OUTBOX_PROCESSING_BATCH_SIZE, $offset ) ) ) {
return;
}

// Bail if there is a batch in progress.
$key = \md5( \serialize( $id ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
if ( self::is_locked( $key ) ) {
return;
}

self::schedule_outbox_activity_for_federation( $id );
}
}
Expand Down Expand Up @@ -278,42 +280,39 @@ public static function handle_outbox_purge_days_update( $old_value, $value ) {
* The batching part is optional and only comes into play if the callback returns anything.
* Beyond that it's a helper to run a callback asynchronously with locking to prevent simultaneous processing.
*
* @param callable $callback Callable processing routine.
* @params mixed ...$args Optional. Parameters that get passed to the callback.
* @params mixed ...$args Optional. Parameters that get passed to the callback.
*/
public static function async_batch( $callback ) {
if ( ! in_array( $callback, self::$batch_callbacks, true ) || ! \is_callable( $callback ) ) {
_doing_it_wrong( __METHOD__, 'The first argument must be a valid callback.', '5.2.0' );
public static function async_batch() {
$args = \func_get_args(); // phpcs:ignore PHPCompatibility.FunctionUse.ArgumentFunctionsReportCurrentValue
$callback = self::$batch_callbacks[ \current_action() ] ?? $args[0] ?? null;
if ( ! \is_callable( $callback ) ) {
\_doing_it_wrong( __METHOD__, 'There must be a valid callback associated with the current action.', '5.2.0' );
return;
}

$args = \func_get_args(); // phpcs:ignore PHPCompatibility.FunctionUse.ArgumentFunctionsReportCurrentValue
$key = \md5( \serialize( $callback ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
$key = \md5( \serialize( $args[0] ?? $args ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize

// Bail if the existing lock is still valid.
if ( self::is_locked( $key ) ) {
\wp_schedule_single_event( time() + MINUTE_IN_SECONDS, 'activitypub_async_batch', $args );
\wp_schedule_single_event( \time() + MINUTE_IN_SECONDS, \current_action(), $args );
return;
}

self::lock( $key );

$callback = array_shift( $args ); // Remove $callback from arguments.
$next = \call_user_func_array( $callback, $args );
if ( \is_callable( $args[0] ) ) {
$callback = \array_shift( $args ); // Remove $callback from arguments.
}
$next = \call_user_func_array( $callback, $args );

self::unlock( $key );

if ( ! empty( $next ) ) {
// Schedule the next run, adding the result to the arguments.
\wp_schedule_single_event(
\time() + 30,
'activitypub_async_batch',
\array_merge( array( $callback ), \array_values( $next ) )
);
\wp_schedule_single_event( \time() + 30, \current_action(), \array_values( $next ) );
}
}


/**
* Locks the async batch process for individual callbacks to prevent simultaneous processing.
*
Expand Down Expand Up @@ -365,37 +364,13 @@ public static function is_locked( $key ) {
return true;
}

/**
* Get the next scheduled hook.
*
* @param string $hook The hook name.
* @return int|bool The timestamp of the next scheduled hook, or false if none found.
*/
private static function next_scheduled_hook( $hook ) {
$crons = _get_cron_array();
if ( empty( $crons ) ) {
return false;
}

// Get next event.
$next = false;
foreach ( $crons as $timestamp => $cron ) {
if ( isset( $cron[ $hook ] ) ) {
$next = $timestamp;
break;
}
}

return $next;
}

/**
* Send announces.
*
* @param int $outbox_activity_id The outbox activity ID.
* @param \Activitypub\Activity\Activity $activity The activity object.
* @param int $actor_id The actor ID.
* @param int $content_visibility The content visibility.
* @param int $outbox_activity_id The outbox activity ID.
* @param Activity $activity The activity object.
* @param int $actor_id The actor ID.
* @param int $content_visibility The content visibility.
*/
public static function schedule_announce_activity( $outbox_activity_id, $activity, $actor_id, $content_visibility ) {
// Only if we're in both Blog and User modes.
Expand Down
5 changes: 2 additions & 3 deletions includes/collection/class-outbox.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,13 @@ private static function invalidate_existing_items( $object_id, $activity_type, $

foreach ( $existing_items as $existing_item_id ) {
$event_args = array(
Dispatcher::$callback,
$existing_item_id,
Dispatcher::$batch_size,
\get_post_meta( $existing_item_id, '_activitypub_outbox_offset', true ) ?: 0, // phpcs:ignore
);

$timestamp = \wp_next_scheduled( 'activitypub_async_batch', $event_args );
\wp_unschedule_event( $timestamp, 'activitypub_async_batch', $event_args );
$timestamp = \wp_next_scheduled( 'activitypub_send_activity', $event_args );
\wp_unschedule_event( $timestamp, 'activitypub_send_activity', $event_args );

$timestamp = \wp_next_scheduled( 'activitypub_process_outbox', array( $existing_item_id ) );
\wp_unschedule_event( $timestamp, 'activitypub_process_outbox', array( $existing_item_id ) );
Expand Down
4 changes: 2 additions & 2 deletions tests/includes/class-test-scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ public function test_async_batch_with_invalid_callback() {
$mock_class->expects( $this->never() )
->method( 'callback' );

// Run async_batch with invalid callback.
Scheduler::async_batch( array( $mock_class, 'callback' ) );
// Run async_batch without registered callback.
Scheduler::async_batch();
}

/**
Expand Down
Loading