diff --git a/src/workerd/api/actor-state.c++ b/src/workerd/api/actor-state.c++ index a9f6ccee479..3f7a90b80da 100644 --- a/src/workerd/api/actor-state.c++ +++ b/src/workerd/api/actor-state.c++ @@ -230,6 +230,26 @@ kj::Maybe getCurrentActorId() { } // namespace +DurableObjectStorage::DurableObjectStorage(IoPtr cache, + bool enableSql, + kj::Own primaryActorChannel, + kj::Own primaryActorId) + : cache(kj::mv(cache)), + enableSql(enableSql) { + + auto replicaFactory = kj::heap( + kj::mv(primaryActorChannel), primaryActorId->toString()); + auto outgoingFactory = + IoContext::current().addObject(kj::mv(replicaFactory)); + auto requiresHost = FeatureFlags::get(IoContext::current().getCurrentLock()) + .getDurableObjectFetchRequiresSchemeAuthority() + ? Fetcher::RequiresHostAndProtocol::YES + : Fetcher::RequiresHostAndProtocol::NO; + + this->maybePrimary = jsg::alloc( + jsg::alloc(kj::mv(primaryActorId)), kj::mv(outgoingFactory), requiresHost); +} + jsg::Promise> DurableObjectStorageOperations::get(jsg::Lock& js, kj::OneOf> keys, jsg::Optional maybeOptions) { @@ -743,6 +763,13 @@ void DurableObjectStorage::ensureReplicas() { return cache->ensureReplicas(); } +jsg::Optional> DurableObjectStorage::getPrimary(jsg::Lock& js) { + KJ_IF_SOME(primary, maybePrimary) { + return primary.addRef(); + } + return kj::none; +} + ActorCacheOps& DurableObjectTransaction::getCache(OpName op) { JSG_REQUIRE(!rolledBack, Error, kj::str("Cannot ", op, " on rolled back transaction")); auto& result = *JSG_REQUIRE_NONNULL(cacheTxn, Error, diff --git a/src/workerd/api/actor-state.h b/src/workerd/api/actor-state.h index f2120f9f8b5..166811f9cd7 100644 --- a/src/workerd/api/actor-state.h +++ b/src/workerd/api/actor-state.h @@ -7,6 +7,7 @@ // // See actor.h for APIs used by other Workers to talk to Actors. +#include #include #include #include @@ -20,6 +21,7 @@ namespace workerd::api { class SqlStorage; // Forward-declared to avoid dependency cycle (actor.h -> http.h -> basics.h -> actor-state.h) +class DurableObject; class DurableObjectId; class WebSocket; @@ -177,6 +179,16 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera : cache(kj::mv(cache)), enableSql(enableSql) {} + // This constructor is only used when we're setting up the `DurableObjectStorage` for a replica + // Durable Object instance. Replicas need to retain a reference to their primary so they can + // forward write requests, and since we already have a reference to the primary prior to + // constructing the `DurableObjectStorage`, we can just pass in the information we need to build + // a stub. The stub is then stored in `maybePrimary`. + DurableObjectStorage(IoPtr cache, + bool enableSql, + kj::Own primaryActorChannel, + kj::Own primaryActorId); + ActorCacheInterface& getActorCacheInterface() { return *cache; } @@ -241,6 +253,8 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera // thus it is idempotent. void ensureReplicas(); + jsg::Optional> getPrimary(jsg::Lock& js); + JSG_RESOURCE_TYPE(DurableObjectStorage, CompatibilityFlags::Reader flags) { JSG_METHOD(get); JSG_METHOD(list); @@ -262,6 +276,7 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera if (flags.getWorkerdExperimental()) { JSG_METHOD(waitForBookmark); + JSG_READONLY_INSTANCE_PROPERTY(primary, getPrimary); } if (flags.getReplicaRouting()) { @@ -296,6 +311,9 @@ class DurableObjectStorage: public jsg::Object, public DurableObjectStorageOpera IoPtr cache; bool enableSql; uint transactionSyncDepth = 0; + + // Set if this is a replica Durable Object. + kj::Maybe> maybePrimary; }; class DurableObjectTransaction final: public jsg::Object, public DurableObjectStorageOperations { diff --git a/src/workerd/api/actor.c++ b/src/workerd/api/actor.c++ index 7ecd7b4b14d..196ba49ec26 100644 --- a/src/workerd/api/actor.c++ +++ b/src/workerd/api/actor.c++ @@ -94,6 +94,25 @@ private: kj::Maybe> actorChannel; }; +kj::Own ReplicaActorOutgoingFactory::newSingleUseClient( + kj::Maybe cfStr) { + auto& context = IoContext::current(); + + return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest( + [&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) { + if (tracing.span.isObserved()) { + tracing.span.setTag("actor_id"_kjc, kj::heapString(actorId)); + } + + // Unlike in `GlobalActorOutgoingFactory`, we do not create this lazily, since our channel was + // already open prior to this DO starting up. + return actorChannel->startRequest({.cfBlobJson = kj::mv(cfStr), .tracing = tracing}); + }, + {.inHouse = true, + .wrapMetrics = true, + .operationName = kj::ConstString("actor_subrequest"_kjc)})); +} + jsg::Ref ColoLocalActorNamespace::get(kj::String actorId) { JSG_REQUIRE(actorId.size() > 0 && actorId.size() <= 2048, TypeError, "Actor ID length must be in the range [1, 2048]."); diff --git a/src/workerd/api/actor.h b/src/workerd/api/actor.h index 4f9762a98fd..64416a07b3f 100644 --- a/src/workerd/api/actor.h +++ b/src/workerd/api/actor.h @@ -135,6 +135,24 @@ class DurableObject final: public Fetcher { } }; +// Like `GlobalActorOutgoingFactory` in the source file, but only used for creating a stub to +// primary DO so the stub can be given to a replica. +// +// The main distinction here is we already have the capability to the primary, so we don't need to +// make an outgoing request to set things up. +class ReplicaActorOutgoingFactory final: public Fetcher::OutgoingFactory { +public: + ReplicaActorOutgoingFactory(kj::Own channel, kj::String actorId) + : actorChannel(kj::mv(channel)), + actorId(kj::mv(actorId)) {} + + kj::Own newSingleUseClient(kj::Maybe cfStr) override; + +private: + kj::Own actorChannel; + kj::String actorId; +}; + // Global durable object class binding type. class DurableObjectNamespace: public jsg::Object {