Skip to content

Commit

Permalink
fix: 🐛 输出&Websocket&转译 大修复:解决「Websocket代码耦合」「消息回传线程锁死问题」等bug
Browse files Browse the repository at this point in the history
1. 通过在「NAVM输出缓存」中启用「流式处理者列表」实现「输出打印、输出Websocket回传、输出缓存」解耦
2. 通过深入`ws`库,探索使用`ws::Settings`与`WebSocket::broadcaster`
  1. 简化「广播到所有连接」的「消息回传」问题
  2. 缓解「一时间回传过多消息,Websocket回传线程阻塞导致整个程序阻塞」的问题:输出容量从500提升到24576
3. 结合ONA源码,对ONA shell重新启用「操作注册」,但对「预置操作符名」自动忽略
4. 更改逻辑以修复OpenNARS对「ANTICIPATE」的输出转译失败问题
5. 优化并正式实装Matriangle Websocket服务器,目前可正常对接实验
  • Loading branch information
ARCJ137442 committed Apr 8, 2024
1 parent 73e0b66 commit 3632527
Show file tree
Hide file tree
Showing 16 changed files with 356 additions and 104 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ executables/
# 临时文件
*tempCodeRunnerFile*

# 日志文件
*.log

# 测试用文件
\[test\]*
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"editor.formatOnSave": true,
"cSpell.words": [
"Addrs",
"boardcaster",
"canonicalize",
"clearscreen",
"confy",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "babel_nar"
version = "0.18.0"
version = "0.18.1"
edition = "2021"
description = """
Implementation and application supports of the NAVM model
Expand Down
9 changes: 9 additions & 0 deletions src/bin/babelnar_cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ mod tests {
main($cin_path, &[])
}


/// Matriangle服务器
/// * 🎯复现先前基于Matriangle环境的NARS实验
#[test]
pub fn main_matriangle_server() -> Result<()> {
// 以默认参数启动
main_configs($cin_path, &[MATRIANGLE_SERVER])
}

$(
$(#[$attr])*
#[test]
Expand Down
48 changes: 40 additions & 8 deletions src/bin/babelnar_cli/runtime_manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use babel_nar::{
readline_iter::ReadlineIter,
},
},
eprintln_cli, println_cli,
eprintln_cli, if_let_err_eprintln_cli, println_cli,
runtimes::TranslateError,
test_tools::{nal_format::parse, put_nal, VmOutputCache},
};
use nar_dev_utils::{if_return, ResultBoost};
use nar_dev_utils::{if_return, manipulate, pipe, ResultBoost};
use navm::{
cmd::Cmd,
vm::{VmRuntime, VmStatus},
Expand Down Expand Up @@ -64,10 +64,40 @@ where
Self {
runtime: Arc::new(Mutex::new(runtime)),
config: Arc::new(config),
output_cache: OutputCache::default_arc_mutex(),
// 创建的同时增加侦听器
output_cache: Self::new_output_cache(),
}
}

/// 新建一个「输出缓存」
/// * 🚩创建缓存⇒增加侦听器⇒装入[`ArcMutex`]
/// * 🎯避免
fn new_output_cache() -> ArcMutex<OutputCache> {
pipe! {
manipulate!(
// 产生一个新的「输出缓存」
OutputCache::default()
// 添加侦听器
=> Self::add_output_listener
)
// 装入ArcMutex
=> Mutex::new => Arc::new
}
}

/// 增加「打印输出」侦听器
/// * 🎯(与Websocket一同)分离「输出侦听」逻辑
/// * 🎯统一给管理者添加功能
/// * ❓后续可配置
fn add_output_listener(output_cache: &mut OutputCache) {
output_cache.output_handlers.add_handler(|output| {
// 打印输出
println_cli!(&output);
// 继续返回
Some(output)
});
}

/// 【主函数】在运行时启动后,对其进行管理
/// * 🎯健壮性:更多「警告/重来」而非`panic`
/// * 🎯用户友好:尽可能隐藏底层内容
Expand Down Expand Up @@ -222,6 +252,7 @@ where
{
// 缓存输出
// * 🚩在缓存时格式化输出
// TODO: 【2024-04-08 19:15:30】现在必须不再能直接`put`输出了:要兼容Websocket情形
match output_cache.lock() {
Ok(mut output_cache) => output_cache.put(output)?,
Err(e) => eprintln_cli!([Error] "缓存NAVM运行时输出时发生错误:{e}"),
Expand All @@ -237,8 +268,8 @@ where
/// 生成「Websocket服务」子线程
pub fn try_spawn_ws_server(&mut self) -> Result<Option<JoinHandle<Result<()>>>> {
// 若有⇒启动
if let Some(config) = &self.config.websocket {
let thread = spawn_ws_server(self, &config.host, config.port);
if self.config.websocket.is_some() {
let thread = spawn_ws_server(self)?;
return Ok(Some(thread));
}

Expand Down Expand Up @@ -288,9 +319,10 @@ where

// 非空⇒解析输入并执行
if !line.trim().is_empty() {
if let Err(e) = Self::input_line_to_vm(runtime, &line, &config, output_cache) {
println_cli!([Error] "输入过程中发生错误:{e}")
}
if_let_err_eprintln_cli!(
Self::input_line_to_vm(runtime, &line, &config, output_cache)
=> e => [Error] "输入过程中发生错误:{e}"
);
}
}

Expand Down
181 changes: 129 additions & 52 deletions src/bin/babelnar_cli/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
//! * 🎯为BabelNAR CLI实现Websocket IO
//! * 🎯实现专有的Websocket服务端逻辑
use crate::{RuntimeConfig, RuntimeManager};
use crate::{LaunchConfigWebsocket, RuntimeConfig, RuntimeManager};
use anyhow::Result;
use babel_nar::{
cli_support::io::{
navm_output_cache::{ArcMutex, OutputCache},
websocket::{spawn_server, to_address},
cli_support::{
error_handling_boost::error_anyhow,
io::{
navm_output_cache::{ArcMutex, OutputCache},
websocket::to_address,
},
},
eprintln_cli, println_cli,
eprintln_cli, if_let_err_eprintln_cli, println_cli,
};
use navm::{output::Output, vm::VmRuntime};
use std::{
sync::Arc,
thread::{self, JoinHandle},
};
use navm::vm::VmRuntime;
use std::{sync::Arc, thread::JoinHandle};
use ws::{Factory, Handler, Sender};

/// 工具宏:尝试执行,如果失败则上抛错误
Expand All @@ -33,32 +39,82 @@ macro_rules! try_or_return_err {
};
}

/// 通信用代码
/// * 🎯统一有关「通信消息格式」的内容
/// * 📌形式:JSON**对象数组**
/// * ⚠️【2024-04-08 19:08:15】即便一次只回传一条消息,也需包装上方括号`[{...}]`
#[inline]
pub fn format_output_message(output: &Output) -> String {
// 包装成「对象数组」
format!("[{}]", output.to_json_string())
}

/// 入口代码
/// * 🎯生成一个Websocket服务端线程
/// * 🚩不管参数`config`中的地址:可能没有
pub fn spawn_ws_server<R>(
manager: &RuntimeManager<R>,
host: &str,
port: u16,
) -> JoinHandle<Result<()>>
/// * ⚠️此处要求**manager.config.websocket**必须非空,否则会直接panic
/// * 🚩此处手动生成Websocket服务端并启动:提升其「待发消息缓冲区」容量到24576
/// * ❗【2024-04-09 01:20:57】问题缘起:服务端在「突然收到大量消息需要重发」时,可能会直接阻塞线程
/// * 📌【2024-04-09 01:21:37】现在通过配置「最大连接数」与「队列大小」以**暂时缓解**此问题
/// * 🔗参考:<https://docs.rs/ws/latest/ws/struct.Settings.html>
/// * 🔗GitHub issue:<https://github.com/housleyjk/ws-rs/issues/346>
pub fn spawn_ws_server<R>(manager: &mut RuntimeManager<R>) -> Result<JoinHandle<Result<()>>>
where
R: VmRuntime + Send + Sync,
{
// 合并地址
let address = to_address(host, port);
// 提取并合并地址
let LaunchConfigWebsocket { host, port } = manager
.config
.websocket
.as_ref()
.expect("尝试在无配置时启动Websocket服务器");
let address = to_address(host, *port);

// 获取服务端「处理者工厂」
// * 🚩拷贝[`Arc`]
let factory = WSServer {
let server = WSServer {
runtime: manager.runtime.clone(),
output_cache: manager.output_cache.clone(),
config: manager.config.clone(),
};

// 根据专有服务端逻辑,生成子线程并返回
let server = spawn_server(address.clone(), factory);
// 生成定制版的Websocket服务端
// * 🎯获取生成的[`WebSocket`](服务端)对象,调用[`WebSocket::boardcaster`]方法快速广播
// * ❌【2024-04-08 23:23:08】无法独立为单独的函数:此中NAVM运行时「R」的生命周期问题(难以参与推导)
let (handle, sender) = {
let factory = server;
let address = address.clone();
let ws_setting = ws::Settings {
// * 📝使用`ws::Builder`结合`ws::Settings`生成配置
// * ✅在配置中调节「队列大小」以扩宽「连续消息接收限制」
// * 默认:100(最大连接)×5(最长队列)→500条后阻塞
// * 🚩【2024-04-09 01:03:52】现在调整成「最多32个连接,每个连接最多768条消息」
// * ⚠️仍然会在24576条消息后产生阻塞——但相比原先500条,情况少很多
max_connections: 0x20,
queue_size: 0x300,
..Default::default()
};
let server = ws::Builder::new()
.with_settings(ws_setting)
.build(factory)?;
let sender = server.broadcaster();
let handle = thread::spawn(move || {
server.listen(address)?;
// ! ❌此处不能缩并:必须转换为`anyhow::Error`
Ok(())
});
(handle, sender)
};
println_cli!([Info] "Websocket服务器已在 {:?} 启动", address);
server

// 向(服务端自身)「输出缓存」添加侦听器
if_let_err_eprintln_cli! {
// ! 此处需要可变的`manager`
register_listener(&mut manager.output_cache, sender)
=> e => [Error] "无法为服务端注册侦听器:{e}"
}

// 返回线程句柄
Ok(handle)
}

/// 一个Websocket连接
Expand Down Expand Up @@ -106,32 +162,22 @@ where
let output_cache = &mut *try_or_return_err!(self.output_cache.lock(); err => "在Websocket连接中获取输出缓存失败:{err}");

// 输入信息,并监控缓存的新输出
if let Err(err) =
RuntimeManager::input_line_to_vm(runtime, &msg.to_string(), config, output_cache)
{
eprintln_cli!([Error] "在Websocket连接中输入「{msg}」时发生错误:{err}")
// * 📝【2024-04-08 22:10:17】现在查明「Websocket线程阻塞」问题在Websocket「回传发送者」的`send`调用中
if_let_err_eprintln_cli! {
RuntimeManager::input_line_to_vm(
runtime,
&msg.to_string(),
config,
output_cache,
)
=> err => [Error] "在Websocket连接中输入「{msg}」时发生错误:{err}"
}

// ! 🚩此处无法回传输出:输出捕捉在缓存中处理的地方
// if new_len_cache > old_len_cache {
// let mut output;
// let mut json_text;
// // 逐个获取
// for i in (old_len_cache - 1)..new_len_cache {
// output = &output_cache.borrow_inner()[i];
// json_text = output.to_json_string();
// // 回传,若出错仅输出错误
// if let Err(e) = self.sender.send(json_text.clone()) {
// eprintln_cli!([Error] "尝试回传消息「{json_text}」时发生错误:{e}");
// }
// }
// }

Ok(())
}

fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
println_cli!([Info] "Websocket连接关闭(退出码:{code:?};原因:「{reason}」)")
println_cli!([Info] "Websocket连接关闭(退出码:{code:?};原因:「{reason}」)");
}

fn on_error(&mut self, err: ws::Error) {
Expand Down Expand Up @@ -172,27 +218,57 @@ where
pub(crate) output_cache: ArcMutex<OutputCache>,
}

/// 向所有「回传发送者」广播NAVM输出
/// * 🎯回传所侦听到的NAVM输出
pub(crate) fn broadcast_to_senders(
// senders: &mut ArcMutex<ResendSenders>,
broadcaster: &mut Sender,
output: &Output,
) -> Result<()> {
let output_str = format_output_message(output);

// println_cli!([Debug] "🏗️正在向接收者回传消息:\n{output_str}");
// * 通过一个`broadcaster`直接向所有连接广播消息
if_let_err_eprintln_cli! {
broadcaster.send(output_str.to_string())
=> e => [Error] "广播消息失败:{e}"
};

// println_cli!([Debug] "✅向接收者回传消息完成:\n{output_str}");

Ok(())
}

/// 向「输出缓存」注册侦听器
/// * 🎯绑定侦听器到输出缓存中,以便在「侦听器有输出」时广播
/// * 🎯现在只有「输出缓存」会留存:因为`WebSocket.broadcaster`只在服务器启动后创建
pub(crate) fn register_listener(
output_cache: &mut ArcMutex<OutputCache>,
mut broadcaster: Sender,
) -> Result<()> {
// 尝试解包「输出缓存」
let output_cache = &mut *output_cache.lock().map_err(error_anyhow)?;
output_cache.output_handlers.add_handler(move |output| {
// 广播
if_let_err_eprintln_cli! {
broadcast_to_senders(&mut broadcaster, &output)
=> e => [Error] "Websocket回传广播到发送者时出现错误:{:?}", e
}
// 返回
Some(output)
});
Ok(())
}

impl<R> Factory for WSServer<R>
where
R: VmRuntime + Send + Sync + 'static,
{
type Handler = Connection<R>;

fn connection_made(&mut self, sender: Sender) -> Connection<R> {
println_cli!([Info] "Websocket连接已建立");
let id = sender.connection_id();
// 尝试添加「发送者」
match self.output_cache.lock() {
Ok(mut output_cache) => {
let output_cache = &mut *output_cache;
// 添加「发送者」
output_cache.websocket_senders.push(sender);
}
Err(err) => {
// 输出错误
println_cli!([Error] "Websocket输出侦听器添加失败:{err}");
}
}
println_cli!([Info] "Websocket连接已在id {id} 处建立");
// 返回连接
Connection {
runtime: self.runtime.clone(),
Expand All @@ -203,6 +279,7 @@ where
}

fn on_shutdown(&mut self) {
// 打印消息
println_cli!([Info] "Websocket服务器已关停")
}

Expand Down
Loading

0 comments on commit 3632527

Please sign in to comment.