diff --git a/OTP_VERSION b/OTP_VERSION
index a306e32d1de8..13b1d9802ef1 100644
--- a/OTP_VERSION
+++ b/OTP_VERSION
@@ -1 +1 @@
-26.2.3
+26.2.3-1
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index f9d95a83caaf..5da89303c683 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -88,9 +88,9 @@ static char erts_system_version[] = ("Erlang/OTP " ERLANG_OTP_RELEASE
 				     " [erts-" ERLANG_VERSION "]"
 #ifndef OTP_RELEASE
 #ifdef ERLANG_GIT_VERSION
-				     " [source-" ERLANG_GIT_VERSION "]"
+				     " [emqx-" ERLANG_GIT_VERSION "]"
 #else
-				     " [source]"
+				     " [emqx]"
 #endif
 #endif	
 #if defined(ARCH_64)
diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl
index 88fcf5b4e0cd..a272d779e162 100644
--- a/lib/kernel/src/gen_tcp.erl
+++ b/lib/kernel/src/gen_tcp.erl
@@ -28,6 +28,7 @@
 -export([send/2, recv/2, recv/3, unrecv/2]).
 -export([controlling_process/2]).
 -export([fdopen/2]).
+-export([ipv6_probe/0]).
 
 -include("inet_int.hrl").
 -include("file.hrl").
@@ -120,7 +121,8 @@
         recvtclass |
         recvttl |
         pktoptions |
-	ipv6_v6only.
+        ipv6_v6only |
+        ipv6_probe.
 -type connect_option() ::
         {fd, Fd :: non_neg_integer()} |
         inet:address_family() |
@@ -131,6 +133,7 @@
         {tcp_module, module()} |
         {netns, file:filename_all()} |
         {bind_to_device, binary()} |
+        {ipv6_probe, boolean() | timeout()} |
         option().
 -type listen_option() ::
         {fd, Fd :: non_neg_integer()} |
@@ -157,6 +160,8 @@
 %% Connect a socket
 %%
 
+ipv6_probe() -> true.
+
 -spec connect(SockAddr, Opts) -> {ok, Socket} | {error, Reason} when
       SockAddr :: socket:sockaddr_in() | socket:sockaddr_in6(),
       Opts     :: [inet:inet_backend() | connect_option()],
@@ -218,21 +223,89 @@ connect(#{family := Fam} = SockAddr, Opts, Timeout)
       Reason  :: timeout | inet:posix().
 
 connect(Address, Port, Opts0, Timeout) ->
-    case inet:gen_tcp_module(Opts0) of
+    %% When neither `inet` nor `inet6` is provided in Opts0,
+    %% and if `ipv6_probe` option is given, try to connect ipv6 first.
+    {TryIpv6, Ipv6T} =
+        case proplists:get_value(ipv6_probe, Opts0) of
+                  true -> {true, 2000}; %% default 2 seconds
+                  false -> {false, 0};
+                  undefined -> {false, 0};
+                  T -> {true, T}
+              end,
+    %% delete it to avoid interference
+    Opts1 = proplists:delete(ipv6_probe, Opts0),
+    case inet:gen_tcp_module(Opts1) of
         {?MODULE, Opts} ->
-            Timer = inet:start_timer(Timeout),
-            Res = (catch connect1(Address,Port,Opts,Timer)),
-            _ = inet:stop_timer(Timer),
-            case Res of
-                {ok,S} -> {ok,S};
-                {error, einval} -> exit(badarg);
-                {'EXIT',Reason} -> exit(Reason);
-                Error -> Error
-            end;
+            connect_maybe_ipv6(Address, Port, Opts, Timeout, TryIpv6, Ipv6T);
         {GenTcpMod, Opts} ->
             GenTcpMod:connect(Address, Port, Opts, Timeout)
     end.
 
+connect_maybe_ipv6(Address, Port, Opts, Timeout, TryIpv6, Ipv6T) ->
+    case maybe_ipv6(Address, Opts, TryIpv6) of
+        {maybe, NewOpts} when TryIpv6 ->
+            try
+                {ok, _} = connect_0(Address, Port, NewOpts, Ipv6T)
+            catch
+                _ : _ ->
+                    %% fallback
+                    connect_0(Address, Port, Opts, Timeout)
+            end;
+        NewOpts ->
+            connect_0(Address, Port, NewOpts, Timeout)
+    end.
+
+connect_0(Address, Port, Opts, Timeout) ->
+    Timer = inet:start_timer(Timeout),
+    Res = (catch connect1(Address,Port,Opts,Timer)),
+    _ = inet:stop_timer(Timer),
+    case Res of
+        {ok,S} -> {ok,S};
+        {error, einval} -> exit(badarg);
+        {'EXIT',Reason} -> exit(Reason);
+        Error -> Error
+    end.
+
+maybe_ipv6({local, _}, Opts, _TryIpv6) ->
+    %% unapplicable to local sockets
+    Opts;
+maybe_ipv6(Host, Opts, TryIpv6) ->
+    case lists:member(inet, Opts) orelse lists:member(inet6, Opts) of
+        true ->
+            Opts; %% caller has made the decision
+        false when is_tuple(Host) ->
+            %% ip tuple provided
+            maybe_ipv6_1(Host, Opts);
+        false when TryIpv6 ->
+            %% string host
+            maybe_ipv6_2(Host, Opts);
+        false ->
+            Opts
+    end.
+
+maybe_ipv6_1(Ip, Opts) when tuple_size(Ip) =:= 4 -> Opts;
+maybe_ipv6_1(Ip, Opts) when tuple_size(Ip) =:= 8 -> [inet6 | Opts].
+
+maybe_ipv6_2(Host, Opts) ->
+    case inet:parse_address(Host) of
+        {ok, Ip} when is_tuple(Ip) ->
+            %% ip string provided, parsed into tuple
+            maybe_ipv6_1(Ip, Opts);
+        _ ->
+            maybe_ipv6_3(Host, Opts)
+    end.
+
+maybe_ipv6_3(Host, Opts) ->
+    case inet:getaddr(Host, inet6) of
+        {ok, _} ->
+            %% the target has a resolvable v6 IP
+            %% maybe try to connect
+            {maybe, [inet6 | Opts]};
+        _ ->
+            %% the target has no resolvable v6 IP
+            Opts
+    end.
+
 connect1(Address, Port, Opts0, Timer) ->
     {Mod, Opts} = inet:tcp_module(Opts0, Address),
     case Mod:getaddrs(Address, Timer) of
diff --git a/lib/mnesia/src/Makefile b/lib/mnesia/src/Makefile
index 72aa054fb326..ec540ce4e482 100644
--- a/lib/mnesia/src/Makefile
+++ b/lib/mnesia/src/Makefile
@@ -55,6 +55,7 @@ MODULES= \
 	mnesia_ext_sup \
 	mnesia_frag \
 	mnesia_frag_hash \
+	mnesia_hook \
 	mnesia_index \
 	mnesia_kernel_sup \
 	mnesia_late_loader \
diff --git a/lib/mnesia/src/mnesia.app.src b/lib/mnesia/src/mnesia.app.src
index 6ce0c68de8cd..23bc77c97701 100644
--- a/lib/mnesia/src/mnesia.app.src
+++ b/lib/mnesia/src/mnesia.app.src
@@ -15,6 +15,7 @@
 	     mnesia_ext_sup,
 	     mnesia_frag,
 	     mnesia_frag_hash,
+             mnesia_hook,
 	     mnesia_index,
              mnesia_kernel_sup,
 	     mnesia_late_loader,
diff --git a/lib/mnesia/src/mnesia.erl b/lib/mnesia/src/mnesia.erl
index 6ddd75728e06..ebb34dc500e1 100644
--- a/lib/mnesia/src/mnesia.erl
+++ b/lib/mnesia/src/mnesia.erl
@@ -99,6 +99,7 @@
 	 read_table_property/2, write_table_property/2, delete_table_property/2,
 	 change_table_frag/2,
 	 clear_table/1, clear_table/4,
+         match_delete/2,
 
 	 %% Table load
 	 dump_tables/1, wait_for_tables/2, force_load_table/1,
@@ -2812,21 +2813,25 @@ change_table_copy_type(T, N, S) ->
 
 -spec clear_table(Tab::table()) -> t_result('ok').
 clear_table(Tab) ->
+    match_delete(Tab, '_').
+
+-spec match_delete(Tab::table(), ets:match_pattern()) -> t_result('ok').
+match_delete(Tab, Pattern) ->
     case get(mnesia_activity_state) of
 	State = {Mod, Tid, _Ts} when element(1, Tid) =/= tid ->
-	    transaction(State, fun() -> do_clear_table(Tab) end, [], infinity, Mod, sync);
+	    transaction(State, fun() -> do_clear_table(Tab, Pattern) end, [], infinity, Mod, sync);
 	undefined ->
-	    transaction(undefined, fun() -> do_clear_table(Tab) end, [], infinity, ?DEFAULT_ACCESS, sync);
+	    transaction(undefined, fun() -> do_clear_table(Tab, Pattern) end, [], infinity, ?DEFAULT_ACCESS, sync);
 	_ -> %% Not allowed for clear_table
 	    mnesia:abort({aborted, nested_transaction})
     end.
 
-do_clear_table(Tab) ->
+do_clear_table(Tab, Pattern) ->
     case get(mnesia_activity_state) of
 	{?DEFAULT_ACCESS, Tid, Ts}  ->
-	    clear_table(Tid, Ts, Tab, '_');
+	    clear_table(Tid, Ts, Tab, Pattern);
 	{Mod, Tid, Ts} ->
-	    Mod:clear_table(Tid, Ts, Tab, '_');
+	    Mod:clear_table(Tid, Ts, Tab, Pattern);
 	_ ->
 	    abort(no_transaction)
     end.
diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl
index ed1c0df605e9..505c604e4ddd 100644
--- a/lib/mnesia/src/mnesia_checkpoint.erl
+++ b/lib/mnesia/src/mnesia_checkpoint.erl
@@ -30,6 +30,7 @@
 	 tm_prepare/1,
 	 tm_retain/4,
 	 tm_retain/5,
+	 tm_retain/6,
 	 tm_enter_pending/1,
 	 tm_enter_pending/3,
 	 tm_exit_pending/1
@@ -148,7 +149,6 @@ enter_still_pending([Tid | Tids], Tab) ->
 enter_still_pending([], _Tab) ->
     ok.
 
-
 %% Looks up checkpoints for functions in mnesia_tm.
 tm_retain(Tid, Tab, Key, Op) ->
     case val({Tab, commit_work}) of
@@ -157,11 +157,14 @@ tm_retain(Tid, Tab, Key, Op) ->
 	_ -> 
 	    undefined
     end.
-    
+
 tm_retain(Tid, Tab, Key, Op, Checkpoints) ->
+    tm_retain(Tid, Tab, Key, Op, Checkpoints, '_').
+
+tm_retain(Tid, Tab, Key, Op, Checkpoints, Obj) ->
     case Op of
 	clear_table ->
-	    OldRecs = mnesia_lib:db_match_object(Tab, '_'),
+	    OldRecs = mnesia_lib:db_match_object(Tab, Obj),
 	    send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),
 	    OldRecs;
 	_ ->
diff --git a/lib/mnesia/src/mnesia_dumper.erl b/lib/mnesia/src/mnesia_dumper.erl
index cd712c34ade6..54da818181c2 100644
--- a/lib/mnesia/src/mnesia_dumper.erl
+++ b/lib/mnesia/src/mnesia_dumper.erl
@@ -404,8 +404,12 @@ dets_insert(Op,Tab,Key,Val, Storage0) ->
 	    dets_updated(Tab,Key),
 	    mnesia_lib:db_match_erase(Storage, Tab, Val);
 	clear_table ->
-	    dets_cleared(Tab),
-	    ok = mnesia_lib:db_match_erase(Storage, Tab, '_')
+            %% Val is a match_delete pattern
+            case Val of
+                '_' -> dets_cleared(Tab);
+                _ -> dets_updated(Tab, Val)
+            end,
+	    ok = mnesia_lib:db_match_erase(Storage, Tab, Val)
     end.
 
 dets_updated(Tab,Key) ->
diff --git a/lib/mnesia/src/mnesia_hook.erl b/lib/mnesia/src/mnesia_hook.erl
new file mode 100644
index 000000000000..cb23868db68d
--- /dev/null
+++ b/lib/mnesia/src/mnesia_hook.erl
@@ -0,0 +1,105 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2021. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(mnesia_hook).
+
+-include("mnesia.hrl").
+
+-export([
+         register_hook/2,
+         unregister_hook/1,
+         do_post_commit/2
+        ]).
+
+-define(hook(NAME), {mnesia_hook, NAME}).
+
+-type post_commit_hook_data() ::
+        #{ node => node()
+         , ram_copies => list()
+         , disc_copies => list()
+         , disc_only_copies => list()
+         , ext => list()
+         , schema_ops => list()
+         }.
+
+-type post_commit_hook() :: fun((_Tid, post_commit_hook_data()) -> ok).
+
+-spec register_hook(post_commit, post_commit_hook()) -> ok | {error, term()}.
+register_hook(post_commit, Hook) when is_function(Hook, 2) ->
+    persistent_term:put(?hook(post_commit), Hook);
+register_hook(_, _) ->
+    {error, bad_type}.
+
+-spec unregister_hook(post_commit) -> boolean() | {error, term()}.
+unregister_hook(post_commit) ->
+    persistent_term:erase(?hook(post_commit));
+unregister_hook(_) ->
+    {error, bad_type}.
+
+-spec do_post_commit(_Tid, #commit{}) -> ok.
+do_post_commit(Tid, Commit) ->
+    case persistent_term:get(?hook(post_commit), undefined) of
+        undefined ->
+            ok;
+        Fun ->
+            #commit{ node = Node
+                   , ram_copies = Ram
+                   , disc_copies = Disc
+                   , disc_only_copies = DiscOnly
+                   , ext = Ext
+                   , schema_ops = SchemaOps
+                   } = Commit,
+            CommitData = #{ node => Node
+                          , ram_copies => Ram
+                          , disc_copies => Disc
+                          , disc_only_copies => DiscOnly
+                          , ext => Ext
+                          , schema_ops => SchemaOps
+                          },
+            try Fun(Tid, CommitData)
+            catch EC:Err:St ->
+                    CommitTabs = commit_tabs(Ram, Disc, DiscOnly, Ext),
+                    mnesia_lib:dbg_out("Mnesia post_commit hook failed: ~p:~p~nStacktrace:~p~nCommit tables:~p~n",
+                                       [EC, Err, stack_without_args(St), CommitTabs])
+            end,
+            ok
+    end.
+
+%% May be helpful for debugging
+commit_tabs(Ram, Disc, DiscOnly, Ext) ->
+    Acc = tabs_from_ops(Ram, []),
+    Acc1 = tabs_from_ops(Disc, Acc),
+    Acc2 = tabs_from_ops(DiscOnly, Acc1),
+    lists:uniq(tabs_from_ops(Ext, Acc2)).
+
+tabs_from_ops([{{Tab, _K}, _Val, _Op} | T], Acc) ->
+    tabs_from_ops(T, [Tab | Acc]);
+tabs_from_ops([_ | T], Acc) ->
+    tabs_from_ops(T, Acc);
+tabs_from_ops([], Acc) ->
+    Acc.
+
+%% Args may contain sensitive data
+stack_without_args([{M, F, Args, Info} | T]) when is_list(Args) ->
+    [{M, F, length(Args), Info} | stack_without_args(T)];
+stack_without_args([StItem | T] ) ->
+    [StItem | stack_without_args(T)];
+stack_without_args([]) ->
+    [].
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index 686412816cb3..787f9a124fd8 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -210,7 +210,19 @@ do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
     verbose("Local table copy of ~tp has recently been deleted, ignored.~n", [Tab]),
     {not_loaded, storage_unknown};
 do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
-    [Node | Tail] = Ns,
+    [Node | Tail] =
+        case ?catch_val(copy_from_node) of
+            undefined -> Ns;
+            CPNode when is_atom(CPNode) ->
+                case lists:member(CPNode, Ns) of
+                    true ->
+                        [CPNode | Ns -- [CPNode]];
+                    false ->
+                        Ns
+                end;
+            _ ->
+                Ns
+        end,
     case lists:member(Node,val({current, db_nodes})) of
 	true ->
 	    dbg_out("Getting table ~tp (~p) from node ~p: ~tp~n",
@@ -917,7 +929,16 @@ get_chunk_func(Pid, Tab, {ext, Alias, Mod}, RemoteS) ->
 get_chunk_func(Pid, Tab, Storage, RemoteS) ->
     try
         TabSize = mnesia:table_info(Tab, size),
-        KeysPerTransfer = calc_nokeys(Storage, Tab),
+        KeysPerTransfer =
+            case ?catch_val(send_table_batch_size) of
+                {'EXIT', _} ->
+                    mnesia_lib:set(send_table_batch_size, 0),
+                    calc_nokeys(Storage, Tab);
+                0 ->
+                    calc_nokeys(Storage, Tab);
+                Val when is_integer(Val) ->
+                    Val
+            end,
         ChunkData = dets:info(Tab, bchunk_format),
         UseDetsChunk =
             Storage == RemoteS andalso
diff --git a/lib/mnesia/src/mnesia_log.erl b/lib/mnesia/src/mnesia_log.erl
index b1514bfbe64e..519b8fac4828 100644
--- a/lib/mnesia/src/mnesia_log.erl
+++ b/lib/mnesia/src/mnesia_log.erl
@@ -1024,9 +1024,14 @@ add_recs([LogH|Rest], N)
        LogH#log_header.log_kind == dcl_log,
        LogH#log_header.log_version >= "1.0" ->
     add_recs(Rest, N);
-add_recs([{{Tab, _Key}, _Val, clear_table} | Rest], N) ->
+add_recs([{{Tab, _Key}, '_', clear_table} | Rest], N) ->
     Size = ets:info(Tab, size),
     true = ets:delete_all_objects(Tab),
     add_recs(Rest, N+Size);
+add_recs([{{Tab, _Key}, Pattern, clear_table} | Rest], N) ->
+    SizeBefore = ets:info(Tab, size),
+    true = ets:match_delete(Tab, Pattern),
+    SizeAfter = ets:info(Tab, size),
+    add_recs(Rest, N+SizeBefore-SizeAfter);
 add_recs([], N) ->
     N.
diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl
index dbd4a9c42f5d..9b06a41108ec 100644
--- a/lib/mnesia/src/mnesia_monitor.erl
+++ b/lib/mnesia/src/mnesia_monitor.erl
@@ -690,8 +690,10 @@ env() ->
      pid_sort_order,
      no_table_loaders,
      dc_dump_limit,
+     copy_from_node,
      send_compressed,
      max_transfer_size,
+     send_table_batch_size,
      schema
     ].
 
@@ -740,10 +742,14 @@ default_env(no_table_loaders) ->
     2;
 default_env(dc_dump_limit) ->
     4;
+default_env(copy_from_node) ->
+    undefined;
 default_env(send_compressed) ->
     0;
 default_env(max_transfer_size) ->
     64000;
+default_env(send_table_batch_size) ->
+    0;
 default_env(schema) ->
     [].
 
@@ -792,8 +798,10 @@ do_check_type(pid_sort_order, "standard") -> standard;
 do_check_type(pid_sort_order, _) -> false;
 do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;
 do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N;
+do_check_type(copy_from_node, L) when is_atom(L) -> L;
 do_check_type(send_compressed, L) when is_integer(L), L >= 0, L =< 9 -> L;
 do_check_type(max_transfer_size, N) when is_integer(N), N > 0 -> N;
+do_check_type(send_table_batch_size, L) when is_integer(L), L >= 0 -> L;
 do_check_type(schema, L) when is_list(L) -> L.
 
 bool(true) -> true;
diff --git a/lib/mnesia/src/mnesia_subscr.erl b/lib/mnesia/src/mnesia_subscr.erl
index d0c298e4253c..894fb3a0813a 100644
--- a/lib/mnesia/src/mnesia_subscr.erl
+++ b/lib/mnesia/src/mnesia_subscr.erl
@@ -152,7 +152,7 @@ report_table_event(Tab, Tid, Obj, Op) ->
 report_table_event(Subscr, Tab, Tid, Obj, Op) ->
     report_table_event(Subscr, Tab, Tid, Obj, Op, undefined).
 
-report_table_event({subscribers, S1, S2}, Tab, Tid, _Obj, clear_table, _Old) ->
+report_table_event({subscribers, S1, S2}, Tab, Tid, '_' = _Obj, clear_table, _Old) ->
     What   = {delete, {schema, Tab}, Tid},
     deliver(S1, {mnesia_table_event, What}),
     TabDef = mnesia_schema:cs2list(?catch_val({Tab, cstruct})),
@@ -163,6 +163,15 @@ report_table_event({subscribers, S1, S2}, Tab, Tid, _Obj, clear_table, _Old) ->
     What4  = {write, schema,  {schema, Tab, TabDef}, [], Tid},
     deliver(S2, {mnesia_table_event, What4});
 
+report_table_event({subscribers, S1, _S2}, Tab, Tid, Obj, clear_table, _Old) ->
+    %% Obj is a match pattern here.
+    %% Sending delete_object event is compatible with `mnesia_loader`,
+    %% that uses `db_match_erase/2` which actually removes records by pattern.
+    %% Extended event is omitted: it's possible to match and get `OldRecords`,
+    %% but the list can be quite large.
+    Standard = {delete_object, patch_record(Tab, Obj), Tid},
+    deliver(S1, {mnesia_table_event, Standard});
+
 report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, _Old) ->
     What = {Op, patch_record(Tab, Obj), Tid},
     deliver(Subscr, {mnesia_table_event, What});
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 10def6d3d7a1..9efcb130f912 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -1849,6 +1849,7 @@ do_commit(Tid, C, DumperMode) ->
     R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
     R5 = do_update_ext(Tid, C#commit.ext, R4),
     mnesia_subscr:report_activity(Tid),
+    mnesia_hook:do_post_commit(Tid, C),
     R5.
 
 %% This could/should be optimized
@@ -1971,7 +1972,7 @@ commit_del_object([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == index
 
 commit_clear([], _, _, _, _, _) ->  ok;
 commit_clear([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj) ->
-    mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList),
+    mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList, Obj),
     commit_clear(R, Tid, Storage, Tab, K, Obj);
 commit_clear([H|R], Tid, Storage, Tab, K, Obj)
   when element(1, H) == subscribers ->
diff --git a/lib/mnesia/test/Makefile b/lib/mnesia/test/Makefile
index c3fbad88ca1b..ddd0a9f0d1a0 100644
--- a/lib/mnesia/test/Makefile
+++ b/lib/mnesia/test/Makefile
@@ -54,7 +54,8 @@ MODULES= \
 	mnesia_cost \
 	mnesia_dbn_meters \
 	ext_test \
-	mnesia_index_plugin_test
+	mnesia_index_plugin_test \
+	mnesia_match_delete_test
 
 DocExamplesDir := ../doc/src/
 
diff --git a/lib/mnesia/test/mnesia_SUITE.erl b/lib/mnesia/test/mnesia_SUITE.erl
index 123d16023f11..1b1574ca08e2 100644
--- a/lib/mnesia/test/mnesia_SUITE.erl
+++ b/lib/mnesia/test/mnesia_SUITE.erl
@@ -70,7 +70,7 @@ groups() ->
     [{light, [],
       [{group, install}, {group, nice}, {group, evil},
        {group, mnesia_frag_test, light}, {group, qlc}, {group, index_plugins},
-       {group, registry}, {group, config}, {group, examples}]},
+       {group, registry}, {group, config}, {group, examples}, {group, match_delete}]},
      {install, [], [{mnesia_install_test, all}]},
      {nice, [], [{mnesia_nice_coverage_test, all}]},
      {evil, [], [{mnesia_evil_coverage_test, all}]},
@@ -79,6 +79,7 @@ groups() ->
      {registry, [], [{mnesia_registry_test, all}]},
      {config, [], [{mnesia_config_test, all}]},
      {examples, [], [{mnesia_examples_test, all}]},
+     {match_delete, [], [{mnesia_match_delete_test, all}]},
      %% The 'medium' test suite verfies the ACID (atomicity, consistency
      %% isolation and durability) properties and various recovery scenarios
      %% These tests may take quite while to run.
diff --git a/lib/mnesia/test/mnesia_match_delete_test.erl b/lib/mnesia/test/mnesia_match_delete_test.erl
new file mode 100644
index 000000000000..c77a260fb0b7
--- /dev/null
+++ b/lib/mnesia/test/mnesia_match_delete_test.erl
@@ -0,0 +1,217 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2023. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+-module(mnesia_match_delete_test).
+-include("mnesia_test_lib.hrl").
+
+-export([all/0, groups/0,
+         init_per_group/2, end_per_group/2,
+         init_per_testcase/2, end_per_testcase/2]).
+
+-export([match_delete/1,
+         match_delete_checkpoint/1,
+         match_delete_subscribe/1,
+         match_delete_index/1,
+         match_delete_restart/1,
+         match_delete_dump_restart/1,
+         match_delete_frag/1]).
+
+all() ->
+    [match_delete,
+     match_delete_checkpoint,
+     match_delete_subscribe,
+     match_delete_index,
+     match_delete_restart,
+     match_delete_dump_restart,
+     match_delete_frag].
+
+groups() ->
+    [].
+
+init_per_group(_GroupName, Config) ->
+    Config.
+
+end_per_group(_GroupName, Config) ->
+    Config.
+
+init_per_testcase(Func, Conf) ->
+    mnesia_test_lib:init_per_testcase(Func, Conf).
+
+end_per_testcase(Func, Conf) ->
+    mnesia_test_lib:end_per_testcase(Func, Conf).
+
+match_delete(suite) -> [];
+match_delete(Config) when is_list(Config) ->
+    [Node1, Node2, Node3] = Nodes = ?acquire_nodes(3, Config),
+    Tab = match_delete_tab,
+    Def = [{ram_copies, [Node1]}, {disc_copies, [Node2]}, {disc_only_copies, [Node3]}],
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    ?match({atomic, ok}, write(Tab)),
+    ?match({atomic, ok}, mnesia:match_delete(Tab, {Tab, '_', bar})),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+    ?verify_mnesia(Nodes, []).
+
+match_delete_checkpoint(suite) -> [];
+match_delete_checkpoint(Config) when is_list(Config) ->
+    [Node1, Node2, Node3] = Nodes = ?acquire_nodes(3, Config),
+    Tab = match_delete_retain_tab,
+    Def = [{disc_copies, [Node1, Node2]}, {disc_only_copies, [Node3]}],
+    Checkpoint = ?FUNCTION_NAME,
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    ?match({atomic, ok}, write(Tab)),
+
+    ?match({ok, Checkpoint, _}, mnesia:activate_checkpoint([{name, Checkpoint}, {max, [Tab]}])),
+    ?match({atomic, ok}, mnesia:match_delete(Tab, {Tab, '_', bar})),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+
+    File = "match_delete_backup.BUP",
+    ?match(ok, mnesia:backup_checkpoint(Checkpoint, File)),
+    ?match(ok, mnesia:deactivate_checkpoint(?FUNCTION_NAME)),
+
+    ?match({atomic, [Tab]}, mnesia:restore(File, [{default_op, clear_tables}])),
+    ?match({atomic, [1,2,3,4,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+
+    ?match(ok, file:delete(File)),
+    ?verify_mnesia(Nodes, []).
+
+match_delete_subscribe(suite) -> [];
+match_delete_subscribe(Config) when is_list(Config) ->
+    Nodes = ?acquire_nodes(3, Config),
+    Tab = match_delete_sub_tab,
+    Def = [{ram_copies, Nodes}],
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    ?match({atomic, ok}, write(Tab)),
+    Pattern = {Tab, '_', bar},
+    ?match({ok, _}, mnesia:subscribe({table, Tab})),
+    ?match({atomic, ok}, mnesia:match_delete(Tab, Pattern)),
+    ?match_receive({mnesia_table_event, {delete_object, Pattern, _}}),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+    ?verify_mnesia(Nodes, []).
+
+match_delete_index(suite) -> [];
+match_delete_index(Config) when is_list(Config) ->
+    Nodes = ?acquire_nodes(3, Config),
+    {atomic, ok} = mnesia:create_table(match_delete_index,
+                                       [{index, [ix]}, {attributes, [key, ix, val]},
+                                        {disc_copies, Nodes}]),
+    {atomic, ok} = mnesia:create_table(match_delete_index_ram,
+                                       [{index, [ix]}, {attributes, [key, ix, val]},
+                                        {ram_copies, Nodes}]),
+    {atomic, ok} = mnesia:create_table(match_delete_index_do,
+                                       [{index, [ix]}, {attributes, [key, ix, val]},
+                                        {disc_only_copies, Nodes}]),
+    Test = fun(Tab) ->
+                   Rec = {Tab, 1, 4, data},
+                   Rec2 = {Tab, 2, 5, data},
+                   Rec3 = {Tab, 3, 5, data},
+                   Rec4 = {Tab, 4, 6, data},
+                   Pattern = {Tab, '_', 5, '_'},
+
+                   {atomic, ok} = mnesia:transaction(fun() -> mnesia:write(Rec),
+                                                              mnesia:write(Rec2),
+                                                              mnesia:write(Rec3),
+                                                              mnesia:write(Rec4)
+                                                     end),
+
+                   ?match({atomic, ok}, mnesia:match_delete(Tab, Pattern)),
+
+                   ?match([Rec], mnesia:dirty_index_read(Tab, 4, ix)),
+                   ?match([Rec4], mnesia:dirty_index_read(Tab, 6, ix)),
+                   ?match({atomic, [Rec]}, mnesia:transaction(fun() -> mnesia:index_read(Tab, 4, ix) end)),
+                   ?match({atomic, [Rec4]}, mnesia:transaction(fun() -> mnesia:index_read(Tab, 6, ix) end)),
+
+                   ?match([], mnesia:dirty_index_match_object(Pattern, ix)),
+                   ?match({atomic, []}, mnesia:transaction(fun() -> mnesia:index_match_object(Pattern, ix) end)),
+
+                   ?match([Rec], mnesia:dirty_index_match_object({Tab, '_', 4, '_'}, ix)),
+                   ?match({atomic, [Rec4]},
+                          mnesia:transaction(fun() -> mnesia:index_match_object({Tab, '_', 6, data}, ix) end))
+           end,
+    [Test(Tab) || Tab <- [match_delete_index, match_delete_index_ram, match_delete_index_do]],
+    ?verify_mnesia(Nodes, []).
+
+match_delete_restart(suite) -> [];
+match_delete_restart(Config) when is_list(Config) ->
+    Nodes = ?acquire_nodes(1, Config),
+    Tab = match_delete_log_tab,
+    Def = [{disc_copies, Nodes}],
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    ?match({atomic, ok}, write(Tab)),
+    Pattern = {Tab, '_', bar},
+    ?match({atomic, ok}, mnesia:match_delete(Tab, Pattern)),
+    %% Restart Mnesia right after calling match_delete/2 to verify that
+    %% the table is correctly loaded
+    ?match([], mnesia_test_lib:stop_mnesia(Nodes)),
+    ?match([], mnesia_test_lib:start_mnesia(Nodes, [Tab])),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+    ?verify_mnesia(Nodes, []).
+
+match_delete_dump_restart(suite) -> [];
+match_delete_dump_restart(Config) when is_list(Config) ->
+    [Node1] = Nodes = ?acquire_nodes(1, Config),
+    Tab = match_delete_dump_tab,
+    Def = [{disc_copies, Nodes}],
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    ?match({atomic, ok}, write(Tab)),
+    Pattern = {Tab, '_', bar},
+    ?match({atomic, ok}, mnesia:match_delete(Tab, Pattern)),
+    dumped = rpc:call(Node1, mnesia, dump_log, []),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+    ?match([], mnesia_test_lib:stop_mnesia(Nodes)),
+    ?match([], mnesia_test_lib:start_mnesia(Nodes, [Tab])),
+    ?match({atomic, [1,2,5]}, ?sort(mnesia:transaction(fun() -> mnesia:all_keys(Tab) end))),
+    ?verify_mnesia(Nodes, []).
+
+match_delete_frag(suite) -> [];
+match_delete_frag(Config) when is_list(Config) ->
+    Nodes = ?acquire_nodes(2, Config),
+    Tab = match_delete_frag_tab,
+    FragProps = [{n_fragments, 2}, {node_pool, Nodes}],
+    Def = [{frag_properties, FragProps}, {ram_copies, Nodes}],
+    ?match({atomic, ok}, mnesia:create_table(Tab, Def)),
+    KVs = [{1, foo}, {2, foo},
+           {3, bar}, {4, bar},
+           {5, baz}, {6, baz},
+           {7, foo}, {8, foo}],
+    ?match([ok, ok | _], frag_write(Tab, KVs)),
+    Pattern = {Tab, '_', bar},
+    %% match_delete/2 is a transaction itself
+    ?match({atomic, ok},
+           mnesia:activity(
+             async_dirty, fun(P) -> mnesia:match_delete(Tab, P) end, [Pattern], mnesia_frag)
+          ),
+    Keys = mnesia:activity(transaction, fun() -> mnesia:all_keys(Tab) end, [], mnesia_frag),
+    ?match([1,2,5,6,7,8], ?sort(Keys)),
+    ?verify_mnesia(Nodes, []).
+
+frag_write(Tab, KVs) ->
+    Fun = fun(KVs1) -> [mnesia:write(Tab, {Tab, K, V}, write) || {K, V} <- KVs1] end,
+    mnesia:activity(transaction, Fun, [KVs], mnesia_frag).
+
+write(Tab) ->
+    mnesia:transaction(
+      fun() ->
+              mnesia:write({Tab, 1, foo}),
+              mnesia:write({Tab, 2, foo}),
+              mnesia:write({Tab, 3, bar}),
+              mnesia:write({Tab, 4, bar}),
+              mnesia:write({Tab, 5, baz})
+      end).
diff --git a/lib/ssl/src/ssl.erl b/lib/ssl/src/ssl.erl
index cba37c6f3358..731c8cca0c3d 100644
--- a/lib/ssl/src/ssl.erl
+++ b/lib/ssl/src/ssl.erl
@@ -1655,6 +1655,7 @@ ssl_options() ->
      middlebox_comp_mode,
      max_fragment_length,
      next_protocol_selector,  next_protocols_advertised,
+     certificate_status,
      ocsp_stapling, ocsp_responder_certs, ocsp_nonce,
      padding_check,
      partial_chain,
@@ -1688,10 +1689,11 @@ update_options(Opts, Role, InheritedSslOpts) when is_map(InheritedSslOpts) ->
     {UserSslOpts, _} = split_options(Opts, ssl_options()),
     process_options(UserSslOpts, InheritedSslOpts, #{role => Role}).
 
-process_options(UserSslOpts, SslOpts0, Env) ->
+process_options(UserSslOpts, SslOpts00, Env) ->
     %% Reverse option list so we get the last set option if set twice,
     %% users depend on it.
     UserSslOptsMap = proplists:to_map(lists:reverse(UserSslOpts)),
+    SslOpts0  = opt_certificate_status(UserSslOptsMap, SslOpts00, Env),
     SslOpts1  = opt_protocol_versions(UserSslOptsMap, SslOpts0, Env),
     SslOpts2  = opt_verification(UserSslOptsMap, SslOpts1, Env),
     SslOpts3  = opt_certs(UserSslOptsMap, SslOpts2, Env),
@@ -2048,6 +2050,15 @@ opt_tickets(UserOpts, #{versions := Versions} = Opts, #{role := server}) ->
     Opts#{session_tickets => SessionTickets, early_data => EarlyData,
           anti_replay => AntiReplay, stateless_tickets_seed => STS}.
 
+opt_certificate_status(UserOpts, Opts, #{role := _Role}) ->
+    {_, CertificateStatus} = get_opt(certificate_status, undefined, UserOpts, Opts),
+    case CertificateStatus of
+        undefined -> ok;
+        #certificate_status{} -> ok;
+        _Value -> option_error(certificate_status, CertificateStatus)
+    end,
+    Opts#{certificate_status => CertificateStatus}.
+
 opt_ocsp(UserOpts, #{versions := _Versions} = Opts, #{role := Role}) ->
     {Stapling, SMap} =
         case get_opt(ocsp_stapling, ?DEFAULT_OCSP_STAPLING, UserOpts, Opts) of
diff --git a/lib/ssl/src/ssl_handshake.erl b/lib/ssl/src/ssl_handshake.erl
index 7dd60829a12e..7e2b0e4720e4 100644
--- a/lib/ssl/src/ssl_handshake.erl
+++ b/lib/ssl/src/ssl_handshake.erl
@@ -586,6 +586,9 @@ encode_handshake(#certificate_request{certificate_types = CertTypes,
        <<?BYTE(CertTypesLen), CertTypes/binary,
 	?UINT16(CertAuthsLen), EncCertAuths/binary>>
     };
+encode_handshake(#certificate_status{status_type = StatusType, response = Response}, _Version) ->
+    Size = byte_size(Response),
+    {?CERTIFICATE_STATUS, <<?BYTE(StatusType), ?UINT24(Size), Response/binary>>};
 encode_handshake(#server_hello_done{}, _Version) ->
     {?SERVER_HELLO_DONE, <<>>};
 encode_handshake(#client_key_exchange{exchange_keys = ExchangeKeys}, _Version) ->
@@ -789,7 +792,13 @@ encode_cert_status_req(
         request_extensions = ReqExtns}) ->
     ResponderIDListBin = encode_responderID_list(ResponderIDList),
     ReqExtnsBin = encode_request_extensions(ReqExtns),
-    <<?BYTE(StatusType), ResponderIDListBin/binary, ReqExtnsBin/binary>>.
+    <<?BYTE(StatusType), ResponderIDListBin/binary, ReqExtnsBin/binary>>;
+encode_cert_status_req(_StatusType, #certificate_status{} = Status) ->
+    Version = {3, 4},
+    {_, EncStatus} = encode_handshake(Status, Version),
+    EncStatus;
+encode_cert_status_req(_StatusType, _Value) ->
+    <<>>.
 
 encode_responderID_list([]) ->
     <<?UINT16(0)>>;
@@ -1486,7 +1495,8 @@ handle_client_hello_extensions(RecordCB, Random, ClientCipherSuites,
                                    ec_point_formats => server_ecc_extension(Version, 
                                                                             maps:get(ec_point_formats, Exts, undefined)),
                                    use_srtp => use_srtp_ext(Opts),
-                                   max_frag_enum => ServerMaxFragEnum
+                                   max_frag_enum => ServerMaxFragEnum,
+                                   status_request => handle_status_request(Opts, Exts)
                                   },
     
     %% If we receive an ALPN extension and have ALPN configured for this connection,
@@ -1559,6 +1569,19 @@ handle_server_hello_extensions(RecordCB, Random, CipherSuite, Compression,
             end
     end.
 
+handle_status_request(SSLOptions, ClientExtensions) ->
+    case {SSLOptions, ClientExtensions} of
+        { #{certificate_status := #certificate_status{}}
+        , #{status_request := #certificate_status_request{}}
+        } ->
+            #certificate_status_request{
+               status_type = ?CERTIFICATE_STATUS_TYPE_OCSP,
+               request = <<>>
+              };
+        _ ->
+            undefined
+    end.
+
 select_curve(Client, Server) ->
     select_curve(Client, Server, false).
 
@@ -3065,6 +3088,11 @@ decode_extensions(<<?UINT16(?STATUS_REQUEST), ?UINT16(Len),
           ASN1OCSPResponse:OCSPLen/binary>> ->
             decode_extensions(Rest, Version, MessageType,
                       Acc#{status_request => #certificate_status{response = ASN1OCSPResponse}});
+        <<?BYTE(?CERTIFICATE_STATUS_TYPE_OCSP),
+          ?UINT16(0),
+          ?UINT16(0)>> ->
+            decode_extensions(Rest, Version, MessageType,
+                              Acc#{status_request => #certificate_status_request{}});
         _Other ->
             decode_extensions(Rest, Version, MessageType, Acc)
     end;
diff --git a/lib/ssl/src/tls_dtls_connection.erl b/lib/ssl/src/tls_dtls_connection.erl
index 2822e53b7eed..e62f379b4d9e 100644
--- a/lib/ssl/src/tls_dtls_connection.erl
+++ b/lib/ssl/src/tls_dtls_connection.erl
@@ -748,12 +748,19 @@ do_server_hello(Type, #{next_protocol_negotiation := NextProtocols} =
                        handshake_env = HsEnv,
 		       session = #session{session_id = SessId},
 		       connection_states = ConnectionStates0,
-                       ssl_options = #{versions := [HighestVersion|_]}}
+                       ssl_options = #{versions := [HighestVersion|_]} = SSLOpts0}
 		= State0) when is_atom(Type) ->
     %% TLS 1.3 - Section 4.1.3
     %% Override server random values for TLS 1.3 downgrade protection mechanism.
     ConnectionStates1 = update_server_random(ConnectionStates0, Version, HighestVersion),
-    State1 = State0#state{connection_states = ConnectionStates1},
+    SSLOpts1 = case {SSLOpts0, ServerHelloExt} of
+                   { #{certificate_status := #certificate_status{}}
+                   , #{status_request := #certificate_status_request{}}
+                   } -> SSLOpts0;
+                   _ -> SSLOpts0#{certificate_status => undefined}
+               end,
+    State1 = State0#state{connection_states = ConnectionStates1,
+                          ssl_options = SSLOpts1},
     ServerHello =
 	ssl_handshake:server_hello(SessId, ssl:tls_version(Version),
                                    ConnectionStates1, ServerHelloExt),
@@ -930,8 +937,14 @@ do_client_certify_and_key_exchange(State0, Connection) ->
 
 server_certify_and_key_exchange(State0, Connection) ->
     State1 = certify_server(State0, Connection),
-    State2 = key_exchange(State1, Connection),
-    request_client_cert(State2, Connection).
+    State2 = certificate_status(State1, Connection),
+    State3 = key_exchange(State2, Connection),
+    request_client_cert(State3, Connection).
+
+certificate_status(#state{ssl_options = #{certificate_status := #certificate_status{} = Status}} = State, Connection) ->
+    Connection:queue_handshake(Status, State);
+certificate_status(State, _) ->
+    State.
 
 certify_client_key_exchange(#encrypted_premaster_secret{premaster_secret= EncPMS},
 			    #state{session = #session{private_key = PrivateKey},
diff --git a/lib/ssl/src/tls_handshake_1_3.erl b/lib/ssl/src/tls_handshake_1_3.erl
index 69a4e5cf1f6e..5139318d26f7 100644
--- a/lib/ssl/src/tls_handshake_1_3.erl
+++ b/lib/ssl/src/tls_handshake_1_3.erl
@@ -286,10 +286,11 @@ certificate(undefined, _, _, _, client) ->
     {ok, #certificate_1_3{
             certificate_request_context = <<>>,
             certificate_list = []}};
-certificate([OwnCert], CertDbHandle, CertDbRef, _CRContext, Role) ->
+certificate([OwnCert], CertDbHandle, CertDbRef, CRContext, Role) ->
     case ssl_certificate:certificate_chain(OwnCert, CertDbHandle, CertDbRef) of
 	{ok, _, Chain} ->
-            CertList = chain_to_cert_list(Chain),
+            CertList0 = chain_to_cert_list(Chain),
+            CertList = maybe_add_certificate_entry_extensions(CertList0, CRContext),
             %% If this message is in response to a CertificateRequest, the value of
             %% certificate_request_context in that message. Otherwise (in the case
             %%of server authentication), this field SHALL be zero length.
@@ -309,8 +310,9 @@ certificate([OwnCert], CertDbHandle, CertDbRef, _CRContext, Role) ->
                     certificate_request_context = <<>>,
                     certificate_list = []}}
     end;
-certificate([_,_| _] = Chain, _,_,_,_) ->
-    CertList = chain_to_cert_list(Chain),
+certificate([_,_| _] = Chain, _,_,CRContext,_) ->
+    CertList0 = chain_to_cert_list(Chain),
+    CertList = maybe_add_certificate_entry_extensions(CertList0, CRContext),
     {ok, #certificate_1_3{
             certificate_request_context = <<>>,
             certificate_list = CertList}}.
@@ -1243,17 +1245,20 @@ update_start_state(State, Map) ->
     PeerPublicKey = maps:get(peer_public_key, Map, undefined),
     ALPNProtocol = maps:get(alpn, Map, undefined),
     Random = maps:get(random, Map),
+    StatusRequest = maps:get(status_request, Map, undefined),
     update_start_state(State, Cipher, KeyShare, SessionId,
                        Group, SelectedSignAlg, PeerPublicKey,
-                       ALPNProtocol, Random).
+                       ALPNProtocol, Random, StatusRequest).
 %%
 update_start_state(#state{connection_states = ConnectionStates0,
                           handshake_env = #handshake_env{} = HsEnv,
+                          protocol_specific = ProtocolSpecific0,
                           static_env = #static_env{role = Role},
                           connection_env = CEnv,
                           session = Session} = State,
                    Cipher, KeyShare, SessionId,
-                   Group, SelectedSignAlg, PeerPublicKey, ALPNProtocol, Random) ->
+                   Group, SelectedSignAlg, PeerPublicKey, ALPNProtocol, Random,
+                   StatusRequest) ->
     #{security_parameters := SecParamsR0} = PendingRead =
         maps:get(pending_read, ConnectionStates0),
     #{security_parameters := SecParamsW0} = PendingWrite =
@@ -1267,9 +1272,11 @@ update_start_state(#state{connection_states = ConnectionStates0,
     ConnectionStates =
         ConnectionStates0#{pending_read => PendingRead#{security_parameters => SecParamsR},
                            pending_write => PendingWrite#{security_parameters => SecParamsW}},
+    ProtocolSpecific = ProtocolSpecific0#{status_request => StatusRequest},
     State#state{connection_states = ConnectionStates,
                 handshake_env = HsEnv#handshake_env{alpn = ALPNProtocol},
                 key_share = KeyShare,
+                protocol_specific = ProtocolSpecific,
                 session = Session#session{session_id = SessionId,
                                           ecc = Group,
                                           sign_alg = SelectedSignAlg,
@@ -1968,3 +1975,21 @@ plausible_missing_chain([_] = EncodedChain, undefined, SignAlg, Key, Session0) -
                     };
 plausible_missing_chain(_,Plausible,_,_,_) ->
     Plausible.
+
+maybe_add_certificate_entry_extensions(
+  [ServerCertEntry = #certificate_entry{} | Rest],
+  #{ status_request := #certificate_status_request{} = Req
+   , certificate_status := #certificate_status{} = Status
+   }) ->
+    [ ServerCertEntry#certificate_entry{
+        extensions =
+            #{ status_request =>
+                   Req#certificate_status_request{
+                     status_type = ?CERTIFICATE_STATUS_TYPE_OCSP,
+                     request = Status
+                    }
+             }
+       }
+    | Rest];
+maybe_add_certificate_entry_extensions(CertList, _CRContext) ->
+    CertList.
diff --git a/lib/ssl/src/tls_server_connection_1_3.erl b/lib/ssl/src/tls_server_connection_1_3.erl
index 83a8b0894d90..bb888f761894 100644
--- a/lib/ssl/src/tls_server_connection_1_3.erl
+++ b/lib/ssl/src/tls_server_connection_1_3.erl
@@ -475,6 +475,7 @@ do_handle_client_hello(#client_hello{cipher_suites = ClientCiphers,
                      false -> State2
                  end,
 
+        StatusRequest = maps:get(status_request, Extensions, undefined),
         State4 = tls_handshake_1_3:update_start_state(State3,
                                                       #{cipher => Cipher,
                                                         key_share => KeyShare,
@@ -483,7 +484,8 @@ do_handle_client_hello(#client_hello{cipher_suites = ClientCiphers,
                                                         sign_alg => SelectedSignAlg,
                                                         peer_public_key => ClientPubKey,
                                                         alpn => ALPNProtocol,
-                                                        random => Random}),
+                                                        random => Random,
+                                                        status_request => StatusRequest}),
 
         %% 4.1.4.  Hello Retry Request
         %%
@@ -829,11 +831,19 @@ maybe_send_certificate_request(#state{static_env = #static_env{protocol_cb = Con
 maybe_send_certificate(State, PSK) when  PSK =/= undefined ->
     {ok, State};
 maybe_send_certificate(#state{session = #session{own_certificates = OwnCerts},
+                              protocol_specific = ProtocolSpecific,
+                              ssl_options = SslOpts,
                               static_env = #static_env{
                                               protocol_cb = Connection,
                                               cert_db = CertDbHandle,
                                               cert_db_ref = CertDbRef}} = State, _) ->
-    case tls_handshake_1_3:certificate(OwnCerts, CertDbHandle, CertDbRef, <<>>, server) of
+    %% hack: apparently, CRContext is not used by the server (whatever that may be...)
+    StatusRequest = maps:get(status_request, ProtocolSpecific, undefined),
+    CertificateStatus = maps:get(certificate_status, SslOpts, undefined),
+    CRContext = #{ status_request => StatusRequest
+                 , certificate_status => CertificateStatus
+                 },
+    case tls_handshake_1_3:certificate(OwnCerts, CertDbHandle, CertDbRef, CRContext, server) of
         {ok, Certificate} ->
             {ok, Connection:queue_handshake(Certificate, State)};
         Error ->
diff --git a/lib/ssl/test/make_certs.erl b/lib/ssl/test/make_certs.erl
index e786cddeb642..0640c3dc5f4f 100644
--- a/lib/ssl/test/make_certs.erl
+++ b/lib/ssl/test/make_certs.erl
@@ -183,6 +183,22 @@ revoke(Root, CA, User, C) ->
     cmd(Cmd, Env),
     gencrl(Root, CA, C).
 
+make_ocsp_response(Port, Root, CA, User, Issuer, C) ->
+    UsrCert = filename:join([Root, User, "cert.pem"]),
+    IssuerCert = filename:join([Root, Issuer, "cert.pem"]),
+    CACertFile = filename:join([Root, CA, "cert.pem"]),
+    OCSPResp = filename:join([Root, User, "ocsp.resp"]),
+    Cmd = [C#config.openssl_cmd, " ocsp"
+           " -CAfile ", CACertFile,
+           " -url ", "http://localhost:" ++ integer_to_list(Port),
+           " -no_nonce",
+           " -respout ", OCSPResp,
+	   " -issuer ", IssuerCert,
+	   " -cert ", UsrCert],
+    Env = [{"ROOTDIR", filename:absname(Root)}],
+    cmd(Cmd, Env),
+    OCSPResp.
+
 %% Remove the certificate's entry from the database. The OCSP responder
 %% will consider the certificate to be unknown.
 remove_entry(Root, CA, User, C) ->
diff --git a/lib/ssl/test/openssl_ocsp_SUITE.erl b/lib/ssl/test/openssl_ocsp_SUITE.erl
index 045915cc84d3..e864afa6985f 100644
--- a/lib/ssl/test/openssl_ocsp_SUITE.erl
+++ b/lib/ssl/test/openssl_ocsp_SUITE.erl
@@ -23,6 +23,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("public_key/include/public_key.hrl").
 -include("ssl_test_lib.hrl").
+-include("ssl_handshake.hrl").
 
 %% Callback functions
 -export([all/0,
@@ -40,7 +41,8 @@
          stapling_with_responder_cert/0, stapling_with_responder_cert/1,
          stapling_revoked/0, stapling_revoked/1,
          stapling_undetermined/0, stapling_undetermined/1,
-         stapling_no_staple/0, stapling_no_staple/1
+         stapling_no_staple/0, stapling_no_staple/1,
+         stapling_server/0, stapling_server/1
         ]).
 
 %% spawn export
@@ -56,9 +58,9 @@ all() ->
      {group, 'tlsv1.2'},
      {group, 'dtlsv1.2'}].
 
-groups() -> 
-    [{'tlsv1.3', [], ocsp_tests()},
-     {'tlsv1.2', [], ocsp_tests()},
+groups() ->
+    [{'tlsv1.3', [], ocsp_tests() ++ ocsp_server_tests()},
+     {'tlsv1.2', [], ocsp_tests() ++ ocsp_server_tests()},
      {'dtlsv1.2', [], ocsp_tests()}].
 
 ocsp_tests() ->
@@ -70,6 +72,9 @@ ocsp_tests() ->
      stapling_no_staple
     ].
 
+ocsp_server_tests() ->
+    [stapling_server].
+
 %%--------------------------------------------------------------------
 init_per_suite(Config0) ->
     Config = lists:merge([{debug, ?DEBUG}],
@@ -215,6 +220,61 @@ stapling_negative_helper(Config, CACertsPath, ServerVariant, ExpectedError) ->
     true = is_pid(Client),
     ssl_test_lib:check_client_alert(Client, ExpectedError).
 
+%%--------------------------------------------------------------------
+stapling_server() ->
+    [{doc, "Verify basic OCSP stapling works (server side)"}].
+stapling_server(Config0)
+  when is_list(Config0) ->
+    PrivDir = proplists:get_value(priv_dir, Config0),
+    ResponderPort = proplists:get_value(responder_port, Config0),
+    OCSPRespPath = make_certs:make_ocsp_response(ResponderPort, PrivDir, "otpCA",
+                                                 "server", "b.server",
+                                                 make_certs:default_config()),
+    {ok, OCSPRespDer} = file:read_file(OCSPRespPath),
+    ServerOpts = proplists:get_value(server_opts, Config0, []),
+    Config = [ {server_opts, [ {sni_fun,
+                                fun(SN) ->  ocsp_sni_fun(SN, OCSPRespDer) end}
+                             | ServerOpts]}
+             | Config0],
+    stapling_server_helper(Config, []).
+
+stapling_server_helper(Config, Opts) ->
+    Data = "ping",  %% 4 bytes
+    %% GroupName = undefined,
+    %% ServerOpts = [{group, GroupName}],
+    ServerOpts = [],
+    Server = ssl_test_lib:start_server(erlang,
+                                       [{options, ServerOpts}],
+                                       Config),
+    Port = ssl_test_lib:inet_port(Server),
+
+    ClientOpts = ssl_test_lib:ssl_options(Opts, Config),
+    Client = ssl_test_lib:start_client(openssl,
+                                       [{port, Port},
+                                        {options, ClientOpts},
+                                        {server_name_indication, "server"},
+                                        {ocsp_stapling, true},
+                                        {ocsp_nonce, false},
+                                        {debug_openssl, false}],
+                                       Config),
+    true = is_pid(Client),
+    ct:sleep(1000),
+    {messages, ClientMsgs} = process_info(Client, messages),
+    [OCSPOutput] = [Output ||
+                       {_Port, {data, Output}} <- ClientMsgs,
+                       case re:run(Output, "OCSP response") of
+                           {match, _} -> true;
+                           _ -> false
+                       end],
+    {match, _} = re:run(OCSPOutput, "Response Status: successful"),
+    {match, _} = re:run(OCSPOutput, "Cert Status:"),
+
+    ssl_test_lib:check_active_receive(Server, "Hello world"),
+    ssl_test_lib:send(Client, Data),
+    Data = ssl_test_lib:check_active_receive(Server, Data),
+    ssl_test_lib:close(Server),
+    ssl_test_lib:close(Client).
+
 %%--------------------------------------------------------------------
 %% Internal functions -----------------------------------------------
 %%--------------------------------------------------------------------
@@ -279,3 +339,9 @@ get_free_port() ->
     {ok, Port} = inet:port(Listen),
     ok = gen_tcp:close(Listen),
     Port.
+
+ocsp_sni_fun(_Servername, OCSPRespDer) ->
+    [{certificate_status, #certificate_status{
+                             status_type = 1,
+                             response = OCSPRespDer
+                            }}].
diff --git a/lib/ssl/test/ssl_test_lib.erl b/lib/ssl/test/ssl_test_lib.erl
index 885d98813188..5eb68b3e4791 100644
--- a/lib/ssl/test/ssl_test_lib.erl
+++ b/lib/ssl/test/ssl_test_lib.erl
@@ -2334,6 +2334,7 @@ start_client(openssl, Port, ClientOpts, Config) ->
     HostName = proplists:get_value(hostname, ClientOpts, net_adm:localhost()),
     SNI = openssl_sni(proplists:get_value(server_name_indication, ClientOpts, undefined)),
     Debug = openssl_debug_options(DOpenssl),
+    OCSPStatus = openssl_ocsp_status(proplists:get_value(ocsp_stapling, ClientOpts, undefined)),
 
     Exe = "openssl",
     Args0 =  case Groups0 of
@@ -2352,6 +2353,7 @@ start_client(openssl, Port, ClientOpts, Config) ->
                          Reconnect ++
                          MaxFragLen ++
                          SessionArgs ++
+                         OCSPStatus ++
                          Debug;
                  Group ->
                      ["s_client",
@@ -2369,6 +2371,7 @@ start_client(openssl, Port, ClientOpts, Config) ->
                          Reconnect ++
                          MaxFragLen ++
                          SessionArgs ++
+                         OCSPStatus ++
                          Debug
              end,
     Args = maybe_force_ipv4(Args0),
@@ -2512,6 +2515,14 @@ openssl_debug_options(true) ->
     ["-msg", "-debug"];
 openssl_debug_options(false) ->
     [].
+
+openssl_ocsp_status(undefined) ->
+    [];
+openssl_ocsp_status(true) ->
+    ["-status"];
+openssl_ocsp_status(false) ->
+    [].
+
 %%
 openssl_debug_options(PrivDir, true) ->
     case is_keylogfile_supported() of