Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: EDR refresh Synchronization #1648

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ dependencies {
implementation(libs.edc.spi.edrstore)
implementation(libs.edc.spi.transactionspi)

implementation(libs.edc.spi.transaction.datasource)

implementation(project(":spi:tokenrefresh-spi"))
implementation(project(":spi:edr-spi"))
implementation(project(":spi:core-spi"))

testImplementation(libs.edc.junit)
testImplementation(libs.edc.core.edrstore)
testImplementation(libs.edc.lib.query)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler;

Expand All @@ -49,13 +50,16 @@ public class EdrCoreServiceExtension implements ServiceExtension {
@Inject
private TransactionContext transactionContext;

@Inject
private EndpointDataReferenceLock edrLock;

@Override
public String name() {
return NAME;
}

@Provider
public EdrService edrService() {
return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor);
return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor, edrLock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;

@Extension("Provides A Default EdrLock Provider")
public class DefaultEdrLockProviderExtension implements ServiceExtension {

@Inject
EndpointDataReferenceEntryIndex entryIndex;

@Inject
TransactionContext transactionContext;

@Provider(isDefault = true)
public EndpointDataReferenceLock createInMemoryEdrLock() {
return new InMemoryEdrLock(entryIndex, transactionContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class InMemoryEdrLock implements EndpointDataReferenceLock {

private final EndpointDataReferenceEntryIndex entryIndex;
private final TransactionContext transactionContext;
private final Map<String, ReentrantReadWriteLock> lockedEdrs = new ConcurrentHashMap<>();

public InMemoryEdrLock(EndpointDataReferenceEntryIndex entryIndex, TransactionContext transactionContext) {
this.entryIndex = entryIndex;
this.transactionContext = transactionContext;
}

@Override
public StoreResult<Boolean> acquireLock(String edrId, DataAddress edr) {

var rowLock = lockedEdrs.computeIfAbsent(edrId, k -> new ReentrantReadWriteLock());

rowLock.writeLock().lock(); // this lock synchronizes row-level access

var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId));

return StoreResult.success(isExpired(edr, edrEntry));

}


@Override
public StoreResult<Void> releaseLock(String edrId) {

lockedEdrs.computeIfPresent(edrId, (k, rowLock) -> {
if (rowLock.writeLock().isHeldByCurrentThread()) {
rowLock.writeLock().unlock();
if (!rowLock.hasQueuedThreads()) {
return null;
}
}
return rowLock;
});

return StoreResult.success();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.types.RefreshMode;
import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler;

import java.time.Instant;
import java.util.List;

import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.EDR_PROPERTY_EXPIRES_IN;

public class EdrServiceImpl implements EdrService {

private final EndpointDataReferenceStore edrStore;
private final TokenRefreshHandler tokenRefreshHandler;
private final TransactionContext transactionContext;
private final Monitor monitor;
private final EndpointDataReferenceLock edrLock;

public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor) {
public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor, EndpointDataReferenceLock edrLock) {
this.edrStore = edrStore;
this.tokenRefreshHandler = tokenRefreshHandler;
this.transactionContext = transactionContext;
this.monitor = monitor;
this.edrLock = edrLock;
}

@Override
Expand Down Expand Up @@ -74,12 +74,26 @@ private ServiceResult<DataAddress> autoRefresh(String id, DataAddress edr, Refre
if (edrEntry == null) {
return ServiceResult.notFound("An EndpointDataReferenceEntry with ID '%s' does not exist".formatted(id));
}
if (isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) {
monitor.debug("Token expired, need to refresh.");
return tokenRefreshHandler.refreshToken(id, edr)
.compose(updated -> updateEdr(edrEntry, updated));
if (edrLock.isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) {
var result = ServiceResult.from(edrLock.acquireLock(id, edr))
.compose(shouldRefresh -> {
if (!shouldRefresh && !mode.equals(RefreshMode.FORCE_REFRESH)) {
monitor.debug("Dont need to refresh. Will resolve existing.");
var refreshedEdr = edrStore.resolveByTransferProcess(id);
return ServiceResult.from(refreshedEdr);
} else {
monitor.debug("Token '%s' expired, need to refresh.".formatted(id));
return tokenRefreshHandler.refreshToken(id, edr)
.compose(updated -> updateEdr(edrEntry, updated));
}
});
edrLock.releaseLock(id)
.onFailure(error -> monitor.warning("Error releasing lock: %s".formatted(error)));
return result;

}
return ServiceResult.success(edr);
var refreshedEdr = edrStore.resolveByTransferProcess(id);
return ServiceResult.from(refreshedEdr);
}

private ServiceResult<DataAddress> updateEdr(EndpointDataReferenceEntry entry, DataAddress dataAddress) {
Expand All @@ -94,24 +108,12 @@ private ServiceResult<DataAddress> updateEdr(EndpointDataReferenceEntry entry, D

var updateResult = edrStore.save(newEntry, dataAddress);


if (updateResult.failed()) {
return ServiceResult.fromFailure(updateResult);
}
return ServiceResult.success(dataAddress);
}

private boolean isExpired(DataAddress edr, EndpointDataReferenceEntry metadata) {
var expiresInString = edr.getStringProperty(EDR_PROPERTY_EXPIRES_IN);
if (expiresInString == null) {
return false;
}

var expiresIn = Long.parseLong(expiresInString);
// createdAt is in millis, expires-in is in seconds
var expiresAt = metadata.getCreatedAt() / 1000L + expiresIn;
var expiresAtInstant = Instant.ofEpochSecond(expiresAt);

return expiresAtInstant.isBefore(Instant.now());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
#################################################################################

org.eclipse.tractusx.edc.edr.core.EdrCoreServiceExtension
org.eclipse.tractusx.edc.edr.core.lock.DefaultEdrLockProviderExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.store.defaults.InMemoryEndpointDataReferenceEntryIndex;
import org.eclipse.edc.junit.annotations.ComponentTest;
import org.eclipse.edc.query.CriterionOperatorRegistryImpl;
import org.eclipse.edc.transaction.spi.NoopTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.testfixtures.index.lock.EndpointDataReferenceLockBaseTest;
import org.junit.jupiter.api.BeforeEach;


@ComponentTest
class InMemoryEdrLockTest extends EndpointDataReferenceLockBaseTest {

private InMemoryEdrLock edrLock;
private final TransactionContext transactionContext = new NoopTransactionContext();

@BeforeEach
void setUp() {
var entryIndex = new InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistryImpl.ofDefaults());
edrLock = new InMemoryEdrLock(entryIndex, transactionContext);
entryIndex.save(edrEntry("mock", ACQUIRE_LOCK_TP));
}

@Override
protected EndpointDataReferenceLock getStore() {
return edrLock;
}
}
Loading
Loading