Skip to content

Commit

Permalink
Unleashing mechanism for stalled notification jobs (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
rwngallego authored Aug 4, 2021
1 parent 5540c9f commit 7fdae07
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 44 deletions.
4 changes: 3 additions & 1 deletion README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Contributors: rwngallego, mociofiletto
Donate link: https://github.com/perfectyorg
Tags: Push Notifications, Web Push Notifications, Notifications, User engagement
Requires at least: 5.0
Tested up to: 5.7
Tested up to: 5.8
Stable tag: 1.3.2
Requires PHP: 7.2
License: GPLv2 or later
Expand Down Expand Up @@ -97,6 +97,8 @@ You can create an issue in our Github repo:

= 1.3.3 =
* Send logs to error_log() by default when logging is not even enabled. Fixes [#85](https://github.com/perfectyorg/perfecty-push-wp/issues/85)
* Tested up to WordPress 5.8
* Unleashing mechanism for stalled notification jobs. Fixes [#86](https://github.com/perfectyorg/perfecty-push-wp/issues/86)

= 1.3.2 =
* Add the plugin links shown in the WordPress Plugin installer
Expand Down
2 changes: 1 addition & 1 deletion admin/class-perfecty-push-admin-notifications-table.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ function column_payload( $item ) {

function column_status( $item ) {
if ( $item['status'] == Perfecty_Push_Lib_Db::NOTIFICATIONS_STATUS_SCHEDULED ) {
$timestamp = Perfecty_Push_Lib_Db::get_notification_scheduled_time( $item['id'] );
$timestamp = Perfecty_Push_Lib_Push_Server::get_notification_scheduled_time( $item['id'] );
return sprintf(
'%s %s',
$item['status'],
Expand Down
2 changes: 1 addition & 1 deletion admin/partials/perfecty-push-admin-notifications-view.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<div>
<?php
if ( $item->status == Perfecty_Push_Lib_Db::NOTIFICATIONS_STATUS_SCHEDULED ) {
$timestamp = Perfecty_Push_Lib_Db::get_notification_scheduled_time( $item->id );
$timestamp = Perfecty_Push_Lib_Push_Server::get_notification_scheduled_time( $item->id );
echo esc_html( $item->status ) . ' ' . esc_html__( 'at', 'perfecty-push-notifications' ) . ' ' . get_date_from_gmt( date( 'Y-m-d H:i:s', $timestamp ), 'Y-m-d H:i:s' );
} else {
echo esc_html( $item->status );
Expand Down
3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,4 @@ services:
MYSQL_RANDOM_ROOT_PASSWORD: '1'

networks:
perfecty-net:
name: perfecty-net
perfecty-net:
2 changes: 1 addition & 1 deletion includes/class-perfecty-push.php
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private function define_admin_hooks() {
$this->loader->add_action( 'admin_menu', $plugin_admin, 'register_admin_menu' );
$this->loader->add_action( 'admin_init', $plugin_admin, 'register_options' );
$this->loader->add_action( 'admin_init', $plugin_admin, 'check_cron' );
$this->loader->add_action( 'perfecty_push_broadcast_notification_event', $plugin_admin, 'execute_broadcast_batch', 10, 1 );
$this->loader->add_action( Perfecty_Push_Lib_Push_Server::BROADCAST_HOOK, $plugin_admin, 'execute_broadcast_batch', 10, 1 );
$this->loader->add_action( 'add_meta_boxes', $plugin_admin, 'register_metaboxes' );
$this->loader->add_action( 'save_post', $plugin_admin, 'on_save_post' );
$this->loader->add_action( 'transition_post_status', $plugin_admin, 'on_transition_post_status', 10, 3 );
Expand Down
2 changes: 1 addition & 1 deletion lib/class-perfecty-push-lib-cron-check.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Perfecty_Push_Lib_Cron_Check {
private const FAILURES_COUNT = 'perfecty_push_cron_failures';
private const SCHEDULE_OFFSET = 60; // 60 seconds
private const MAX_DIFF = 60; // 60 seconds
private const THRESHOLD = 3; // consider error after 3 failures
private const THRESHOLD = 5; // consider error after 5 failures

/**
* Runs the ticker and checks if the executions are correctly done
Expand Down
53 changes: 32 additions & 21 deletions lib/class-perfecty-push-lib-db.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class Perfecty_Push_Lib_Db {

private static $allowed_users_fields = 'id,uuid,wp_user_id,endpoint,key_auth,key_p256dh,remote_ip,created_at';
private static $allowed_notifications_fields = 'id,payload,total,succeeded,last_cursor,batch_size,status,is_taken,created_at,finished_at';
private static $allowed_notifications_fields = 'id,payload,total,succeeded,last_cursor,batch_size,status,is_taken,created_at,finished_at,last_execution_at';
private static $allowed_logs_fields = 'level,message,created_at';

public const NOTIFICATIONS_STATUS_SCHEDULED = 'scheduled';
Expand Down Expand Up @@ -76,6 +76,7 @@ public static function db_create() {
status varchar(15) DEFAULT 'scheduled' NOT NULL,
is_taken tinyint(1) DEFAULT 0 NOT NULL,
created_at datetime DEFAULT CURRENT_TIMESTAMP NOT NULL,
last_execution_at datetime NULL,
finished_at datetime NULL,
PRIMARY KEY (id)
) $charset;";
Expand Down Expand Up @@ -395,6 +396,27 @@ public static function get_notification( $notification_id ) {
return $result;
}

/**
* Get notifications that are stalled:
* - Status = "Running"
* - Not taken
* - Last execution > 30 seconds
*
* @return array The result
*/
public static function get_notifications_stalled() {
global $wpdb;

$sql = $wpdb->prepare(
'SELECT ' . self::$allowed_notifications_fields .
' FROM ' . self::notifications_table() .
' WHERE status = %s AND is_taken = %d AND last_execution_at <= NOW() - INTERVAL 30 SECOND',
self::NOTIFICATIONS_STATUS_RUNNING,
0
);
return $wpdb->get_results( $sql );
}

/**
* Get notifications
*
Expand Down Expand Up @@ -556,13 +578,14 @@ public static function update_notification( $notification ) {
$result = $wpdb->update(
self::notifications_table(),
array(
'payload' => $notification->payload,
'total' => $notification->total,
'succeeded' => $notification->succeeded,
'last_cursor' => $notification->last_cursor,
'batch_size' => $notification->batch_size,
'status' => $notification->status,
'is_taken' => $notification->is_taken,
'payload' => $notification->payload,
'total' => $notification->total,
'succeeded' => $notification->succeeded,
'last_cursor' => $notification->last_cursor,
'batch_size' => $notification->batch_size,
'status' => $notification->status,
'is_taken' => $notification->is_taken,
'last_execution_at' => $notification->last_execution_at,
),
array( 'id' => $notification->id )
);
Expand Down Expand Up @@ -672,7 +695,7 @@ public static function delete_notifications( $notification_ids ) {
// First delete scheduled events.
foreach ( $notification_ids as $nid ) {
$args = array( intval( $nid ) );
wp_clear_scheduled_hook( 'perfecty_push_broadcast_notification_event', $args );
wp_clear_scheduled_hook( Perfecty_Push_Lib_Push_Server::BROADCAST_HOOK, $args );
}
return $wpdb->query( 'DELETE FROM ' . self::notifications_table() . " WHERE id IN ($ids)" );
}
Expand All @@ -696,18 +719,6 @@ private static function take_untake_notification( $notification_id, $take ) {
return $result;
}

/**
* Get scheduled time
*
* @param $notification_id int Notification id
* @return int|bool Scheduled Unix timestamp for the notification or false
*/
public static function get_notification_scheduled_time( $notification_id ) {
$args = array( intval( $notification_id ) );
$result = wp_next_scheduled( 'perfecty_push_broadcast_notification_event', $args );
return $result;
}

/**
* Inserts a log entry in the DB
*
Expand Down
57 changes: 43 additions & 14 deletions lib/class-perfecty-push-lib-push-server.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
class Perfecty_Push_Lib_Push_Server {

public const DEFAULT_BATCH_SIZE = 30;
public const BROADCAST_HOOK = 'perfecty_push_broadcast_notification_event';

private static $auth;
private static $webpush;
Expand Down Expand Up @@ -135,7 +136,7 @@ public static function schedule_broadcast_async( $payload, $scheduled_time = nul
$date = new DateTime( $scheduled_time );
$scheduled_time = $date->getTimestamp();
}
$result = wp_schedule_single_event( $scheduled_time, 'perfecty_push_broadcast_notification_event', array( $notification_id ) );
$result = wp_schedule_single_event( $scheduled_time, self::BROADCAST_HOOK, array( $notification_id ) );
Log::info( 'Scheduling job id=' . $notification_id . ', result: ' . $result );

do_action( 'perfecty_push_broadcast_scheduled', $payload );
Expand Down Expand Up @@ -181,6 +182,32 @@ public static function broadcast( $payload ) {
return self::schedule_broadcast_async( $payload );
}

/**
* Get scheduled time
*
* @param $notification_id int Notification id
* @return int|bool Scheduled Unix timestamp for the notification or false
*/
public static function get_notification_scheduled_time( $notification_id ) {
$args = array( intval( $notification_id ) );
$result = wp_next_scheduled( self::BROADCAST_HOOK, $args );
return $result;
}

/**
* Get the job notifications that are stalled and
* schedule the execution automatically
*/
public static function unleash_stalled() {
$running = Perfecty_Push_Lib_Db::get_notifications_stalled();
foreach ( $running as $item ) {
if ( ! wp_next_scheduled( self::BROADCAST_HOOK, array( $item->id ) ) ) {
wp_schedule_single_event( time(), self::BROADCAST_HOOK, array( $item->id ) );
Log::info( 'An stalled notification job was unleashed, id = ' . $item->id );
}
}
}

/**
* Execute one broadcast batch
*
Expand All @@ -199,9 +226,7 @@ public static function execute_broadcast_batch( $notification_id ) {

// if it has been taken but not released, that means a wrong state
if ( $notification->is_taken ) {
Log::error( 'Halted, notification taken but not released, notification_id: ' . $notification_id );
Perfecty_Push_Lib_Db::mark_notification_failed( $notification_id );
Perfecty_Push_Lib_Db::untake_notification( $notification_id );
Log::error( 'Halted, notification job already taken, notification_id: ' . $notification_id );
return false;
}

Expand Down Expand Up @@ -238,13 +263,14 @@ public static function execute_broadcast_batch( $notification_id ) {
// we send one batch
$result = self::send_notification( $notification->payload, $users );
if ( is_array( $result ) ) {
$notification = Perfecty_Push_Lib_Db::get_notification( $notification_id );
$total_batch = $result[0];
$succeeded = $result[1];
$notification->last_cursor += $total_batch;
$notification->succeeded += $succeeded;
$notification->is_taken = 0;
$result = Perfecty_Push_Lib_Db::update_notification( $notification );
$notification = Perfecty_Push_Lib_Db::get_notification( $notification_id );
$total_batch = $result[0];
$succeeded = $result[1];
$notification->last_cursor += $total_batch;
$notification->succeeded += $succeeded;
$notification->is_taken = 0;
$notification->last_execution_at = current_time( 'mysql', 1 );
$result = Perfecty_Push_Lib_Db::update_notification( $notification );

Log::info( 'Notification batch for id=' . $notification_id . ' sent. Cursor: ' . $notification->last_cursor . ', Total: ' . $total_batch . ', Succeeded: ' . $succeeded );
if ( ! $result ) {
Expand All @@ -259,9 +285,12 @@ public static function execute_broadcast_batch( $notification_id ) {
}

// execute the next batch
$result = wp_schedule_single_event( time(), 'perfecty_push_broadcast_notification_event', array( $notification_id ) );
Log::info( 'Scheduling next batch for id=' . $notification_id . ' . Result: ' . $result );

if ( ! wp_next_scheduled( self::BROADCAST_HOOK, array( $notification_id ) ) ) {
$result = wp_schedule_single_event( time(), self::BROADCAST_HOOK, array( $notification_id ) );
Log::info( 'Scheduling next batch for id=' . $notification_id . ' . Result: ' . $result );
} else {
Log::warning( "Don't schedule next batch, it's already scheduled, id=" . $notification_id );
}
return true;
}

Expand Down
1 change: 1 addition & 0 deletions public/class-perfecty-push-public.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public function enqueue_scripts() {
public function print_head() {
$options = get_option( 'perfecty_push' );

Perfecty_Push_Lib_Push_Server::unleash_stalled();
require_once plugin_dir_path( __FILE__ ) . 'partials/perfecty-push-public-head.php';
}

Expand Down
4 changes: 2 additions & 2 deletions tests/test-push-server.php
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ public function test_execute_broadcast_batch_notification_taken() {
// there should not be any execution afterwards
$next_after_execution = wp_next_scheduled( 'perfecty_push_broadcast_notification_event', array( $notification_id ) );

// should have been marked as failed
// the notification status should be the same (scheduled)
$notification = Perfecty_Push_Lib_Db::get_notification( $notification_id );

$this->assertSame( false, $res );
$this->assertSame( 'failed', $notification->status );
$this->assertSame( 'scheduled', $notification->status );
$this->assertFalse( $next_after_execution );
}
}

0 comments on commit 7fdae07

Please sign in to comment.