Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give DurableObjectStorage reference to primary #3112

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@ kj::Maybe<kj::String> getCurrentActorId() {

} // namespace

DurableObjectStorage::DurableObjectStorage(IoPtr<ActorCacheInterface> cache,
bool enableSql,
kj::Own<IoChannelFactory::ActorChannel> primaryActorChannel,
kj::Own<ActorIdFactory::ActorId> primaryActorId)
: cache(kj::mv(cache)),
enableSql(enableSql) {

auto replicaFactory = kj::heap<ReplicaActorOutgoingFactory>(
kj::mv(primaryActorChannel), primaryActorId->toString());
auto outgoingFactory =
IoContext::current().addObject<Fetcher::OutgoingFactory>(kj::mv(replicaFactory));
auto requiresHost = FeatureFlags::get(IoContext::current().getCurrentLock())
.getDurableObjectFetchRequiresSchemeAuthority()
? Fetcher::RequiresHostAndProtocol::YES
: Fetcher::RequiresHostAndProtocol::NO;

this->maybePrimary = jsg::alloc<DurableObject>(
jsg::alloc<DurableObjectId>(kj::mv(primaryActorId)), kj::mv(outgoingFactory), requiresHost);
}

jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::get(jsg::Lock& js,
kj::OneOf<kj::String, kj::Array<kj::String>> keys,
jsg::Optional<GetOptions> maybeOptions) {
Expand Down Expand Up @@ -743,6 +763,13 @@ void DurableObjectStorage::ensureReplicas() {
return cache->ensureReplicas();
}

jsg::Optional<jsg::Ref<DurableObject>> DurableObjectStorage::getPrimary(jsg::Lock& js) {
KJ_IF_SOME(primary, maybePrimary) {
return primary.addRef();
}
return kj::none;
}

ActorCacheOps& DurableObjectTransaction::getCache(OpName op) {
JSG_REQUIRE(!rolledBack, Error, kj::str("Cannot ", op, " on rolled back transaction"));
auto& result = *JSG_REQUIRE_NONNULL(cacheTxn, Error,
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/api/actor-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//
// See actor.h for APIs used by other Workers to talk to Actors.

#include <workerd/api/actor.h>
#include <workerd/io/actor-cache.h>
#include <workerd/io/actor-id.h>
#include <workerd/io/compatibility-date.capnp.h>
Expand All @@ -20,6 +21,7 @@ namespace workerd::api {
class SqlStorage;

// Forward-declared to avoid dependency cycle (actor.h -> http.h -> basics.h -> actor-state.h)
class DurableObject;
class DurableObjectId;
class WebSocket;

Expand Down Expand Up @@ -177,6 +179,16 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera
: cache(kj::mv(cache)),
enableSql(enableSql) {}

// This constructor is only used when we're setting up the `DurableObjectStorage` for a replica
// Durable Object instance. Replicas need to retain a reference to their primary so they can
// forward write requests, and since we already have a reference to the primary prior to
// constructing the `DurableObjectStorage`, we can just pass in the information we need to build
// a stub. The stub is then stored in `maybePrimary`.
DurableObjectStorage(IoPtr<ActorCacheInterface> cache,
bool enableSql,
kj::Own<IoChannelFactory::ActorChannel> primaryActorChannel,
kj::Own<ActorIdFactory::ActorId> primaryActorId);

ActorCacheInterface& getActorCacheInterface() {
return *cache;
}
Expand Down Expand Up @@ -241,6 +253,8 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera
// thus it is idempotent.
void ensureReplicas();

jsg::Optional<jsg::Ref<DurableObject>> getPrimary(jsg::Lock& js);

JSG_RESOURCE_TYPE(DurableObjectStorage, CompatibilityFlags::Reader flags) {
JSG_METHOD(get);
JSG_METHOD(list);
Expand All @@ -262,6 +276,7 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera

if (flags.getWorkerdExperimental()) {
JSG_METHOD(waitForBookmark);
JSG_READONLY_INSTANCE_PROPERTY(primary, getPrimary);
}

if (flags.getReplicaRouting()) {
Expand Down Expand Up @@ -296,6 +311,9 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera
IoPtr<ActorCacheInterface> cache;
bool enableSql;
uint transactionSyncDepth = 0;

// Set if this is a replica Durable Object.
kj::Maybe<jsg::Ref<DurableObject>> maybePrimary;
};

class DurableObjectTransaction final: public jsg::Object, public DurableObjectStorageOperations {
Expand Down
19 changes: 19 additions & 0 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@ private:
kj::Maybe<kj::Own<IoChannelFactory::ActorChannel>> actorChannel;
};

kj::Own<WorkerInterface> ReplicaActorOutgoingFactory::newSingleUseClient(
kj::Maybe<kj::String> cfStr) {
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
if (tracing.span.isObserved()) {
tracing.span.setTag("actor_id"_kjc, kj::heapString(actorId));
}

// Unlike in `GlobalActorOutgoingFactory`, we do not create this lazily, since our channel was
// already open prior to this DO starting up.
return actorChannel->startRequest({.cfBlobJson = kj::mv(cfStr), .tracing = tracing});
},
{.inHouse = true,
.wrapMetrics = true,
.operationName = kj::ConstString("actor_subrequest"_kjc)}));
}

jsg::Ref<Fetcher> ColoLocalActorNamespace::get(kj::String actorId) {
JSG_REQUIRE(actorId.size() > 0 && actorId.size() <= 2048, TypeError,
"Actor ID length must be in the range [1, 2048].");
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/api/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ class DurableObject final: public Fetcher {
}
};

// Like `GlobalActorOutgoingFactory` in the source file, but only used for creating a stub to
// primary DO so the stub can be given to a replica.
//
// The main distinction here is we already have the capability to the primary, so we don't need to
// make an outgoing request to set things up.
class ReplicaActorOutgoingFactory final: public Fetcher::OutgoingFactory {
public:
ReplicaActorOutgoingFactory(kj::Own<IoChannelFactory::ActorChannel> channel, kj::String actorId)
: actorChannel(kj::mv(channel)),
actorId(kj::mv(actorId)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;

private:
kj::Own<IoChannelFactory::ActorChannel> actorChannel;
kj::String actorId;
};

// Global durable object class binding type.
class DurableObjectNamespace: public jsg::Object {

Expand Down