@@ -1113,6 +1113,23 @@ impl Session {
1113
1113
let state = zread ! ( self . state) ;
1114
1114
self . update_status_up ( & state, & key_expr)
1115
1115
}
1116
+ } else if key_expr
1117
+ . as_str ( )
1118
+ . starts_with ( crate :: liveliness:: PREFIX_LIVELINESS )
1119
+ {
1120
+ let primitives = state. primitives . as_ref ( ) . unwrap ( ) . clone ( ) ;
1121
+ drop ( state) ;
1122
+
1123
+ primitives. send_declare ( Declare {
1124
+ ext_qos : declare:: ext:: QoSType :: DECLARE ,
1125
+ ext_tstamp : None ,
1126
+ ext_nodeid : declare:: ext:: NodeIdType :: DEFAULT ,
1127
+ body : DeclareBody :: DeclareInterest ( DeclareInterest {
1128
+ id,
1129
+ wire_expr : Some ( key_expr. to_wire ( self ) . to_owned ( ) ) ,
1130
+ interest : Interest :: KEYEXPRS + Interest :: SUBSCRIBERS + Interest :: FUTURE ,
1131
+ } ) ,
1132
+ } ) ;
1116
1133
}
1117
1134
1118
1135
Ok ( sub_state)
@@ -1170,6 +1187,23 @@ impl Session {
1170
1187
self . update_status_down ( & state, & sub_state. key_expr )
1171
1188
}
1172
1189
}
1190
+ } else if sub_state
1191
+ . key_expr
1192
+ . as_str ( )
1193
+ . starts_with ( crate :: liveliness:: PREFIX_LIVELINESS )
1194
+ {
1195
+ let primitives = state. primitives . as_ref ( ) . unwrap ( ) . clone ( ) ;
1196
+ drop ( state) ;
1197
+
1198
+ primitives. send_declare ( Declare {
1199
+ ext_qos : declare:: ext:: QoSType :: DECLARE ,
1200
+ ext_tstamp : None ,
1201
+ ext_nodeid : declare:: ext:: NodeIdType :: DEFAULT ,
1202
+ body : DeclareBody :: UndeclareInterest ( UndeclareInterest {
1203
+ id : sub_state. id ,
1204
+ ext_wire_expr : WireExprType :: null ( ) ,
1205
+ } ) ,
1206
+ } ) ;
1173
1207
}
1174
1208
Ok ( ( ) )
1175
1209
} else {
@@ -2026,7 +2060,7 @@ impl Primitives for Session {
2026
2060
} ;
2027
2061
self . handle_data (
2028
2062
false ,
2029
- & m . ext_wire_expr . wire_expr ,
2063
+ & expr . to_wire ( self ) ,
2030
2064
Some ( data_info) ,
2031
2065
ZBuf :: default ( ) ,
2032
2066
#[ cfg( feature = "unstable" ) ]
0 commit comments