Skip to content

Commit

Permalink
Waiting on Register Events (#164)
Browse files Browse the repository at this point in the history
* Add `AsymmetricSemaphore`, `tag_waiter` field for register, and `onchange_tag` function

* Add waiting mechanism on protocols

* Add tests
---------

Co-authored-by: Stefan Krastanov <[email protected]>
  • Loading branch information
hanakl and Krastanov committed Jan 12, 2025
1 parent 8e0ba5b commit b1c8f5e
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## v0.5.1-dev

- Add `classical_delay` and `quantum_delay` as keyword arguments to the `RegisterNet` constructor to set a default global network edge latency.
- `onchange_tag` now permits a protocol to wait for any change to the tag metadata. Implemented thanks to the new `AsymmetricSemaphore`, a resource object that allows multiple processes to wait for an update.

## v0.5.0 - 2024-10-16

Expand Down
37 changes: 25 additions & 12 deletions src/ProtocolZoo/ProtocolZoo.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module ProtocolZoo

using QuantumSavory
import QuantumSavory: get_time_tracker, Tag, isolderthan
import QuantumSavory: get_time_tracker, Tag, isolderthan, onchange_tag
using QuantumSavory: Wildcard
using QuantumSavory.CircuitZoo: EntanglementSwap, LocalEntanglementSwap

Expand Down Expand Up @@ -208,9 +208,13 @@ end
b_ = findfreeslot(prot.net[prot.nodeB]; randomize=prot.randomize, margin=margin)

if isnothing(a_) || isnothing(b_)
isnothing(prot.retry_lock_time) && error("We do not yet support waiting on register to make qubits available") # TODO
@debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n retrying..."
@yield timeout(prot.sim, prot.retry_lock_time)
if isnothing(prot.retry_lock_time)
@debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n waiting for changes to tags..."
@yield onchange_tag(prot.net[prot.nodeA]) | onchange_tag(prot.net[prot.nodeB])
else
@debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n waiting a fixed amount of time..."
@yield timeout(prot.sim, prot.retry_lock_time)
end
continue
end
# we are now certain that a_ and b_ are not nothing. The compiler is not smart enough to figure this out
Expand Down Expand Up @@ -397,20 +401,27 @@ function EntanglementConsumer(net::RegisterNet, nodeA::Int, nodeB::Int; kwargs..
end

@resumable function (prot::EntanglementConsumer)()
if isnothing(prot.period)
error("In `EntanglementConsumer` we do not yet support waiting on register to make qubits available") # TODO
end
while true
query1 = query(prot.net[prot.nodeA], EntanglementCounterpart, prot.nodeB, ❓; locked=false, assigned=true) # TODO Need a `querydelete!` dispatch on `Register` rather than using `query` here followed by `untag!` below
if isnothing(query1)
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on first node found no entanglement"
@yield timeout(prot.sim, prot.period)
if isnothing(prot.period)
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on first node found no entanglement. Waiting on tag updates in $(prot.nodeA)."
@yield onchange_tag(prot.net[prot.nodeA])
else
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on first node found no entanglement. Waiting a fixed amount of time."
@yield timeout(prot.sim, prot.period)
end
continue
else
query2 = query(prot.net[prot.nodeB], EntanglementCounterpart, prot.nodeA, query1.slot.idx; locked=false, assigned=true)
if isnothing(query2) # in case EntanglementUpdate hasn't reached the second node yet, but the first node has the EntanglementCounterpart
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on second node found no entanglement (yet...)"
@yield timeout(prot.sim, prot.period)
if isnothing(prot.period)
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on second node found no entanglement (yet...). Waiting on tag updates in $(prot.nodeB)."
@yield onchange_tag(prot.net[prot.nodeB])
else
@debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on second node found no entanglement (yet...). Waiting a fixed amount of time."
@yield timeout(prot.sim, prot.period)
end
continue
end
end
Expand All @@ -431,7 +442,9 @@ end
push!(prot.log, (now(prot.sim), ob1, ob2))
unlock(q1)
unlock(q2)
@yield timeout(prot.sim, prot.period)
if !isnothing(prot.period)
@yield timeout(prot.sim, prot.period)
end
end
end

Expand Down
9 changes: 5 additions & 4 deletions src/ProtocolZoo/cutoff.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ function CutoffProt(sim::Simulation, net::RegisterNet, node::Int; kwargs...)
end

@resumable function (prot::CutoffProt)()
if isnothing(prot.period)
error("In `CutoffProt` we do not yet support quing up and waiting on register") # TODO
end
reg = prot.net[prot.node]
while true
for slot in reg # TODO these should be done in parallel, otherwise we will be waiting on each slot, greatly slowing down the cutoffs
Expand Down Expand Up @@ -72,6 +69,10 @@ end

unlock(slot)
end
@yield timeout(prot.sim, prot.period)
if isnothing(prot.period)
@yield onchange_tag(reg)
else
@yield timeout(prot.sim, prot.period)
end
end
end
9 changes: 7 additions & 2 deletions src/ProtocolZoo/swapping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,13 @@ end
while rounds != 0
qubit_pair_ = findswapablequbits(prot.net, prot.node, prot.nodeL, prot.nodeH, prot.chooseL, prot.chooseH; agelimit=prot.agelimit)
if isnothing(qubit_pair_)
isnothing(prot.retry_lock_time) && error("We do not yet support waiting on register to make qubits available") # TODO
@yield timeout(prot.sim, prot.retry_lock_time)
if isnothing(prot.retry_lock_time)
@debug "SwapperProt: no swappable qubits found. Waiting for tag change..."
@yield onchange_tag(prot.net[prot.node])
else
@debug "SwapperProt: no swappable qubits found. Waiting a fixed amount of time..."
@yield timeout(prot.sim, prot.retry_lock_time)
end
continue
end
# The compiler is not smart enough to figure out that qubit_pair_ is not nothing, so we need to tell it explicitly. A new variable name is needed due to @resumable.
Expand Down
3 changes: 3 additions & 0 deletions src/QuantumSavory.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export
CliffordRepr, QuantumOpticsRepr, QuantumMCRepr,
UseAsState, UseAsObservable, UseAsOperation,
AbstractBackground,
onchange_tag,
# networks.jl
RegisterNet, channel, qchannel, messagebuffer,
# initialize.jl
Expand Down Expand Up @@ -79,6 +80,8 @@ include("traits_and_defaults.jl")

include("tags.jl")

include("semaphore.jl")

include("states_registers.jl")
include("quantumchannel.jl")
include("messagebuffer.jl")
Expand Down
2 changes: 2 additions & 0 deletions src/concurrentsim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ end
##

function get_time_tracker(rn::RegisterNet)
# TODO assert they are all the same
return get_time_tracker(rn.registers[1])
end
function get_time_tracker(r::Register)
# TODO assert all locks and tag_waiters and similar have the same env
r.locks[1].env::Simulation
end
function get_time_tracker(r::RegRef)
Expand Down
1 change: 1 addition & 0 deletions src/networks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function RegisterNet(graph::SimpleGraph, registers, vertex_metadata, edge_metada
if !all_are_same
if all_are_at_zero
for r in registers
r.tag_waiter[] = AsymmetricSemaphore(env)
for i in eachindex(r.locks)
r.locks[i] = ConcurrentSim.Resource(env,1)
end
Expand Down
2 changes: 2 additions & 0 deletions src/queries.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ function tag!(ref::RegRef, tag)
id = guid()
push!(ref.reg.guids, id)
ref.reg.tag_info[id] = (;tag, slot=ref.idx, time=now(get_time_tracker(ref)))
unlock(ref.reg.tag_waiter[])
return id
end

Expand All @@ -41,6 +42,7 @@ function untag!(ref::RegOrRegRef, id::Integer)
isnothing(i) ? throw(QueryError("Attempted to delete a nonexistent tag id", untag!, id)) : deleteat!(reg.guids, i) # TODO make sure there is a clear error message
to_be_deleted = reg.tag_info[id]
delete!(reg.tag_info, id)
unlock(reg.tag_waiter[])
return to_be_deleted
end

Expand Down
33 changes: 33 additions & 0 deletions src/semaphore.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using ConcurrentSim
using ResumableFunctions
import Base: unlock, lock, islocked

"""Multiple processes can wait on this semaphore for a permission to run given by another process"""
struct AsymmetricSemaphore
nbwaiters::Ref{Int}
lock::Resource
end
AsymmetricSemaphore(sim) = AsymmetricSemaphore(Ref(0), Resource(sim,1,level=1)) # start locked

function Base.lock(s::AsymmetricSemaphore)
return @process _lock(s.lock.env, s)
end

@resumable function _lock(sim, s::AsymmetricSemaphore)
s.nbwaiters[] += 1
@yield lock(s.lock)
s.nbwaiters[] -= 1
if s.nbwaiters[] > 0
unlock(s.lock)
end
end

function unlock(s::AsymmetricSemaphore)
if s.nbwaiters[] > 0
unlock(s.lock)
end
end

function islocked(s::AsymmetricSemaphore)
return islocked(s.lock)
end
8 changes: 7 additions & 1 deletion src/states_registers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ struct Register # TODO better type description
tag_info::Dict{Int128, @NamedTuple{tag::Tag, slot::Int, time::Float64}}
guids::Vector{Int128}
netparent::Ref{Any}
tag_waiter::Ref{AsymmetricSemaphore} # TODO This being a ref is a bit of code smell... but we also want to be able to have registers that are not linked to a net so we need to be able to have this field un-initialized
end

function Register(traits, reprs, bg, sr, si, at)
env = ConcurrentSim.Simulation()
Register(traits, reprs, bg, sr, si, at, [ConcurrentSim.Resource(env) for _ in traits], Dict{Int128, Tuple{Tag, Int64, Float64}}(), [], nothing)
Register(traits, reprs, bg, sr, si, at, [ConcurrentSim.Resource(env) for _ in traits], Dict{Int128, Tuple{Tag, Int64, Float64}}(), [], nothing, Ref(AsymmetricSemaphore(env)))
end

Register(traits,reprs,bg,sr,si) = Register(traits,reprs,bg,sr,si,zeros(length(traits)))
Expand Down Expand Up @@ -60,3 +61,8 @@ get_register(r::RegRef) = r.reg
get_register(r::Register) = r

#Base.:(==)(r1::Register, r2::Register) =

function onchange_tag(r::RegOrRegRef)
register = get_register(r)
return lock(register.tag_waiter[])
end
32 changes: 32 additions & 0 deletions test/test_protocolzoo_cutoffprot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,36 @@ run(sim, 2.0)
run(sim, 6.0)
@test !isassigned(net[1][1])
@test isassigned(net[2][1])

# test for period=nothing
net = RegisterNet([Register(2), Register(1)])
sim = get_time_tracker(net)
initialize!((net[1][1], net[2][1]), (Z1Z1+Z2Z2)/(sqrt(2.0)))
tag!(net[1][1], EntanglementCounterpart, 2, 1)
tag!(net[2][1], EntanglementCounterpart, 1, 1)

cprot1 = CutoffProt(sim, net, 1; period=nothing, retention_time=3.0)
cprot2 = CutoffProt(sim, net, 2; period=0.1, retention_time=3.0)
@process cprot1()
@process cprot2()

@resumable function wait_and_tag(sim)
@yield timeout(sim, 5)
tag!(net[1][2], Int)
end

@process wait_and_tag(sim)

run(sim, 2.0)
@test isassigned(net[1][1])
@test isassigned(net[2][1])

run(sim, 4.0)
@test isassigned(net[1][1])
@test !isassigned(net[2][1])

run(sim, 6.0)
@test !isassigned(net[1][1])
@test !isassigned(net[2][1])

end
37 changes: 37 additions & 0 deletions test/test_protocolzoo_entanglement_consumer.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
@testitem "ProtocolZoo Entanglement Consumer" tags=[:protocolzoo_entanglement_consumer] begin
using QuantumSavory
using QuantumSavory.ProtocolZoo: EntanglerProt, SwapperProt, EntanglementTracker, EntanglementConsumer
using Graphs
using ConcurrentSim
using ResumableFunctions

if isinteractive()
using Logging
Expand Down Expand Up @@ -41,6 +43,41 @@ for n in 3:30
@test econ.log[i][2] 1.0
@test econ.log[i][3] 1.0
end
end

# test for period=nothing
for n in 3:30
regsize = 10
net = RegisterNet([Register(regsize) for j in 1:n])
sim = get_time_tracker(net)

@resumable function delayedProts(sim)
@yield timeout(sim, 5)
for e in edges(net)
eprot = EntanglerProt(sim, net, e.src, e.dst; rounds=-1, randomize=true, margin=5, hardmargin=3)
@process eprot()
end

for v in 2:n-1
sprot = SwapperProt(sim, net, v; nodeL = <(v), nodeH = >(v), chooseL = argmin, chooseH = argmax, rounds = -1)
@process sprot()
end

for v in vertices(net)
etracker = EntanglementTracker(sim, net, v)
@process etracker()
end
end
econ = EntanglementConsumer(sim, net, 1, n; period=nothing)
@process econ()
@process delayedProts(sim)

run(sim, 100)

@test econ.log[1][1] > 5 # the process should start after 5
for i in 1:length(econ.log)
@test econ.log[i][2] 1.0
@test econ.log[i][3] 1.0
end
end
end
Loading

0 comments on commit b1c8f5e

Please sign in to comment.