
Make Infra print internal state if stuck again #536

Closed
akoshelev opened this issue Mar 14, 2023 · 2 comments

@akoshelev
Collaborator

Let's just open an issue for this one.

Originally posted by @martinthomson in #532 (comment)

Context: After changing the gateway implementation, there is no longer a dedicated tokio task running inside it, so we can no longer print the state of the send/receive buffers. This visibility is important, and we need to find another way to provide it. I am thinking that spawning a task inside TestWorld may be a way to go.

@akoshelev
Collaborator Author

More context

The infrastructure API (send/receive with an associated record id) unfortunately makes it possible to stall the program if the API is used incorrectly. Because of batching inside Infra, messages are not sent immediately; instead they accumulate in send buffers that are created per channel and are then sent in one go.

This creates an implicit expectation that records are sent in order (or out of order only within a certain window). When that expectation is violated, the whole program stalls, as the following snippet shows:

    #[tokio::test]
    pub async fn handles_reordering() {
        use typenum::Unsigned;
        use crate::ff::Serializable;

        let mut config = TestWorldConfig::default();
        // Make send buffer capable of holding more than one message.
        config.gateway_config.send_outstanding_bytes = (2 * <Fp31 as Serializable>::Size::USIZE).try_into().unwrap();

        let world = Box::leak(Box::new(TestWorld::new_with(config)));
        let contexts = world.contexts();
        let sender_ctx = contexts[0].narrow("reordering-test").set_total_records(2);
        let recv_ctx = contexts[1].narrow("reordering-test").set_total_records(2);

        tokio::spawn(async move {
            let channel = sender_ctx.send_channel(Role::H2);
            // send record 1 and wait for confirmation. It will never be acknowledged by the Infra
            // because record 0 is never sent (blocked behind record 1).
            // In order to make this code work, both sends must be executed in parallel (`try_join`)
            channel.send(RecordId::from(1), Fp31::truncate_from(1_u128)).await.unwrap();
            channel.send(RecordId::from(0), Fp31::truncate_from(0_u128)).await.unwrap();
            
            // how to make it work
            // try_join(
            //    channel.send(RecordId::from(1), Fp31::truncate_from(1_u128)),
            //    channel.send(RecordId::from(0), Fp31::truncate_from(0_u128)),
            // ).await
        });

        let recv_channel = recv_ctx.recv_channel::<Fp31>(Role::H1);
        let result = try_join(
            recv_channel.receive(RecordId::from(1)),
            recv_channel.receive(RecordId::from(0)),
        )
        .await
        .unwrap();

        assert_eq!(
            (Fp31::truncate_from(1u128), Fp31::truncate_from(0u128)),
            result
        );
    }

(This is a slightly modified version of the existing test helpers::gateway::tests::handles_reordering.)

The real problem with this stall is that it leaves everyone in the dark about what actually happened. It is not clear why the program stalled, and advanced debugging is needed to figure out what is going on.

The previous version of the infrastructure tackled this with a watchdog that inspected the state of the internal buffers and printed the records outstanding for send/receive after a certain timeout had passed. That was possible because a tokio task spawned inside the Gateway struct ran the main loop that handled send/receive and control tasks.

    _ = &mut sleep => {

(This fragment is from the old main loop's select arm that fired the watchdog timer; feel free to clone the repo at that commit and explore how printing state worked.)
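The old pattern can be sketched roughly as follows, using std threads and channels instead of tokio for brevity (all names here are illustrative, not the real Infra types): the main loop waits for the next event with a timeout, and hitting the idle timeout is the trigger to dump internal state.

```rust
use std::collections::BTreeSet;
use std::sync::mpsc;
use std::time::Duration;

// Hypothetical events the main loop would handle; illustrative only.
enum Event {
    Send { record: usize },
    Acknowledged { record: usize },
}

// The loop waits for work with a timeout; if nothing arrives before the
// deadline, it prints the outstanding records. Returns the final pending
// set once the sender side hangs up (so a test can inspect it).
fn run_loop(rx: mpsc::Receiver<Event>, idle: Duration) -> Vec<usize> {
    let mut pending: BTreeSet<usize> = BTreeSet::new();
    loop {
        match rx.recv_timeout(idle) {
            Ok(Event::Send { record }) => {
                pending.insert(record);
            }
            Ok(Event::Acknowledged { record }) => {
                pending.remove(&record);
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // The loop has been idle: report the internal state.
                eprintln!("possible stall; records pending send: {pending:?}");
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                return pending.into_iter().collect();
            }
        }
    }
}
```

The key property is that the loop itself knows when it is idle, so no extra coordination with the senders is needed.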

After #532 we no longer spawn tasks inside Gateway (and we don't want to), so there is no longer an obvious place for a watchdog inside it. However, we still need visibility into the records that are pending send/receive, so we need to think a bit and find a new spot for it.

This is not an easy task for the following reasons:

  • Previously we knew exactly how long to wait: the Gateway main loop going idle was the signal for Infra to print its state. Now messages are sent through channel streams, outside of Gateway's control. So the first question is: how do we decide when to print the state?
  • The send/receive buffers are private to the Gateway, and for the reasons above it is not easy to inspect them from outside. Some amount of exposure is likely required.

But there is good news too: the send and receive buffers are Send and Sync (at least I think so), which means we can share them with a watchdog.
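As a rough sketch of that sharing idea (with hypothetical types; the real buffers live inside Gateway and would need some exposure), a watchdog holding its own handle to the shared state can inspect it after an idle period, independently of the senders:

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the Gateway's per-channel send buffer.
#[derive(Default)]
struct SendState {
    pending: BTreeSet<usize>,
}

// Because the state is Send + Sync (wrapped in Arc<Mutex<..>> here), the
// watchdog thread can hold a clone of the handle and inspect it on its own
// schedule. Returns the records that looked stuck, if any.
fn spawn_watchdog(
    state: Arc<Mutex<SendState>>,
    idle: Duration,
) -> thread::JoinHandle<Option<Vec<usize>>> {
    thread::spawn(move || {
        thread::sleep(idle);
        let guard = state.lock().unwrap();
        if guard.pending.is_empty() {
            None
        } else {
            // In the real Infra this would be printed as the stall report.
            Some(guard.pending.iter().copied().collect())
        }
    })
}
```

The open question from the list above still applies: a fixed sleep is a crude stand-in for "the loop went idle", since the watchdog cannot tell slow progress from a genuine stall.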

Thoughts on the implementation

  • InMemoryTransport is the first thing I would look into. It already has an event-loop task spawned (the listen method) and access to the receive and send paths, so a stream wrapper inside it could wake itself up on a timer and inspect whether anything was sent through any channel.
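A minimal sketch of that wrapper idea, shown over a plain Iterator for brevity since the real InMemoryTransport streams are async (all names hypothetical): every item flowing through refreshes a shared "last activity" timestamp that a watchdog timer can poll to decide whether the channel looks stuck.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Wraps a message stream and records when the last item flowed through.
struct Monitored<I> {
    inner: I,
    last_activity: Arc<Mutex<Instant>>,
}

impl<I: Iterator> Iterator for Monitored<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        let item = self.inner.next();
        if item.is_some() {
            // Any traffic on the channel counts as activity.
            *self.last_activity.lock().unwrap() = Instant::now();
        }
        item
    }
}

// The watchdog side: report a stall if nothing has flowed for `idle`.
fn looks_stuck(last_activity: &Mutex<Instant>, idle: Duration) -> bool {
    last_activity.lock().unwrap().elapsed() >= idle
}
```

This keeps the Gateway's buffers private: the transport only exposes a timestamp, and the watchdog decides when to ask the Gateway (through whatever exposure is added) to print its pending records.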

@akoshelev
Collaborator Author

fixed in #832
