Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix updating job & synchronization when reaching rate limit during synchronization #127

Open
wants to merge 14 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/Action/SynchronizationAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use OCA\OpenConnector\Service\SynchronizationService;
use OCA\OpenConnector\Db\SynchronizationMapper;
use OCA\OpenConnector\Db\SynchronizationContractMapper;
use Symfony\Component\HttpKernel\Exception\TooManyRequestsHttpException;

/**
* This action handles the synchronization of data from the source to the target.
Expand Down Expand Up @@ -70,7 +71,15 @@ public function run(array $argument = []): array
$response['stackTrace'][] = 'Doing the synchronization';
try {
$objects = $this->synchronizationService->synchronize($synchronization);
} catch (Exception $e) {
} catch (TooManyRequestsHttpException $e) {
$response['level'] = 'WARNING';
$response['stackTrace'][] = $response['message'] = 'Stopped synchronization: ' . $e->getMessage();
if (isset($e->getHeaders()['X-RateLimit-Reset']) === true) {
$response['nextRun'] = $e->getHeaders()['X-RateLimit-Reset'];
$response['stackTrace'][] = $response['message'] = 'Returning X-RateLimit-Reset header to update Job nextRun: ' . $response['nextRun'];
}
return $response;
} catch (Exception $e) {
$response['level'] = 'ERROR';
$response['stackTrace'][] = $response['message'] = 'Failed to synchronize: ' . $e->getMessage();
return $response;
Expand Down
87 changes: 51 additions & 36 deletions lib/Controller/SynchronizationsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace OCA\OpenConnector\Controller;

use GuzzleHttp\Exception\GuzzleException;
use OCA\OpenConnector\Service\ObjectService;
use OCA\OpenConnector\Service\SearchService;
use OCA\OpenConnector\Service\SynchronizationService;
Expand All @@ -15,6 +16,8 @@
use OCP\IRequest;
use Exception;
use OCP\AppFramework\Db\DoesNotExistException;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;

class SynchronizationsController extends Controller
{
Expand Down Expand Up @@ -213,33 +216,36 @@ public function logs(int $id): JSONResponse
}
}

/**
* Tests a synchronization
*
* This method tests a synchronization without persisting anything to the database.
*
* @NoAdminRequired
* @NoCSRFRequired
*
* @param int $id The ID of the synchronization
*
* @return JSONResponse A JSON response containing the test results
*
* @example
* Request:
* empty POST
*
* Response:
* {
* "resultObject": {
* "fullName": "John Doe",
* "userAge": 30,
* "contactEmail": "[email protected]"
* },
* "isValid": true,
* "validationErrors": []
* }
*/
/**
* Tests a synchronization
*
* This method tests a synchronization without persisting anything to the database.
*
* @NoAdminRequired
* @NoCSRFRequired
*
* @param int $id The ID of the synchronization
*
* @return JSONResponse A JSON response containing the test results
* @throws GuzzleException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*
* @example
* Request:
* empty POST
*
* Response:
* {
* "resultObject": {
* "fullName": "John Doe",
* "userAge": 30,
* "contactEmail": "[email protected]"
* },
* "isValid": true,
* "validationErrors": []
* }
*/
public function test(int $id): JSONResponse
{
try {
Expand All @@ -250,17 +256,26 @@ public function test(int $id): JSONResponse

// Try to synchronize
try {
$logAndContractArray = $this->synchronizationService->synchronize(synchronization: $synchronization, isTest: true);
$logAndContractArray = $this->synchronizationService->synchronize(
synchronization: $synchronization,
isTest: true
);

// Return the result as a JSON response
return new JSONResponse(data: $logAndContractArray, statusCode: 200);
} catch (Exception $e) {
// If synchronizaiton fails, return an error response
return new JSONResponse([
'error' => 'Synchronization error',
'message' => $e->getMessage()
], 400);
}
// Check if getHeaders method exists and use it if available
$headers = method_exists($e, 'getHeaders') ? $e->getHeaders() : [];

return new JSONResponse($resultFromTest, 200);
// If synchronization fails, return an error response
return new JSONResponse(
data: [
'error' => 'Synchronization error',
'message' => $e->getMessage()
],
statusCode: $e->getCode() ?? 400,
headers: $headers
);
}
}
}
}
31 changes: 27 additions & 4 deletions lib/Cron/ActionTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use OCP\BackgroundJob\IJobList;
use OCP\IUserManager;
use OCP\IUserSession;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Symfony\Component\Uid\Uuid;
use DateInterval;
use DateTime;
Expand Down Expand Up @@ -59,6 +61,9 @@ public function __construct(
* @param $argument
*
* @return JobLog|void
* @throws \OCP\DB\Exception
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function run($argument)
{
Expand All @@ -81,10 +86,22 @@ public function run($argument)

// if the next run is in the the future, we don't need to do anything
if ($job->getNextRun() !== null && $job->getNextRun() > new DateTime()) {
return;
$jobLog = $this->jobLogMapper->createFromArray([
'level' => 'WARNING',
'message' => 'Next Run is still in the future for this job',
'jobId' => $job->getId(),
'jobClass' => $job->getJobClass(),
'jobListId' => $job->getJobListId(),
'arguments' => $job->getArguments(),
'lastRun' => $job->getLastRun(),
'nextRun' => $job->getNextRun(),
'executionTime' => 0
]);

return $jobLog;
}

if(empty($job->getUserId()) === false && $this->userSession->getUser() === null) {
if (empty($job->getUserId()) === false && $this->userSession->getUser() === null) {
$user = $this->userManager->get($job->getUserId());
$this->userSession->setUser($user);
}
Expand All @@ -106,11 +123,17 @@ public function run($argument)
$job->setIsEnabled(false);
}


// Update the job
$job->setLastRun(new DateTime());
$nextRun = new DateTime('now + '.$job->getInterval().' seconds');
if (isset($result['nextRun']) === true) {
$nextRun = DateTime::createFromFormat('U', $result['nextRun'], $nextRun->getTimezone());
// Check if the current seconds part is not zero, and if so, round up to the next minute
if ($nextRun->format('s') !== '00') {
$nextRun->modify('next minute');
}
}
$nextRun->setTime(hour: $nextRun->format('H'), minute: $nextRun->format('i'));
$job->setLastRun(new DateTime());
$job->setNextRun($nextRun);
$this->jobMapper->update($job);

Expand Down
3 changes: 3 additions & 0 deletions lib/Db/Synchronization.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Synchronization extends Entity implements JsonSerializable
protected ?DateTime $sourceLastChanged = null; // The last changed date of the source object
protected ?DateTime $sourceLastChecked = null; // The last checked date of the source object
protected ?DateTime $sourceLastSynced = null; // The last synced date of the source object
protected ?int $currentPage = 1; // The last page synced. Used for keeping track where to continue syncing after Rate Limit has been exceeded on source with pagination.
// Target
protected ?string $targetId = null; // The id of the target object
protected ?string $targetType = null; // The type of the target object (e.g. api, database, register/schema.)
Expand Down Expand Up @@ -51,6 +52,7 @@ public function __construct() {
$this->addType('sourceLastChanged', 'datetime');
$this->addType('sourceLastChecked', 'datetime');
$this->addType('sourceLastSynced', 'datetime');
$this->addType('currentPage', 'integer');
$this->addType('targetId', 'string');
$this->addType('targetType', 'string');
$this->addType('targetHash', 'string');
Expand Down Expand Up @@ -126,6 +128,7 @@ public function jsonSerialize(): array
'sourceLastChanged' => isset($this->sourceLastChanged) === true ? $this->sourceLastChanged->format('c') : null,
'sourceLastChecked' => isset($this->sourceLastChecked) === true ? $this->sourceLastChecked->format('c') : null,
'sourceLastSynced' => isset($this->sourceLastSynced) === true ? $this->sourceLastSynced->format('c') : null,
'currentPage' => $this->currentPage,
'targetId' => $this->targetId,
'targetType' => $this->targetType,
'targetHash' => $this->targetHash,
Expand Down
3 changes: 3 additions & 0 deletions lib/Db/SynchronizationContractLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class SynchronizationContractLog extends Entity implements JsonSerializable
{
protected ?string $uuid = null;
protected ?string $message = null;
protected ?string $synchronizationId = null;
protected ?string $synchronizationContractId = null;
protected ?array $source = [];
Expand All @@ -20,6 +21,7 @@ class SynchronizationContractLog extends Entity implements JsonSerializable

public function __construct() {
$this->addType('uuid', 'string');
$this->addType('message', 'string');
$this->addType('synchronizationId', 'string');
$this->addType('synchronizationContractId', 'string');
$this->addType('source', 'json');
Expand Down Expand Up @@ -65,6 +67,7 @@ public function jsonSerialize(): array
return [
'id' => $this->id,
'uuid' => $this->uuid,
'message' => $this->message,
'synchronizationId' => $this->synchronizationId,
'synchronizationContractId' => $this->synchronizationContractId,
'source' => $this->source,
Expand Down
2 changes: 1 addition & 1 deletion lib/Migration/Version0Date20240826193657.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt
$table->addIndex(['user_id'], 'openconnector_job_logs_user_id_index');
}

if (!$schema->hasTable('openconnector_source_contract_logs')) {
if (!$schema->hasTable('openconnector_synchronization_contract_logs')) {
$table = $schema->createTable('openconnector_synchronization_contract_logs');
$table->addColumn('id', Types::BIGINT, ['autoincrement' => true, 'notnull' => true, 'length' => 20]);
$table->addColumn('uuid', Types::STRING, ['notnull' => true, 'length' => 36]);
Expand Down
3 changes: 2 additions & 1 deletion lib/Migration/Version1Date20241121160300.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

/**
* This migration changes the following:
* - Adding 3 new columns for the table Source: rateLimitLimit, rateLimitRemaining & rateLimitReset
* - Adding 4 new columns for the table Source: rateLimitLimit, rateLimitRemaining, rateLimitReset & rateLimitWindow
*/
class Version1Date20241121160300 extends SimpleMigrationStep {

Expand All @@ -41,6 +41,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt
* @var ISchemaWrapper $schema
*/
$schema = $schemaClosure();
// Sources table
$table = $schema->getTable('openconnector_sources');

if ($table->hasColumn('rate_limit_limit') === false) {
Expand Down
79 changes: 79 additions & 0 deletions lib/Migration/Version1Date20241210120155.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

declare(strict_types=1);

/**
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

namespace OCA\OpenConnector\Migration;

use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;

/**
* This migration changes the following:
* - Adding 1 new column for the table Synchronization: currentPage
* - Adding 1 new column for the table SynchronizationContractLogs: message
*/
class Version1Date20241210120155 extends SimpleMigrationStep {

/**
* @param IOutput $output
* @param Closure(): ISchemaWrapper $schemaClosure
* @param array $options
*/
public function preSchemaChange(IOutput $output, Closure $schemaClosure, array $options): void {
}

/**
* @param IOutput $output
* @param Closure(): ISchemaWrapper $schemaClosure
* @param array $options
* @return null|ISchemaWrapper
*/
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
/**
* @var ISchemaWrapper $schema
*/
$schema = $schemaClosure();

// Synchronizations table
if ($schema->hasTable('openconnector_synchronizations') === true) {
$table = $schema->getTable('openconnector_synchronizations');

if ($table->hasColumn('current_page') === false) {
$table->addColumn('current_page', Types::INTEGER, [
'notnull' => false,
'default' => 1
]);
}
}

// SynchronizationContractLogs table
if ($schema->hasTable('openconnector_synchronization_contract_logs') === true) {
$table = $schema->getTable('openconnector_synchronization_contract_logs');

if ($table->hasColumn('message') === false) {
$table->addColumn('message', Types::STRING, [
'length' => 255,
'notnull' => false,
])->setDefault(null);
}
}

return $schema;
}

/**
* @param IOutput $output
* @param Closure(): ISchemaWrapper $schemaClosure
* @param array $options
*/
public function postSchemaChange(IOutput $output, Closure $schemaClosure, array $options): void {
}
}
2 changes: 1 addition & 1 deletion lib/Service/CallService.php
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public function call(

$body = $response->getBody()->getContents();

// Let create the data array
// Let's create the data array
$data = [
'request' => [
'url' => $url,
Expand Down
Loading
Loading