Skip to content

Commit

Permalink
Implement transactions based on MarkerDatabase
Browse files Browse the repository at this point in the history
 * Remove all manual interaction with MarkerDatabase. It is used solely
   to implement writing transactions and waiting for them.
 * MarkerDatabase provides a JVM internal as well as external locking
   and waiting mechanism.
 * Transactions are a per thread ID which may (or may not) cause a
   MarkerDatabase with that ID to be created whenever required, i.e.
   when objects are actually inserted in the BHive.
 * Nested transactions are fully supported. The order in which
   transactions are closed *must* be the reverse of how they started.
 * PruneOperation locks the root for all MarkerDatabase's which prevents
   the creation of new transactions (delays them). Existing transactions
   are fully respected and can continue to work while this happens.
 * All transactions can run fully in parallel, PruneOperation is the
   only place that can cause waiting.
 * Transactions can be inherited by child Threads and are inherently
   thread-safe. This is to support multi-threaded file operations.
 * ObjectDatabase knows about transactions and marks objects it inserts
   on the active transaction. If there is no transaction, a warning is
   issued on this level, but no further failure is provoked.
 * TransactedOperation - a new base class for "writing" operations will
   in its own check and assure that there is an open transaction, which
   has to be opened by the caller of such operations, since a single
   TransactedOperation is very likely to only be a part of a multi-step
   action (e.g. insert-object, insert-tree, insert-manifest, each
   separate operations which should be on the same transaction).

Request: DCS-1292
Change-Id: Ia852af156717b8996c456dae74ea4989d30765f8
  • Loading branch information
mduft committed Feb 4, 2021
1 parent fbc516a commit 7397803
Show file tree
Hide file tree
Showing 64 changed files with 750 additions and 422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.bdeploy.api.plugin.v1.Plugin;
import io.bdeploy.api.product.v1.impl.ScopedManifestKey;
import io.bdeploy.bhive.BHive;
import io.bdeploy.bhive.BHiveTransactions.Transaction;
import io.bdeploy.bhive.model.Manifest;
import io.bdeploy.bhive.model.ObjectId;
import io.bdeploy.bhive.model.Tree;
Expand Down Expand Up @@ -91,6 +92,12 @@ public synchronized ProductManifestBuilder addApplicationTemplate(Path tmplPath)
}

public synchronized void insert(BHive hive, Manifest.Key manifest, String productName) {
try (Transaction t = hive.getTransactions().begin()) {
doInsertLocked(hive, manifest, productName);
}
}

private void doInsertLocked(BHive hive, Manifest.Key manifest, String productName) {
Tree.Builder tree = new Tree.Builder();

// add application references
Expand Down Expand Up @@ -336,8 +343,10 @@ private static ApplicationDescriptorApi importDependenciesAndApplication(BHive h

fetcher.fetch(hive, appDesc.runtimeDependencies, os).forEach(builder::add);

builder.add(hive.execute(new ImportOperation().setSourcePath(appPath.getParent())
.setManifest(new ScopedManifestKey(baseName + appName, os, versions.version).getKey())));
try (Transaction t = hive.getTransactions().begin()) {
builder.add(hive.execute(new ImportOperation().setSourcePath(appPath.getParent())
.setManifest(new ScopedManifestKey(baseName + appName, os, versions.version).getKey())));
}

return appDesc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.bdeploy.api.remote.v1.PublicRootResource;
import io.bdeploy.api.remote.v1.dto.SoftwareRepositoryConfigurationApi;
import io.bdeploy.bhive.BHive;
import io.bdeploy.bhive.BHiveTransactions.Transaction;
import io.bdeploy.bhive.model.Manifest;
import io.bdeploy.bhive.model.Manifest.Key;
import io.bdeploy.bhive.model.ObjectId;
Expand Down Expand Up @@ -99,7 +100,9 @@ private SortedSet<String> fetchSingleRemote(BHive hive, SortedSet<String> deps,
return unresolved;
}

hive.execute(new FetchOperation().setRemote(svc).setHiveName(group).addManifest(toFetch));
try (Transaction t = hive.getTransactions().begin()) {
hive.execute(new FetchOperation().setRemote(svc).setHiveName(group).addManifest(toFetch));
}

return unresolved;
}
Expand Down
40 changes: 38 additions & 2 deletions bhive/src/main/java/io/bdeploy/bhive/BHive.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class BHive implements AutoCloseable, BHiveExecution {
private final FileSystem zipFs;
private final Path objTmp;
private final Path markerTmp;
private final BHiveTransactions transactions;
private final ObjectDatabase objects;
private final ManifestDatabase manifests;
private final ActivityReporter reporter;
Expand Down Expand Up @@ -108,7 +109,8 @@ public BHive(URI uri, Auditor auditor, ActivityReporter reporter) {
} catch (IOException e) {
throw new IllegalStateException("Cannot create temporary directory for zipped BHive", e);
}
this.objects = new ObjectDatabase(objRoot, objTmp, reporter);
this.transactions = new BHiveTransactions(markerTmp, reporter);
this.objects = new ObjectDatabase(objRoot, objTmp, reporter, transactions);
this.manifests = new ManifestDatabase(relRoot.resolve("manifests"));
this.reporter = reporter;
}
Expand Down Expand Up @@ -153,6 +155,14 @@ public <T> T execute(Operation<T> op) {
}
}

/**
* @return the {@link BHiveTransactions} tracker.
*/
@Override
public BHiveTransactions getTransactions() {
return transactions;
}

@Override
public void close() {
if (zipFs != null) {
Expand Down Expand Up @@ -180,7 +190,7 @@ public abstract static class Operation<T> implements Callable<T>, BHiveExecution
/**
* Used internally to associate the operation with the executing hive
*/
private final void initOperation(BHive hive) {
void initOperation(BHive hive) {
this.hive = hive;
this.fileOps = Executors.newFixedThreadPool(hive.parallelism,
new NamedDaemonThreadFactory(() -> "File-OPS-" + fileOpNum.incrementAndGet()));
Expand Down Expand Up @@ -225,6 +235,11 @@ protected ActivityReporter getActivityReporter() {
return hive.reporter;
}

@Override
public BHiveTransactions getTransactions() {
return hive.getTransactions();
}

/**
* Submit a {@link Runnable} performing a file operation to the pool managing
* those operations.
Expand All @@ -248,4 +263,25 @@ public <X> X execute(BHive.Operation<X> other) {

}

/**
* Base class for operations which require an open transaction, set up by the caller.
*/
public abstract static class TransactedOperation<T> extends Operation<T> {

@Override
public final T call() throws Exception {
if (!getTransactions().hasTransaction()) {
throw new IllegalStateException("Operation requires active transaction: " + getClass().getSimpleName());
}

return callTransacted();
}

/**
* Executes the operation. The current thread is guaranteed to be associated with a transaction.
*/
protected abstract T callTransacted() throws Exception;

}

}
5 changes: 5 additions & 0 deletions bhive/src/main/java/io/bdeploy/bhive/BHiveExecution.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public interface BHiveExecution {
*/
public <X> X execute(BHive.Operation<X> op);

/**
* @return the transactions manager for this {@link BHiveExecution}
*/
public BHiveTransactions getTransactions();

}
134 changes: 134 additions & 0 deletions bhive/src/main/java/io/bdeploy/bhive/BHiveTransactions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.bdeploy.bhive;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Stack;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.bdeploy.bhive.model.ObjectId;
import io.bdeploy.bhive.objects.MarkerDatabase;
import io.bdeploy.bhive.objects.ObjectDatabase;
import io.bdeploy.common.ActivityReporter;
import io.bdeploy.common.util.PathHelper;
import io.bdeploy.common.util.UuidHelper;

/**
* Keeps track of running operations in the JVM.
* <p>
* Uses MarkerDatabase to synchronize with other JVMs which might have operations running as well.
*/
public class BHiveTransactions {

private static final Logger log = LoggerFactory.getLogger(BHiveTransactions.class);

private final InheritableThreadLocal<Stack<String>> transactions = new InheritableThreadLocal<>();
private final Map<String, MarkerDatabase> dbs = new TreeMap<>();
private final ActivityReporter reporter;
private final Path markerRoot;

public BHiveTransactions(Path markerRoot, ActivityReporter reporter) {
this.markerRoot = markerRoot;
this.reporter = reporter;
}

private Stack<String> getOrCreate() {
Stack<String> result = transactions.get();
if (result == null) {
result = new Stack<>();
transactions.set(result);
}
return result;
}

/**
* @param object the object which should be considered "touched", i.e. inserted.
*/
public void touchObject(ObjectId object) {
Stack<String> all = transactions.get();
String id = (all == null || all.isEmpty()) ? null : all.peek();
if (id == null) {
throw new IllegalStateException("No transaction active while inserting object.");
}

MarkerDatabase mdb = dbs.get(id);
if (mdb == null) {
throw new IllegalStateException("Transaction database missing for transaction " + id);
}

if (!mdb.hasObject(object)) {
mdb.addMarker(object);
}
}

/**
* @return whether the current thread has an associated transaction.
*/
public boolean hasTransaction() {
Stack<String> stack = transactions.get();
return stack != null && !stack.isEmpty();
}

/**
* Begins a new transaction on this thread.
* <p>
* Inserts on the {@link ObjectDatabase} of a {@link BHive} will use this transaction to keep track of objects inserted.
*
* @return a {@link Transaction} which will cleanup associated resources when closed.
*/
public Transaction begin() {
MarkerDatabase.waitRootLock(markerRoot);

String uuid = UuidHelper.randomId();
getOrCreate().push(uuid);
dbs.put(uuid, new MarkerDatabase(markerRoot.resolve(uuid), reporter));

if (log.isDebugEnabled()) {
log.debug("Starting transaction {}", uuid, new RuntimeException("Starting Transaction"));
}

return new Transaction() {

@Override
public void close() {
MarkerDatabase.waitRootLock(markerRoot);

Stack<String> stack = transactions.get();
if (stack == null || stack.isEmpty()) {
throw new IllegalStateException("No transaction has been started on this thread!");
}

if (!stack.peek().equals(uuid)) {
log.warn("Out-of-order transaction found: {}, expected: {}", stack.peek(), uuid);
}

if (log.isDebugEnabled()) {
log.debug("Ending transaction {}", uuid, new RuntimeException("Ending Transaction"));
}

stack.remove(uuid);
dbs.remove(uuid);

Path mdb = markerRoot.resolve(uuid);
if (!Files.isDirectory(mdb)) {
return; // nothing to clean.
}

PathHelper.deleteRecursive(mdb);
}
};
}

/**
* Represents a writing transaction in the BHive.
*/
public interface Transaction extends AutoCloseable {

@Override
public void close();
}

}
4 changes: 3 additions & 1 deletion bhive/src/main/java/io/bdeploy/bhive/cli/FetchTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.nio.file.Paths;

import io.bdeploy.bhive.BHive;
import io.bdeploy.bhive.BHiveTransactions.Transaction;
import io.bdeploy.bhive.cli.FetchTool.FetchConfig;
import io.bdeploy.bhive.model.Manifest;
import io.bdeploy.bhive.op.remote.FetchOperation;
Expand Down Expand Up @@ -53,7 +54,8 @@ public FetchTool() {
protected RenderableResult run(FetchConfig config, RemoteService svc) {
helpAndFailIfMissing(config.hive(), "Missing --hive");

try (BHive hive = new BHive(Paths.get(config.hive()).toUri(), getActivityReporter())) {
try (BHive hive = new BHive(Paths.get(config.hive()).toUri(), getActivityReporter());
Transaction t = hive.getTransactions().begin()) {
FetchOperation op = new FetchOperation().setRemote(svc).setHiveName(config.source());

for (String m : config.manifest()) {
Expand Down
3 changes: 2 additions & 1 deletion bhive/src/main/java/io/bdeploy/bhive/cli/ImportTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.TreeMap;

import io.bdeploy.bhive.BHive;
import io.bdeploy.bhive.BHiveTransactions.Transaction;
import io.bdeploy.bhive.cli.ImportTool.ImportConfig;
import io.bdeploy.bhive.model.Manifest;
import io.bdeploy.bhive.op.ImportOperation;
Expand Down Expand Up @@ -79,7 +80,7 @@ protected RenderableResult run(ImportConfig config) {
labels.put(k, v);
}

try (BHive hive = new BHive(target.toUri(), getActivityReporter())) {
try (BHive hive = new BHive(target.toUri(), getActivityReporter()); Transaction t = hive.getTransactions().begin()) {
hive.setParallelism(config.jobs());

ImportOperation op = new ImportOperation().setSourcePath(source).setManifest(Manifest.Key.parse(config.manifest()));
Expand Down
38 changes: 21 additions & 17 deletions bhive/src/main/java/io/bdeploy/bhive/meta/MetaManifest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.LoggerFactory;

import io.bdeploy.bhive.BHiveExecution;
import io.bdeploy.bhive.BHiveTransactions.Transaction;
import io.bdeploy.bhive.model.Manifest;
import io.bdeploy.bhive.model.Manifest.Key;
import io.bdeploy.bhive.model.ObjectId;
Expand Down Expand Up @@ -148,28 +149,31 @@ public Manifest.Key write(BHiveExecution target, T meta) {
Manifest.Builder newMf = new Manifest.Builder(targetKey);
Tree.Builder newTree = new Tree.Builder();

// add the new metadata object
String metaFileName = metaFileName();
if (meta != null) {
// if null, we don't write the entry -> delete.
ObjectId oid = target.execute(new ImportObjectOperation().setData(StorageHelper.toRawBytes(meta)));
newTree.add(new Tree.Key(metaFileName, EntryType.BLOB), oid);
}
try (Transaction t = target.getTransactions().begin()) {
// add the new metadata object
String metaFileName = metaFileName();
if (meta != null) {
// if null, we don't write the entry -> delete.
ObjectId oid = target.execute(new ImportObjectOperation().setData(StorageHelper.toRawBytes(meta)));
newTree.add(new Tree.Key(metaFileName, EntryType.BLOB), oid);
}

// now copy other entries to the new tree
if (oldTree != null) {
for (Entry<Tree.Key, ObjectId> entry : oldTree.getChildren().entrySet()) {
if (entry.getKey().getName().equals(metaFileName)) {
continue; // we updated this one.
}
// now copy other entries to the new tree
if (oldTree != null) {
for (Entry<Tree.Key, ObjectId> entry : oldTree.getChildren().entrySet()) {
if (entry.getKey().getName().equals(metaFileName)) {
continue; // we updated this one.
}

newTree.add(entry.getKey(), entry.getValue());
newTree.add(entry.getKey(), entry.getValue());
}
}

// insert tree and manifest
ObjectId newTreeId = target.execute(new InsertArtificialTreeOperation().setTree(newTree));
target.execute(new InsertManifestOperation().addManifest(newMf.setRoot(newTreeId).build(target)));
}

// insert tree and manifest
ObjectId newTreeId = target.execute(new InsertArtificialTreeOperation().setTree(newTree));
target.execute(new InsertManifestOperation().addManifest(newMf.setRoot(newTreeId).build(target)));
target.execute(new ManifestDeleteOldByIdOperation().setAmountToKeep(META_HIST_SIZE).setToDelete(metaName));

return targetKey;
Expand Down
Loading

0 comments on commit 7397803

Please sign in to comment.