Skip to content

Commit

Permalink
Merge pull request #65 from keynslug/sync/emqx-OTP-27.1/rebase
Browse files Browse the repository at this point in the history
sync(27.2): rebase emqx-OTP-27.1-2 stripped from irrelevant changes
  • Loading branch information
keynslug authored Dec 18, 2024
2 parents 6bf99d6 + 83b4238 commit 45379d7
Show file tree
Hide file tree
Showing 40 changed files with 855 additions and 78 deletions.
2 changes: 1 addition & 1 deletion OTP_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
27.2
27.2-1
1 change: 1 addition & 0 deletions erts/emulator/beam/atom.names
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ atom monitors
atom monotonic
atom monotonic_timestamp
atom more
atom mqtt
atom multi_scheduling
atom multiline
atom nano_seconds
Expand Down
4 changes: 2 additions & 2 deletions erts/emulator/beam/erl_bif_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ static char erts_system_version[] = ("Erlang/OTP " ERLANG_OTP_RELEASE
" [erts-" ERLANG_VERSION "]"
#ifndef OTP_RELEASE
#ifdef ERLANG_GIT_VERSION
" [source-" ERLANG_GIT_VERSION "]"
" [emqx-" ERLANG_GIT_VERSION "]"
#else
" [source]"
" [emqx]"
#endif
#endif
#if defined(ARCH_64)
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_bif_port.c
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ BIF_RETTYPE decode_packet_3(BIF_ALIST_3)
case am_httph: type = TCP_PB_HTTPH; break;
case am_http_bin: type = TCP_PB_HTTP_BIN; break;
case am_httph_bin: type = TCP_PB_HTTPH_BIN; break;
case am_mqtt: type = TCP_PB_MQTT; break;
case am_ssl_tls: type = TCP_PB_SSL_TLS; break;
default:
BIF_P->fvalue = am_badopt;
Expand Down
10 changes: 5 additions & 5 deletions erts/emulator/beam/erl_time_sup.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ update_last_mtime(ErtsSchedulerData *esdp, ErtsMonotonicTime mtime)
if (esdp->last_monotonic_time > mtime) {
ERTS_ASSERT(esdp == erts_get_scheduler_data());
erts_exit(ERTS_ABORT_EXIT,
"Erlang monotonic time stepped backwards!\n"
"Erlang monotonic time stepped backwards! Maybe add '+c false' in vm.args\n"
"Previous time: %b64d\n"
"Current time: %b64d\n",
esdp->last_monotonic_time,
Expand All @@ -287,7 +287,7 @@ check_os_monotonic_time(ErtsSchedulerData *esdp, ErtsMonotonicTime mtime)
if (esdp->last_os_monotonic_time > mtime) {
ERTS_ASSERT(esdp == erts_get_scheduler_data());
erts_exit(ERTS_ABORT_EXIT,
"OS monotonic time stepped backwards!\n"
"OS monotonic time stepped backwards! Maybe add '+c false' in vm.args\n"
"Previous time: %b64d\n"
"Current time: %b64d\n",
esdp->last_os_monotonic_time,
Expand Down Expand Up @@ -374,7 +374,7 @@ read_corrected_time(int os_drift_corrected, ErtsSchedulerData *esdp)
else {
if (os_mtime < time_sup.inf.c.parmon.cdata.insts.prev.os_mtime)
erts_exit(ERTS_ABORT_EXIT,
"OS monotonic time stepped backwards\n");
"OS monotonic time stepped backwards, maybe add '+c false' in vm.args\n");
ci = time_sup.inf.c.parmon.cdata.insts.prev;
}

Expand Down Expand Up @@ -473,7 +473,7 @@ check_time_correction(void *vesdp)

if (os_mtime < ci.os_mtime)
erts_exit(ERTS_ABORT_EXIT,
"OS monotonic time stepped backwards\n");
"OS monotonic time stepped backwards, maybe add '+c false' in vm.args\n");

erl_mtime = calc_corrected_erl_mtime(os_mtime, &ci, &mdiff,
os_drift_corrected);
Expand Down Expand Up @@ -901,7 +901,7 @@ finalize_corrected_time_offset(ErtsSystemTime *stimep)

if (os_mtime < ci.os_mtime)
erts_exit(ERTS_ABORT_EXIT,
"OS monotonic time stepped backwards\n");
"OS monotonic time stepped backwards, maybe add '+c false' in vm.args\n");

return calc_corrected_erl_mtime(os_mtime, &ci, NULL,
os_drift_corrected);
Expand Down
34 changes: 34 additions & 0 deletions erts/emulator/beam/packet_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,40 @@ int packet_get_length(enum PacketParseType htype,
plen = get_int16(&ptr[3]);
}
goto remain;

case TCP_PB_MQTT: {
/* Byte 1: MQTT Control Packet fixed header
* Bytes 2-2/3/4/5: Remaining Length (variable byte integer)
*/
byte vb, ptype;
hlen = 2;
plen = 0;
if (n < 1) goto more;
/* Bits 4-8: Packet type */
ptype = ptr[0] >> 4;
/* ERROR: Type 0 is reserved, forbidden */
if (ptype == 0)
goto error;
while (hlen <= 1 + 4) {
if (hlen > n) goto more;
vb = ptr[hlen - 1] & 0x7F;
plen |= vb << (7 * (hlen - 2));
if (ptr[hlen - 1] & 0x80) {
hlen = hlen + 1;
}
else {
/* NOTE: Tolerate minumum-number-of-bytes rule violation
* [MQTT-1.5.5-1]
*/
goto packet;
}
}
/* ERROR: variable byte integer >4 bytes long */
goto error;
packet:
/* No special parsing for now */
goto remain;
}

default:
DEBUGF((" => case error\r\n"));
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/beam/packet_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ enum PacketParseType {
TCP_PB_HTTPH = 11,
TCP_PB_SSL_TLS = 12,
TCP_PB_HTTP_BIN = 13,
TCP_PB_HTTPH_BIN = 14
TCP_PB_HTTPH_BIN = 14,
TCP_PB_MQTT = 15
};

typedef struct http_atom {
Expand Down
34 changes: 29 additions & 5 deletions erts/emulator/test/decode_packet_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

-export([all/0, suite/0,groups/0,
init_per_testcase/2,end_per_testcase/2,
basic/1, ipv6/1, packet_size/1, neg/1, http/1, line/1, ssl/1, otp_8536/1,
otp_9389/1, otp_9389_line/1]).
basic/1, ipv6/1, packet_size/1, neg/1, http/1, line/1, mqtt/1, ssl/1,
otp_8536/1, otp_9389/1, otp_9389_line/1]).

suite() ->
[{ct_hooks,[ts_install_cth]},
{timetrap, {minutes, 1}}].

all() ->
[basic, packet_size, neg, http, line, ssl, otp_8536,
[basic, packet_size, neg, http, line, mqtt, ssl, otp_8536,
otp_9389, otp_9389_line, ipv6].

groups() ->
Expand Down Expand Up @@ -62,7 +62,7 @@ basic(Config) when is_list(Config) ->
{more, undefined} = decode_pkt(2,<<0>>),
{more, undefined} = decode_pkt(4,<<0,0,0>>),

Types = [1,2,4,asn1,sunrm,cdr,fcgi,tpkt,ssl_tls],
Types = [1,2,4,asn1,sunrm,cdr,fcgi,tpkt,mqtt,ssl_tls],

%% Run tests for different header types and bit offsets.

Expand Down Expand Up @@ -185,6 +185,11 @@ pack(tpkt,Bin) ->
Size = byte_size(Bin) + 4,
Res = <<Ver:8,Reserv:8,Size:16,Bin/binary>>,
{Res, Res};
pack(mqtt,Bin) ->
Type = 3, % PUBLISH
Size = pack_mqtt_vbi(byte_size(Bin)),
Res = <<Type:4,0:4,Size/binary,Bin/binary>>,
{Res, Res};
pack(ssl_tls,Bin) ->
Content = case (rand:uniform(256) - 1) of
C when C<128 -> C;
Expand All @@ -208,6 +213,11 @@ pack_ssl(Content, Major, Minor, Body) ->
end,
{Res, {ssl_tls,[],C,{Major,Minor}, Data}}.

pack_mqtt_vbi(N) when N =< 2#01111111 ->
<<0:1, N:7>>;
pack_mqtt_vbi(N) ->
<<1:1, (N rem 2#10000000):7, (pack_mqtt_vbi(N div 2#10000000))/binary>>.

ipv6(Config) when is_list(Config) ->
%% Test with port
Packet = <<"GET http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:4000/echo_components HTTP/1.1\r\nhost: orange\r\n\r\n">>,
Expand Down Expand Up @@ -242,7 +252,7 @@ packet_size(Config) when is_list(Config) ->
ok
end
end,
lists:foreach(F, [{T,D} || T<-[1,2,4,asn1,sunrm,cdr,fcgi,tpkt,ssl_tls],
lists:foreach(F, [{T,D} || T<-[1,2,4,asn1,sunrm,cdr,fcgi,tpkt,mqtt,ssl_tls],
D<-lists:seq(0, byte_size(Packet)*2)]),

%% Test OTP-8102, "negative" 4-byte sizes.
Expand Down Expand Up @@ -303,6 +313,20 @@ neg(Config) when is_list(Config) ->
ok.


mqtt(_Config) ->
Type = 1, % CONNECT
{more, undefined} = decode_pkt(mqtt,<<>>),
{error, invalid} = decode_pkt(mqtt,<<0>>),
{more, undefined} = decode_pkt(mqtt,<<Type:4,0:4>>),
{more, 2 + 10} = decode_pkt(mqtt,<<Type:4,0:4, 10:8>>),
{more, undefined} = decode_pkt(mqtt,<<Type:4,0:4, 1:1,10:7>>),
{more, 3 + 138} = decode_pkt(mqtt,<<Type:4,0:4, 1:1,10:7,1:8>>),
{more, 5 + 13*128*128*128 + 12*128*128 + 11*128 + 10} =
decode_pkt(mqtt,<<Type:4,0:4, 1:1,10:7,1:1,11:7,1:1,12:7,0:1,13:7>>),
{error, invalid} =
decode_pkt(mqtt,<<Type:4,0:4, 1:1,10:7,1:1,11:7,1:1,12:7,1:1,13:7>>).


http(Config) when is_list(Config) ->
<<"foo">> = http_do(http_request("foo")),
<<" bar">> = http_do(http_request(" bar")),
Expand Down
Binary file modified erts/preloaded/ebin/erlang.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/prim_inet.beam
Binary file not shown.
7 changes: 5 additions & 2 deletions erts/preloaded/src/erlang.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ returned.
latin-1 newline character. The delimiter byte is included in the returned
packet unless the line was truncated according to option `line_length`.
- **`asn1 | cdr | sunrm | fcgi | tpkt`** - The header is _not_ stripped off.
- **`asn1 | cdr | sunrm | fcgi | tpkt | mqtt`** - The header is _not_ stripped off.
The meanings of the packet types are as follows:
Expand All @@ -1737,6 +1737,8 @@ returned.
- **`tpkt` \- TPKT format \[RFC1006]**
- **`mqtt` \- MQTT packet \[mqtt-v5.0\] / \[mqtt-v3.1.1\]
- **`http | httph | http_bin | httph_bin`** - The Hypertext Transfer Protocol.
The packets are returned with the format according to `HttpPacket` described
earlier. A packet is either a request, a response, a header, or an end of
Expand Down Expand Up @@ -1791,7 +1793,8 @@ Examples:
{more, Length} |
{error, Reason} when
Type :: 'raw' | 0 | 1 | 2 | 4 | 'asn1' | 'cdr' | 'sunrm' | 'fcgi'
| 'tpkt' | 'line' | 'http' | 'http_bin' | 'httph' | 'httph_bin',
| 'tpkt' | 'line' | 'http' | 'http_bin' | 'httph' | 'httph_bin'
| 'mqtt',
Bin :: binary(),
Options :: [Opt],
Opt :: {packet_size, non_neg_integer()}
Expand Down
1 change: 1 addition & 0 deletions erts/preloaded/src/prim_inet.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,7 @@ type_opt_1(packet) ->
{httph,?TCP_PB_HTTPH},
{http_bin, ?TCP_PB_HTTP_BIN},
{httph_bin,?TCP_PB_HTTPH_BIN},
{mqtt, ?TCP_PB_MQTT},
{ssl, ?TCP_PB_SSL_TLS}, % obsolete
{ssl_tls, ?TCP_PB_SSL_TLS}]};
type_opt_1(line_delimiter) -> int;
Expand Down
2 changes: 2 additions & 0 deletions lib/common_test/src/ct_framework.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,8 @@ report(What,Data) ->
add_to_stats(user_skipped);
{_,{auto_skipped,_}} ->
add_to_stats(auto_skipped);
{_,{{failed,keep_going}, _}} ->
ok;
{_,{SkipOrFail,_Reason}} ->
add_to_stats(SkipOrFail)
end;
Expand Down
42 changes: 40 additions & 2 deletions lib/common_test/src/test_server_ctrl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2960,6 +2960,7 @@ run_test_cases_loop([{Mod,Func,Args}=Case|Cases], Config, TimetrapData, Mode0, S
ok
end,

maybe_put_flaky_info(Mode0),
case run_test_case(undefined, Num+1, Mod, Func, Args,
RunInit, TimetrapData, Mode) of
%% callback to framework module failed, exit immediately
Expand Down Expand Up @@ -3934,6 +3935,8 @@ run_test_case1(Ref, Num, Mod, Func, Args, RunInit,
ok;
{_,ok} ->
put(test_server_ok, get(test_server_ok)+1);
{_,{failed, keep_going}} ->
ok;
{_,failed} ->
put(test_server_failed, get(test_server_failed)+1);
{_,skip} ->
Expand Down Expand Up @@ -4173,8 +4176,15 @@ progress(failed, CaseNum, Mod, Func, GrName, Loc, Reason, T,
print(major, "=elapsed ~.6fs", [Time]),
print(1, "*** FAILED ~ts ***",
[get_info_str(Mod,Func, CaseNum, get(test_server_cases))]),

StatusTag = case should_keep_going() of
true ->
{failed, keep_going};
false ->
failed
end,
test_server_sup:framework_call(report, [tc_done,{Mod,{Func,GrName},
{failed,Reason}}]),
{StatusTag,Reason}}]),
TimeStr = io_lib:format("~.fs", [Time]),
Comment =
case Comment0 of
Expand All @@ -4193,7 +4203,7 @@ progress(failed, CaseNum, Mod, Func, GrName, Loc, Reason, T,
print(minor, "~ts",
["=== Reason: " ++
escape_chars(io_lib:format(FStr, [FormattedReason]))]),
failed;
StatusTag;

progress(ok, _CaseNum, Mod, Func, GrName, _Loc, RetVal, T,
Comment0, {St0,St1}) ->
Expand Down Expand Up @@ -5818,6 +5828,7 @@ encoding(File) ->

check_repeat_testcase(Case,Result,Cases,
[{Ref,[{repeat,RepeatData0}],StartTime}|Mode0]) ->
maybe_report_flaky_test(Case, RepeatData0),
case do_update_repeat_data(Result,RepeatData0) of
false ->
{Cases,Mode0};
Expand All @@ -5835,6 +5846,10 @@ do_update_repeat_data(ok,{repeat_until_ok=RT,M,N}) ->
report_repeat_testcase(M,N),
report_stop_repeat_testcase(RT,{RT,N}),
false;
do_update_repeat_data(ok,{flaky=RT,M,N}) ->
report_repeat_testcase(M,N),
report_stop_repeat_testcase(RT,{RT,N}),
false;
do_update_repeat_data(failed,{repeat_until_fail=RT,M,N}) ->
report_repeat_testcase(M,N),
report_stop_repeat_testcase(RT,{RT,N}),
Expand All @@ -5854,3 +5869,26 @@ report_repeat_testcase(M,forever) ->
print(minor, "~n=== Repeated test case: ~w of infinity", [M]);
report_repeat_testcase(M,N) ->
print(minor, "~n=== Repeated test case: ~w of ~w", [M,N]).

maybe_report_flaky_test({Mod, Func, _Args}, {flaky,N,_Max}) ->
case N > 1 of
true ->
print(minor, "~n=== FLAKY test case: ~w:~w", [Mod, Func]);
false ->
ok
end;
maybe_report_flaky_test(_TestCase, _RepeatInfo) ->
ok.

maybe_put_flaky_info([{_Ref, [{repeat, {flaky, N, M}}], _Time} | _]) ->
put('$ct_flaky_info', {N, M});
maybe_put_flaky_info(_) ->
erase('$ct_flaky_info').

should_keep_going() ->
case get('$ct_flaky_info') of
{N, M} when N < M ->
true;
_ ->
false
end.
Loading

0 comments on commit 45379d7

Please sign in to comment.