flagReplayComplete for wildcard subscribers, fix bad broker publish hang (#433)

* flagReplayComplete for wildcard subscribers, allow pulling csp timestamp from Kafka message
---------

Signed-off-by: Nijat Khanbabayev <[email protected]>
NeejWeej authored Feb 7, 2025
1 parent b09a88b commit 6cca482
Showing 6 changed files with 97 additions and 54 deletions.
12 changes: 7 additions & 5 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
@@ -76,7 +76,8 @@ class EventCb : public RdKafka::EventCb

KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary & properties ) : AdapterManager( engine ),
m_consumerIdx( 0 ),
m_producerPollThreadActive( false )
m_producerPollThreadActive( false ),
m_unrecoverableError( false )
{
m_maxThreads = properties.get<uint64_t>( "max_threads" );
m_pollTimeoutMs = properties.get<TimeDelta>( "poll_timeout" ).asMilliseconds();
@@ -134,6 +135,7 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction

void KafkaAdapterManager::forceShutdown( const std::string & err )
{
m_unrecoverableError = true; // So we can alert the producer to stop trying to flush
forceConsumerReplayComplete();
try
{
@@ -221,18 +223,18 @@ void KafkaAdapterManager::pollProducers()
{
while( m_producerPollThreadActive )
{
m_producer -> poll( 1000 );
m_producer -> poll( m_pollTimeoutMs );
}

try
{
while( true )
{
auto rc = m_producer -> flush( 10000 );
if( !rc )
auto rc = m_producer -> flush( 5000 );
if( !rc || m_unrecoverableError )
break;

if( rc && rc != RdKafka::ERR__TIMED_OUT )
if( rc != RdKafka::ERR__TIMED_OUT )
CSP_THROW( RuntimeException, "KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str( rc ) );
}
}
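For reference, a minimal standalone sketch of how the producer shutdown path behaves after this change. It assumes the standard librdkafka C++ API (RdKafka::Producer::poll / flush) and uses a free function in place of the actual KafkaAdapterManager member, so names other than those in the diff are illustrative only: the poll thread drains delivery reports until shutdown, then flushes in bounded slices and bails out once an unrecoverable broker error has been flagged instead of looping forever.

    // Sketch only (not the actual class member): shutdown flush loop with an
    // atomic "unrecoverable error" flag, using the librdkafka C++ API.
    #include <atomic>
    #include <stdexcept>
    #include <librdkafka/rdkafkacpp.h>

    void pollProducerUntilShutdown( RdKafka::Producer & producer,
                                    std::atomic<bool> & pollThreadActive,
                                    std::atomic<bool> & unrecoverableError,
                                    int pollTimeoutMs )
    {
        // Service delivery-report callbacks while the engine is running
        while( pollThreadActive )
            producer.poll( pollTimeoutMs );

        // On shutdown, flush pending messages in 5s slices; stop early if the
        // adapter manager flagged an unrecoverable error (e.g. a bad broker),
        // which previously left the publish path hanging inside flush()
        while( true )
        {
            RdKafka::ErrorCode rc = producer.flush( 5000 );
            if( !rc || unrecoverableError )
                break;
            if( rc != RdKafka::ERR__TIMED_OUT )
                throw std::runtime_error( "KafkaProducer failed to flush pending msgs on shutdown: "
                                          + RdKafka::err2str( rc ) );
        }
    }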
4 changes: 3 additions & 1 deletion cpp/csp/adapters/kafka/KafkaAdapterManager.h
@@ -6,6 +6,7 @@
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <atomic>
#include <string>
#include <thread>
#include <unordered_map>
@@ -109,7 +110,8 @@ class KafkaAdapterManager final : public csp::AdapterManager
std::shared_ptr<RdKafka::Producer> m_producer;
std::unique_ptr<RdKafka::DeliveryReportCb> m_producerCb;
std::unique_ptr<std::thread> m_producerPollThread;
volatile bool m_producerPollThreadActive;
std::atomic<bool> m_producerPollThreadActive;
std::atomic<bool> m_unrecoverableError;

std::unique_ptr<RdKafka::Conf> m_consumerConf;
std::unique_ptr<RdKafka::Conf> m_producerConf;
53 changes: 15 additions & 38 deletions cpp/csp/adapters/kafka/KafkaConsumer.cpp
@@ -101,11 +101,6 @@ void KafkaConsumer::addSubscriber( const std::string & topic, const std::string
}
else
m_topics[topic].subscribers[key].emplace_back( subscriber );
//This is a bit convoluted, but basically if we dont have rebalanceCB set, that means we are in "groupid" mode
//which doesnt support seeking. We force the adapters into a live mode, because groupid mode leads to deadlocks
//on adapters that dont received any data since we dont have partition information available to declare them done ( we dont even connect to them all )
if( !m_rebalanceCb )
subscriber -> flagReplayComplete();
}

void KafkaConsumer::start( DateTime starttime )
@@ -137,12 +132,21 @@ void KafkaConsumer::start( DateTime starttime )
else
CSP_THROW( TypeError, "Expected enum, datetime or timedelta for startOffset" );
}
//This is a bit convoluted, but basically if we dont have rebalanceCB set, that means we are in "groupid" mode
//which doesnt support seeking. We force the adapters into a live mode, because groupid mode leads to deadlocks
//on adapters that dont received any data since we dont have partition information available to declare them done ( we dont even connect to them all )
else
forceReplayCompleted();

std::vector<std::string> topics;
for( auto & entry : m_topics )
topics.emplace_back( entry.first );
for (const auto& [topic, topic_data] : m_topics)
{
topics.emplace_back( topic );
// wildcard subscription has no guarantee of being in order
// we flag replay complete as soon as we identify it.
if( topic_data.wildcardSubscriber )
topic_data.wildcardSubscriber -> flagReplayComplete();
}

RdKafka::ErrorCode err = m_consumer -> subscribe( topics );
if( err )
@@ -175,18 +179,7 @@ void KafkaConsumer::setNumPartitions( const std::string & topic, size_t num )
void KafkaConsumer::forceReplayCompleted()
{
for( auto & entry : m_topics )
{
auto & topicData = entry.second;
if( !topicData.flaggedReplayComplete )
{
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
topicData.flaggedReplayComplete = true;
}
}
entry.second.markReplayComplete();
}

void KafkaConsumer::poll()
@@ -272,33 +265,17 @@
}
}

//we need to flag end in case the topic doesnt have any incoming data, we cant stall the engine on the pull side of the adapter
if( allDone )
{
//we need to flag end in case the topic doesnt have any incoming data, we cant stall the engine on the pull side of the adapter
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
topicData.flaggedReplayComplete = true;
}
topicData.markReplayComplete();
}
}
else
{
//In most cases we should not get here, if we do then something is wrong
//safest bet is to release the pull adapter so it doesnt stall the engine and
//we can let the error msg through
if( !topicData.flaggedReplayComplete )
{
//flag inputs as done so they dont hold up the engine
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
topicData.flaggedReplayComplete = true;
}
topicData.markReplayComplete();

std::string errmsg = "KafkaConsumer: Message error on topic \"" + msg -> topic_name() + "\". errcode: " + RdKafka::err2str( msg -> err() ) + " error: " + msg -> errstr();
m_mgr -> pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
19 changes: 19 additions & 0 deletions cpp/csp/adapters/kafka/KafkaConsumer.h
@@ -46,6 +46,25 @@ class KafkaConsumer
KafkaSubscriber * wildcardSubscriber = nullptr;
std::vector<bool> partitionLive;
bool flaggedReplayComplete = false;

void markReplayComplete()
{
if( !flaggedReplayComplete )
{
// Flag all regular subscribers
for( auto& subscriberEntry : subscribers )
{
for( auto* subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}

// Handle wildcard subscriber if present
if( wildcardSubscriber )
wildcardSubscriber -> flagReplayComplete();

flaggedReplayComplete = true;
}
}
};

std::unordered_map<std::string,TopicData> m_topics;
12 changes: 7 additions & 5 deletions csp/tests/adapters/conftest.py
@@ -9,9 +9,11 @@ def kafkabroker():


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkabroker):
group_id = "group.id123"
_kafkaadapter = KafkaAdapterManager(
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
)
def kafkaadapterkwargs(kafkabroker):
return dict(broker=kafkabroker, group_id="group.id123", rd_kafka_conf_options={"allow.auto.create.topics": "true"})


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkaadapterkwargs):
_kafkaadapter = KafkaAdapterManager(**kafkaadapterkwargs)
return _kafkaadapter
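For context, splitting the adapter kwargs into their own fixture lets individual tests build as many fresh KafkaAdapterManager instances as they need and override single settings. A hypothetical usage sketch follows; the test name and the csp.adapters.kafka import path are assumptions for illustration, not part of this commit:

    # Hypothetical sketch: build an independent adapter from the shared kwargs
    # fixture, overriding one setting; import path is assumed.
    from csp.adapters.kafka import KafkaAdapterManager

    def test_example_with_bad_broker(kafkaadapterkwargs):
        bad_kwargs = dict(kafkaadapterkwargs, broker="foobar")  # override just the broker
        adapter = KafkaAdapterManager(**bad_kwargs)              # fresh instance per test
        assert adapter is not None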
51 changes: 46 additions & 5 deletions csp/tests/adapters/test_kafka.py
@@ -372,17 +372,58 @@ def graph(symbols: list, count: int):
assert [v[1] for v in sub_bytes] == [v[1] for v in pub[:count]]

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
@pytest.fixture(autouse=True)
def test_invalid_topic(self, kafkaadapter):
def test_invalid_topic(self, kafkaadapterkwargs):
class SubData(csp.Struct):
msg: str

kafkaadapter1 = KafkaAdapterManager(**kafkaadapterkwargs)

# Was a bug where engine would stall
def graph():
def graph_sub():
# csp.print('status', kafkaadapter.status())
return kafkaadapter.subscribe(
return kafkaadapter1.subscribe(
ts_type=SubData, msg_mapper=RawTextMessageMapper(), field_map={"": "msg"}, topic="foobar", key="none"
)

# With bug this would deadlock
csp.run(graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
with pytest.raises(RuntimeError):
csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
kafkaadapter2 = KafkaAdapterManager(**kafkaadapterkwargs)

def graph_pub():
msg_mapper = RawTextMessageMapper()
kafkaadapter2.publish(msg_mapper, x=csp.const("heyyyy"), topic="foobar", key="test_key124")

# With bug this would deadlock
with pytest.raises(RuntimeError):
csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_invalid_broker(self, kafkaadapterkwargs):
dict_with_broker = kafkaadapterkwargs.copy()
dict_with_broker["broker"] = "foobar"

kafkaadapter1 = KafkaAdapterManager(**dict_with_broker)

class SubData(csp.Struct):
msg: str

# Was a bug where engine would stall
def graph_sub():
return kafkaadapter1.subscribe(
ts_type=SubData, msg_mapper=RawTextMessageMapper(), field_map={"": "msg"}, topic="foobar", key="none"
)

# With bug this would deadlock
with pytest.raises(RuntimeError):
csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

kafkaadapter2 = KafkaAdapterManager(**dict_with_broker)

def graph_pub():
msg_mapper = RawTextMessageMapper()
kafkaadapter2.publish(msg_mapper, x=csp.const("heyyyy"), topic="foobar", key="test_key124")

# With bug this would deadlock
with pytest.raises(RuntimeError):
csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
