Skip to content

Commit

Permalink
Configurable #fragments to send on NACK of sample
Browse files Browse the repository at this point in the history
This changes the hard-coded policy of responding to a NACK of a large
sample with only the first fragment of that sample into one where the
number is configurable.  The default is 1 and so there is by default no
change in behavior.

Because of the change, the code handling HeartbeatFrag messages in the
case of a retransmit is now no longer dead code.

Signed-off-by: Erik Boasson <[email protected]>
  • Loading branch information
eboasson committed Sep 27, 2023
1 parent 9c6c5df commit 600459e
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 20 deletions.
20 changes: 16 additions & 4 deletions docs/manual/config/config_file_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -843,11 +843,23 @@ The default value is: ``writers``
//CycloneDDS/Domain/Internal/BurstSize
--------------------------------------

Children: `//CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit`_, `//CycloneDDS/Domain/Internal/BurstSize/MaxRexmit`_
Children: `//CycloneDDS/Domain/Internal/BurstSize/MaxFragsRexmitSample`_, `//CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit`_, `//CycloneDDS/Domain/Internal/BurstSize/MaxRexmit`_

Setting for controlling the size of transmitting bursts.


.. _`//CycloneDDS/Domain/Internal/BurstSize/MaxFragsRexmitSample`:

//CycloneDDS/Domain/Internal/BurstSize/MaxFragsRexmitSample
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Text

This element controls the maximum number of fragments of a sample that are retransmit in response to a NACK of the entire sample (as opposed to what is sent in response to a NACKFRAG requesting specific fragments).

The default value is: ``1``


.. _`//CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit`:

//CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit
Expand Down Expand Up @@ -2628,10 +2640,10 @@ The categorisation of tracing output is incomplete and hence most of the verbosi
The default value is: ``none``

..
generated from ddsi_config.h[eae21b4181f3fdd23b2514a089c43b0e36357066]
generated from ddsi_config.h[570f67bd3080674a4bad53d9580a8bb7ad1e6e4d]
generated from ddsi__cfgunits.h[bd22f0c0ed210501d0ecd3b07c992eca549ef5aa]
generated from ddsi__cfgelems.h[0d5a3d2063031f4b47f53cc007d2703bcdefdfa1]
generated from ddsi_config.c[7b5f868237256aa44f09a4ef5b411a21a5ca8024]
generated from ddsi__cfgelems.h[13337a006d5313519c88c3f3643f27992840cfd3]
generated from ddsi_config.c[efeae198a5e12ca8977a655216470564b5c44b64]
generated from _confgen.h[e32eabfc35e9f3a7dcb63b19ed148c0d17c6e5fc]
generated from _confgen.c[237308acd53897a34e8c643e16e05a61d73ffd65]
generated from generate_rnc.c[b50e4b7ab1d04b2bc1d361a0811247c337b74934]
Expand Down
16 changes: 12 additions & 4 deletions docs/manual/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,19 @@ The default value is: `writers`


#### //CycloneDDS/Domain/Internal/BurstSize
Children: [MaxInitTransmit](#cycloneddsdomaininternalburstsizemaxinittransmit), [MaxRexmit](#cycloneddsdomaininternalburstsizemaxrexmit)
Children: [MaxFragsRexmitSample](#cycloneddsdomaininternalburstsizemaxfragsrexmitsample), [MaxInitTransmit](#cycloneddsdomaininternalburstsizemaxinittransmit), [MaxRexmit](#cycloneddsdomaininternalburstsizemaxrexmit)

Setting for controlling the size of transmitting bursts.


##### //CycloneDDS/Domain/Internal/BurstSize/MaxFragsRexmitSample
Text

This element controls the maximum number of fragments of a sample that are retransmit in response to a NACK of the entire sample (as opposed to what is sent in response to a NACKFRAG requesting specific fragments).

The default value is: `1`


##### //CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit
Number-with-unit

Expand Down Expand Up @@ -1836,10 +1844,10 @@ While none prevents any message from being written to a DDSI2 log file.
The categorisation of tracing output is incomplete and hence most of the verbosity levels and categories are not of much use in the current release. This is an ongoing process and here we describe the target situation rather than the current situation. Currently, the most useful verbosity levels are config, fine and finest.

The default value is: `none`
<!--- generated from ddsi_config.h[eae21b4181f3fdd23b2514a089c43b0e36357066] -->
<!--- generated from ddsi_config.h[570f67bd3080674a4bad53d9580a8bb7ad1e6e4d] -->
<!--- generated from ddsi__cfgunits.h[bd22f0c0ed210501d0ecd3b07c992eca549ef5aa] -->
<!--- generated from ddsi__cfgelems.h[0d5a3d2063031f4b47f53cc007d2703bcdefdfa1] -->
<!--- generated from ddsi_config.c[7b5f868237256aa44f09a4ef5b411a21a5ca8024] -->
<!--- generated from ddsi__cfgelems.h[13337a006d5313519c88c3f3643f27992840cfd3] -->
<!--- generated from ddsi_config.c[efeae198a5e12ca8977a655216470564b5c44b64] -->
<!--- generated from _confgen.h[e32eabfc35e9f3a7dcb63b19ed148c0d17c6e5fc] -->
<!--- generated from _confgen.c[237308acd53897a34e8c643e16e05a61d73ffd65] -->
<!--- generated from generate_rnc.c[b50e4b7ab1d04b2bc1d361a0811247c337b74934] -->
Expand Down
12 changes: 9 additions & 3 deletions etc/cyclonedds.rnc
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ CycloneDDS configuration""" ] ]
<p>Setting for controlling the size of transmitting bursts.</p>""" ] ]
element BurstSize {
[ a:documentation [ xml:lang="en" """
<p>This element controls the maximum number of fragments of a sample that are retransmit in response to a NACK of the entire sample (as opposed to what is sent in response to a NACKFRAG requesting specific fragments).</p>
<p>The default value is: <code>1</code></p>""" ] ]
element MaxFragsRexmitSample {
text
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually, the receivers can keep up with the transmitter, at least on average, so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.</p>
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
<p>The default value is: <code>4294967295</code></p>""" ] ]
Expand Down Expand Up @@ -1277,10 +1283,10 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg==<br>
duration_inf = xsd:token { pattern = "inf|0|(\d+(\.\d*)?([Ee][\-+]?\d+)?|\.\d+([Ee][\-+]?\d+)?) *([num]?s|min|hr|day)" }
memsize = xsd:token { pattern = "0|(\d+(\.\d*)?([Ee][\-+]?\d+)?|\.\d+([Ee][\-+]?\d+)?) *([kMG]i?)?B" }
}
# generated from ddsi_config.h[eae21b4181f3fdd23b2514a089c43b0e36357066]
# generated from ddsi_config.h[570f67bd3080674a4bad53d9580a8bb7ad1e6e4d]
# generated from ddsi__cfgunits.h[bd22f0c0ed210501d0ecd3b07c992eca549ef5aa]
# generated from ddsi__cfgelems.h[0d5a3d2063031f4b47f53cc007d2703bcdefdfa1]
# generated from ddsi_config.c[7b5f868237256aa44f09a4ef5b411a21a5ca8024]
# generated from ddsi__cfgelems.h[13337a006d5313519c88c3f3643f27992840cfd3]
# generated from ddsi_config.c[efeae198a5e12ca8977a655216470564b5c44b64]
# generated from _confgen.h[e32eabfc35e9f3a7dcb63b19ed148c0d17c6e5fc]
# generated from _confgen.c[237308acd53897a34e8c643e16e05a61d73ffd65]
# generated from generate_rnc.c[b50e4b7ab1d04b2bc1d361a0811247c337b74934]
Expand Down
14 changes: 11 additions & 3 deletions etc/cyclonedds.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,19 @@ CycloneDDS configuration</xs:documentation>
</xs:annotation>
<xs:complexType>
<xs:all>
<xs:element minOccurs="0" ref="config:MaxFragsRexmitSample"/>
<xs:element minOccurs="0" ref="config:MaxInitTransmit"/>
<xs:element minOccurs="0" ref="config:MaxRexmit"/>
</xs:all>
</xs:complexType>
</xs:element>
<xs:element name="MaxFragsRexmitSample" type="xs:string">
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element controls the maximum number of fragments of a sample that are retransmit in response to a NACK of the entire sample (as opposed to what is sent in response to a NACKFRAG requesting specific fragments).&lt;/p&gt;
&lt;p&gt;The default value is: &lt;code&gt;1&lt;/code&gt;&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="MaxInitTransmit" type="config:memsize">
<xs:annotation>
<xs:documentation>
Expand Down Expand Up @@ -1931,10 +1939,10 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg==&lt;br&gt;
</xs:restriction>
</xs:simpleType>
</xs:schema>
<!--- generated from ddsi_config.h[eae21b4181f3fdd23b2514a089c43b0e36357066] -->
<!--- generated from ddsi_config.h[570f67bd3080674a4bad53d9580a8bb7ad1e6e4d] -->
<!--- generated from ddsi__cfgunits.h[bd22f0c0ed210501d0ecd3b07c992eca549ef5aa] -->
<!--- generated from ddsi__cfgelems.h[0d5a3d2063031f4b47f53cc007d2703bcdefdfa1] -->
<!--- generated from ddsi_config.c[7b5f868237256aa44f09a4ef5b411a21a5ca8024] -->
<!--- generated from ddsi__cfgelems.h[13337a006d5313519c88c3f3643f27992840cfd3] -->
<!--- generated from ddsi_config.c[efeae198a5e12ca8977a655216470564b5c44b64] -->
<!--- generated from _confgen.h[e32eabfc35e9f3a7dcb63b19ed148c0d17c6e5fc] -->
<!--- generated from _confgen.c[237308acd53897a34e8c643e16e05a61d73ffd65] -->
<!--- generated from generate_rnc.c[b50e4b7ab1d04b2bc1d361a0811247c337b74934] -->
Expand Down
7 changes: 4 additions & 3 deletions src/core/ddsi/defconfig.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ void ddsi_config_init_default (struct ddsi_config *cfg)
cfg->whc_adaptive = INT32_C (1);
cfg->max_rexmit_burst_size = UINT32_C (1048576);
cfg->init_transmit_extra_pct = UINT32_C (4294967295);
cfg->max_frags_in_rexmit_of_sample = UINT32_C (1);
cfg->tcp_nodelay = INT32_C (1);
cfg->tcp_port = INT32_C (-1);
cfg->tcp_read_timeout = INT64_C (2000000000);
Expand All @@ -98,10 +99,10 @@ void ddsi_config_init_default (struct ddsi_config *cfg)
cfg->ssl_min_version.minor = 3;
#endif /* DDS_HAS_SSL */
}
/* generated from ddsi_config.h[eae21b4181f3fdd23b2514a089c43b0e36357066] */
/* generated from ddsi_config.h[570f67bd3080674a4bad53d9580a8bb7ad1e6e4d] */
/* generated from ddsi__cfgunits.h[bd22f0c0ed210501d0ecd3b07c992eca549ef5aa] */
/* generated from ddsi__cfgelems.h[0d5a3d2063031f4b47f53cc007d2703bcdefdfa1] */
/* generated from ddsi_config.c[7b5f868237256aa44f09a4ef5b411a21a5ca8024] */
/* generated from ddsi__cfgelems.h[13337a006d5313519c88c3f3643f27992840cfd3] */
/* generated from ddsi_config.c[efeae198a5e12ca8977a655216470564b5c44b64] */
/* generated from _confgen.h[e32eabfc35e9f3a7dcb63b19ed148c0d17c6e5fc] */
/* generated from _confgen.c[237308acd53897a34e8c643e16e05a61d73ffd65] */
/* generated from generate_rnc.c[b50e4b7ab1d04b2bc1d361a0811247c337b74934] */
Expand Down
1 change: 1 addition & 0 deletions src/core/ddsi/include/dds/ddsi/ddsi_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ struct ddsi_config
uint32_t max_rexmit_msg_size;
uint32_t init_transmit_extra_pct;
uint32_t max_rexmit_burst_size;
uint32_t max_frags_in_rexmit_of_sample;

int publish_uc_locators; /* Publish discovery unicast locators */
int enable_uc_locators; /* If false, don't even try to create a unicast socket */
Expand Down
7 changes: 7 additions & 0 deletions src/core/ddsi/src/ddsi__cfgelems.h
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,13 @@ static struct cfgelem internal_burstsize_cfgelems[] = {
"Besides, the retransmits will be unicast, and so any multicast advantage "
"will be lost as well.</p>"),
UNIT("memsize")),
STRING("MaxFragsRexmitSample", NULL, 1, "1",
MEMBER(max_frags_in_rexmit_of_sample),
FUNCTIONS(0, uf_pos_uint, 0, pf_uint),
DESCRIPTION(
"<p>This element controls the maximum number of fragments of a sample that "
"are retransmit in response to a NACK of the entire sample (as opposed to "
"what is sent in response to a NACKFRAG requesting specific fragments).</p>")),
END_MARKER
};

Expand Down
11 changes: 11 additions & 0 deletions src/core/ddsi/src/ddsi_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ DUPF(uint32);
#endif
DU(natint);
DU(natint_255);
DU(pos_uint);
DUPF(participantIndex);
DU(dyn_port);
DUPF(memsize);
Expand Down Expand Up @@ -1418,6 +1419,16 @@ static enum update_result uf_uint (struct ddsi_cfgst *cfgst, void *parent, struc
return URES_SUCCESS;
}

static enum update_result uf_pos_uint (struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG (int first), const char *value)
{
uint32_t * const elem = cfg_address (cfgst, parent, cfgelem);
int64_t x;
if (uf_int64_unit (cfgst, &x, value, NULL, 1, 1, UINT32_MAX) != URES_SUCCESS)
return URES_ERROR;
*elem = (uint32_t) x;
return URES_SUCCESS;
}

static void pf_uint (struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, uint32_t sources)
{
uint32_t const * const p = cfg_address (cfgst, parent, cfgelem);
Expand Down
17 changes: 14 additions & 3 deletions src/core/ddsi/src/ddsi_transmit.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,15 @@ int ddsi_enqueue_sample_wrlock_held (struct ddsi_writer *wr, ddsi_seqno_t seq, s
/* end-of-transaction messages are empty, but still need to be sent */
nfrags = 1;
}
if (!isnew && nfrags > 1)
nfrags = 1;
if (!isnew && nfrags > gv->config.max_frags_in_rexmit_of_sample)
{
/* Implementations that never use NACKFRAG are allowed by the specification and for such a peer,
we must always respond with the full sample on a NACK. For all other implementations (and
certainly for Cyclone) it makes much more sense to do a small initial retransmit.
(I am not aware of any implementations that never use NACKFRAG.) */
nfrags = gv->config.max_frags_in_rexmit_of_sample;
}
for (i = 0; i < nfrags && enqueued != DDSI_QXEV_MSG_REXMIT_DROPPED; i++)
{
struct ddsi_xmsg *fmsg = NULL;
Expand All @@ -508,7 +515,11 @@ int ddsi_enqueue_sample_wrlock_held (struct ddsi_writer *wr, ddsi_seqno_t seq, s
}
else
{
/* Implementations that never use NACKFRAG are allowed by the specification, and for such a peer, we must always force out the full sample on a retransmit request. I am not aware of any such implementations so leaving the override flag in, but not actually using it at the moment. Should set force = (i != 0) for "known bad" implementations. */
/* For implementations that never use NACKFRAG if we enqueue the first fragment, we must enqueue
everything, because dropping the tail is guaranteed to result in another full NACK anyway.
I am not aware of any such implementations so leaving the override flag in, if it turns out they do
exist, set force = (i != 0) for "known bad" implementations. */
const int force = 0;
if(fmsg)
{
Expand Down

0 comments on commit 600459e

Please sign in to comment.