Skip to content
This repository has been archived by the owner on Jul 9, 2024. It is now read-only.

1.2.3 introduced a regression on streams. #77

Open
fmeyer opened this issue Dec 28, 2023 · 0 comments
Open

1.2.3 introduced a regression on streams. #77

fmeyer opened this issue Dec 28, 2023 · 0 comments

Comments

@fmeyer
Copy link

fmeyer commented Dec 28, 2023

When processing streams asynchronously with 1.2.3, I often end up with a panic, whereas with 1.1.3 everything works fine:

/// Sends `prompt` to the model as a streaming request, echoes every content
/// delta to stdout as it arrives, and returns the concatenated response text.
///
/// # Errors
///
/// Returns whatever error `send_message_streaming` produces when the request
/// cannot be started.
///
/// # Panics
///
/// Panics if stdout cannot be written to or flushed.
///
/// NOTE(review): a panic raised while the stream itself is being polled (for
/// example an `expect` inside the client library) unwinds through the `.await`
/// below and cannot be turned into an `Err` here -- confirm against the
/// library version in use.
async fn process_message_stream(client: ChatGPT, prompt: &str) -> chatgpt::Result<String> {
	let stream = client.send_message_streaming(prompt).await?;

	// Chunks are handled strictly one after another, so the accumulator can be
	// threaded through `fold` as an owned `String`; no `Arc`/`Mutex` is needed.
	let response = stream
		.fold(String::new(), |mut collected, each| async move {
			// Only content chunks carry text; every other chunk kind is skipped.
			if let ResponseChunk::Content { delta, .. } = each {
				// Printing part of the response without a trailing newline
				print!("{delta}");
				// `print!` does not flush, so push the partial line out manually.
				// Same failure policy as `print!`, which panics on stdout errors.
				stdout().flush().unwrap();
				collected.push_str(&delta);
			}
			collected
		})
		.await;

	Ok(response)
}
Stream closed abruptly!: Transport(reqwest::Error { kind: Body, source: TimedOut })
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1653:5
   3: core::result::Result<T,E>::expect
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1034:23
   4: chatgpt::client::ChatGPT::process_streaming_response::{{closure}}::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/chatgpt_rs-1.2.3/./src/client.rs:301:34
   5: <T as futures_util::fns::FnMut1<A>>::call_mut
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/fns.rs:28:9
   6: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:33
   7: core::option::Option<T>::map
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/option.rs:1072:29
   8: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:21
   9: <futures_util::stream::stream::for_each::ForEach<St,Fut,F> as core::future::future::Future>::poll
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/for_each.rs:70:47
  10: tldw::summarizer::process_message_stream::{{closure}}
             at ./src/summarizer.rs:67:4
  11: tldw::summarizer::process_short_input::{{closure}}
             at ./src/summarizer.rs:109:59
  12: tldw::main::{{closure}}
             at ./src/main.rs:76:69
  13: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:63
  14: tokio::runtime::coop::with_budget
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:107:5
  15: tokio::runtime::coop::budget
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:73:5
  16: tokio::runtime::park::CachedParkThread::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:31
  17: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/blocking.rs:66:9
  18: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  19: tokio::runtime::context::runtime::enter_runtime
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/runtime.rs:65:16
  20: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  21: tokio::runtime::runtime::Runtime::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/runtime.rs:350:45
  22: tldw::main
             at ./src/main.rs:88:2
  23: core::ops::function::FnOnce::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
fm:tldw [master]$
connor-brooks added a commit to connor-brooks/gptx that referenced this issue Apr 20, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant