diff --git a/include/SQS.inc.php b/include/SQS.inc.php index 88b870f3..a2a2e370 100644 --- a/include/SQS.inc.php +++ b/include/SQS.inc.php @@ -102,7 +102,7 @@ public static function deleteBatch($queueURL, $batchEntries) { private static function load() { if (!self::$sqs) { - self::$sqs = Z_Core::$AWS->get('sqs'); + self::$sqs = Z_Core::$AWS->createSqs(); } } } diff --git a/include/config/config.inc.php-sample b/include/config/config.inc.php-sample index d7db38f7..7febd19f 100644 --- a/include/config/config.inc.php-sample +++ b/include/config/config.inc.php-sample @@ -59,7 +59,9 @@ class Z_CONFIG { ); public static $SEARCH_HOSTS = ['']; - + + public static $ITEM_FEEDER_QUEUE = ''; + public static $GLOBAL_ITEMS_URL = ''; public static $ATTACHMENT_SERVER_HOSTS = array("files1.localdomain", "files2.localdomain"); diff --git a/include/header.inc.php b/include/header.inc.php index 288819f6..e3bbfd97 100644 --- a/include/header.inc.php +++ b/include/header.inc.php @@ -194,6 +194,7 @@ function zotero_autoload($className) { Zotero_DB::addCallback("commit", array("Zotero_Notifier", "commit")); Zotero_DB::addCallback("callback", array("Zotero_Notifier", "reset")); Zotero_NotifierObserver::init(); +Zotero_ItemFeederObserver::init(); // Memcached require('Memcached.inc.php'); diff --git a/model/DataObjects.inc.php b/model/DataObjects.inc.php index a26e2f42..3862377b 100644 --- a/model/DataObjects.inc.php +++ b/model/DataObjects.inc.php @@ -661,6 +661,19 @@ public static function delete($libraryID, $key) { } } + if ($deleted && $type == 'item') { + Zotero_Notifier::trigger( + 'delete', + 'item', + $libraryID . "/" . $key, + [ + $libraryID . "/" . $key => [ + 'version' => $obj->version + ] + ] + ); + } + self::unload($obj->id); if ($deleted) { diff --git a/model/ItemFeederObserver.inc.php b/model/ItemFeederObserver.inc.php new file mode 100644 index 00000000..59cf02a2 --- /dev/null +++ b/model/ItemFeederObserver.inc.php @@ -0,0 +1,148 @@ +. + + ***** END LICENSE BLOCK ***** +*/ + +class Zotero_ItemFeederObserver { + // SQS has maximum message size of 262144 bytes + const MAX_JSON_SIZE = 250 * 1024; + + public static function init() { + Zotero_Notifier::registerObserver( + __CLASS__, + ["item"], + "ItemFeederObserver" + ); + } + + public static function notify($event, $type, $ids, $extraData) { + if ($type != "item") return; + $batch = []; + foreach ($ids as $id) { + $data = $extraData[$id]; + if ($event == 'add') { + list($libraryID, $key) = explode("/", $id); + $item = Zotero_Items::getByLibraryAndKey($libraryID, $key); + $arr = [ + 'action' => 'add', + 'id' => $id, + 'version' => $item->version, + 'item' => $item->toJSON(true) + ]; + } + else if ($event == 'modify') { + list($libraryID, $key) = explode("/", $id); + $item = Zotero_Items::getByLibraryAndKey($libraryID, $key); + $arr = [ + 'action' => 'modify', + 'id' => $id, + 'version' => $item->version, + 'item' => $item->toJSON(true) + ]; + } + else if ($event == 'delete') { + $arr = [ + 'action' => 'delete', + 'id' => $id, + 'version' => $data['version'] + ]; + } + + $json = self::formatLimitedSizeJSON($arr); + if ($json) { + $batch[] = $json; + } + else { + Z_Core::logError("Failed to limit JSON size for $id"); + } + + // SQS accepts up to 10 messages per batch + if (count($batch) == 10) { + self::send($batch); + $batch = []; + } + } + + if (count($batch)) { + self::send($batch); + } + } + + private static function send($batch) { + $result = Z_SQS::sendBatch(Z_CONFIG::$ITEM_FEEDER_QUEUE, $batch); + $failedMessages = $result['Failed']; + if ($failedMessages) { + foreach ($failedMessages as $failedMessage) { + Z_Core::logError('Failed to send message to SQS: ' + . $failedMessage['Message']); + } + } + } + + /** + * Format JSON and limit its size by emptying its biggest string values + * @param $arr + * @return null|string + */ + private static function formatLimitedSizeJSON($arr) { + $json = json_encode($arr, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + + $retries_left = 10; + while (strlen($json) > self::MAX_JSON_SIZE) { + if (!$retries_left) return null; + $retries_left--; + $biggest =& self::getBiggestString($arr); + if (!$biggest) return null; + $biggest = 'VALUE TOO BIG!'; + $json = json_encode($arr, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + } + return $json; + } + + /** + * Get a reference to the biggest string value + * + * @param $arr + * @return null|&string + */ + private static function &getBiggestString(&$arr) { + $biggest = null; + foreach ($arr as $key => &$value) { + if (is_array($value)) { + $biggest_child =& self::getBiggestString($value); + if ($biggest_child) { + if (!$biggest || strlen($biggest_child) > strlen($biggest)) { + $biggest = &$biggest_child; + } + } + } + else if (is_string($value)) { + if (!$biggest || strlen($value) > strlen($biggest)) { + $biggest = &$value; + } + } + } + return $biggest; + } +}