Skip to content

Commit

Permalink
Don't do topic checking, too many caveats
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat Khanbabayev <[email protected]>
  • Loading branch information
NeejWeej committed Jan 31, 2025
1 parent ae4dbb8 commit 787740a
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 118 deletions.
79 changes: 7 additions & 72 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ class EventCb : public RdKafka::EventCb

KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary & properties ) : AdapterManager( engine ),
m_consumerIdx( 0 ),
m_producerPollThreadActive( false )
m_producerPollThreadActive( false ),
m_unrecoverableError( false )
{
m_maxThreads = properties.get<uint64_t>( "max_threads" );
m_pollTimeoutMs = properties.get<TimeDelta>( "poll_timeout" ).asMilliseconds();
m_brokerConnectTimeoutMs = properties.get<TimeDelta>( "broker_connect_timeout" ).asMilliseconds();

m_eventCb = std::make_unique<EventCb>( this );
m_producerCb = std::make_unique<DeliveryReportCb>( this );
Expand Down Expand Up @@ -135,6 +135,7 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction

void KafkaAdapterManager::forceShutdown( const std::string & err )
{
m_unrecoverableError = true; // So we can alert the producer to stop trying to flush
forceConsumerReplayComplete();
try
{
Expand All @@ -152,72 +153,6 @@ void KafkaAdapterManager::forceConsumerReplayComplete()
consumer -> forceReplayCompleted();
}

void KafkaAdapterManager::fetchMetadata() {
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err;

// Try with producer first if we have one
if ( m_producer ) {
err = m_producer -> metadata(
true, // get all topics
nullptr, // Topic pointer to specific topic (null since we getting them all)
&metadata, // pointer to hold metadata. It must be released by calling delete
m_brokerConnectTimeoutMs // timeout before failing
);
}

// Otherwise try with first consumer
else if (!m_consumerVector.empty()) {
err = m_consumerVector[0].get()->getMetadata(
true,
nullptr,
&metadata,
m_brokerConnectTimeoutMs
);
} else {
CSP_THROW(RuntimeException, "No producer or consumer available to fetch metadata");
}

if (err != RdKafka::ERR_NO_ERROR) {
if (metadata) {
delete metadata;
}
CSP_THROW(RuntimeException, "Failed to get metadata: " << RdKafka::err2str(err));
}

m_metadata.reset(metadata);
}


// This also serves as a validation check for the broker
void KafkaAdapterManager::validateTopic(const std::string& topic){
if (m_validated_topics.find(topic) != m_validated_topics.end()) {
return;
}
if (!m_metadata) {
fetchMetadata();
}
const std::vector<const RdKafka::TopicMetadata*>* topics_vec = m_metadata->topics();
auto it = std::find_if(
topics_vec -> begin(),
topics_vec -> end(),
[&topic](const RdKafka::TopicMetadata* mt) {
return mt -> topic() == topic;
}
);

if (it == topics_vec->end())
CSP_THROW(RuntimeException, "Topic does not exist: " << topic);

const RdKafka::TopicMetadata* topic_metadata = *it;
if (topic_metadata->err() != RdKafka::ERR_NO_ERROR) {
std::stringstream err_msg;
err_msg << "Topic error for " << topic << ": " << RdKafka::err2str(topic_metadata->err());
CSP_THROW(RuntimeException, err_msg.str());
}
m_validated_topics.insert(topic);
}

void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
{
std::string errstr;
Expand Down Expand Up @@ -288,18 +223,18 @@ void KafkaAdapterManager::pollProducers()
{
while( m_producerPollThreadActive )
{
m_producer -> poll( 1000 );
m_producer -> poll( m_pollTimeoutMs );
}

try
{
while( true )
{
auto rc = m_producer -> flush( 10000 );
if( !rc )
auto rc = m_producer -> flush( 5000 );
if( !rc || m_unrecoverableError )
break;

if( rc && rc != RdKafka::ERR__TIMED_OUT )
if( rc != RdKafka::ERR__TIMED_OUT )
CSP_THROW( RuntimeException, "KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str( rc ) );
}
}
Expand Down
12 changes: 3 additions & 9 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <atomic>
#include <string>
#include <thread>
#include <unordered_map>
Expand All @@ -18,7 +19,6 @@ class Conf;
class DeliveryReportCb;
class EventCb;
class Producer;
class Metadata;
}

namespace csp::adapters::kafka
Expand Down Expand Up @@ -73,18 +73,14 @@ class KafkaAdapterManager final : public csp::AdapterManager
const Dictionary::Value & startOffsetProperty() const { return m_startOffsetProperty; }

int pollTimeoutMs() const { return m_pollTimeoutMs; }
int brokerConnectTimeoutMs() const { return m_brokerConnectTimeoutMs; }

void forceShutdown( const std::string & err );

void validateTopic(const std::string& topic);

private:

using TopicKeyPair = std::pair<std::string, std::string>;

void setConfProperties( RdKafka::Conf * conf, const Dictionary & properties );
void fetchMetadata();
void pollProducers();
void forceConsumerReplayComplete();

Expand All @@ -107,21 +103,19 @@ class KafkaAdapterManager final : public csp::AdapterManager
Subscribers m_subscribers;

int m_pollTimeoutMs;
int m_brokerConnectTimeoutMs;
size_t m_maxThreads;
size_t m_consumerIdx;

std::unique_ptr<RdKafka::EventCb> m_eventCb;
std::shared_ptr<RdKafka::Producer> m_producer;
std::unique_ptr<RdKafka::DeliveryReportCb> m_producerCb;
std::unique_ptr<std::thread> m_producerPollThread;
volatile bool m_producerPollThreadActive;
std::atomic<bool> m_producerPollThreadActive;
std::atomic<bool> m_unrecoverableError;

std::unique_ptr<RdKafka::Conf> m_consumerConf;
std::unique_ptr<RdKafka::Conf> m_producerConf;
Dictionary::Value m_startOffsetProperty;
std::unique_ptr<RdKafka::Metadata> m_metadata;
std::unordered_set<std::string> m_validated_topics;
};

}
Expand Down
26 changes: 4 additions & 22 deletions cpp/csp/adapters/kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ void KafkaConsumer::start( DateTime starttime )
// we flag replay complete as soon as we identify it.
if( topic_data.wildcardSubscriber )
topic_data.wildcardSubscriber -> flagReplayComplete();
m_mgr->validateTopic(topic);
}

RdKafka::ErrorCode err = m_consumer -> subscribe( topics );
Expand Down Expand Up @@ -187,9 +186,8 @@ void KafkaConsumer::setNumPartitions( const std::string & topic, size_t num )

void KafkaConsumer::forceReplayCompleted()
{
for( auto & entry : m_topics ){
for( auto & entry : m_topics )
entry.second.markReplayComplete();
}
}

void KafkaConsumer::poll()
Expand Down Expand Up @@ -275,33 +273,17 @@ void KafkaConsumer::poll()
}
}

//we need to flag end in case the topic doesnt have any incoming data, we cant stall the engine on the pull side of the adapter
if( allDone )
{
//we need to flag end in case the topic doesnt have any incoming data, we cant stall the engine on the pull side of the adapter
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
topicData.flaggedReplayComplete = true;
}
topicData.markReplayComplete();
}
}
else
{
//In most cases we should not get here, if we do then something is wrong
//safest bet is to release the pull adapter so it doesnt stall the engine and
//we can let the error msg through
if( !topicData.flaggedReplayComplete )
{
//flag inputs as done so they dont hold up the engine
for( auto & subscriberEntry : topicData.subscribers )
{
for( auto * subscriber : subscriberEntry.second )
subscriber -> flagReplayComplete();
}
topicData.flaggedReplayComplete = true;
}
topicData.markReplayComplete();

std::string errmsg = "KafkaConsumer: Message error on topic \"" + msg -> topic_name() + "\". errcode: " + RdKafka::err2str( msg -> err() ) + " error: " + msg -> errstr();
m_mgr -> pushStatus( StatusLevel::ERROR, KafkaStatusMessageType::MSG_RECV_ERROR, errmsg );
Expand Down
9 changes: 0 additions & 9 deletions cpp/csp/adapters/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ class KafkaConsumer

void forceReplayCompleted();

RdKafka::ErrorCode getMetadata(
bool all_topics,
const RdKafka::Topic* only_rkt,
RdKafka::Metadata** metadatap,
int timeout_ms) const
{
return m_consumer->metadata(all_topics, only_rkt, metadatap, timeout_ms);
}

private:
//should align with python side enum
enum class KafkaStartOffset
Expand Down
4 changes: 2 additions & 2 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ PushInputAdapter * KafkaPublisher::getStatusAdapter()
void KafkaPublisher::start( std::shared_ptr<RdKafka::Producer> producer )
{
m_producer = producer;
std::string errstr;
m_adapterMgr.validateTopic( m_topic ); // make sure we can access topic

std::unique_ptr<RdKafka::Conf> tconf( RdKafka::Conf::create( RdKafka::Conf::CONF_TOPIC ) );

std::string errstr;
m_kafkaTopic = std::shared_ptr<RdKafka::Topic>( RdKafka::Topic::create( m_producer.get(), m_topic, tconf.get(), errstr ) );
if( !m_kafkaTopic )
CSP_THROW( RuntimeException, "Failed to create RdKafka::Topic for producer on topic " << m_topic << ":" << errstr );
Expand Down
6 changes: 4 additions & 2 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def __init__(
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5),
):
"""
:param broker - broker URL
Expand Down Expand Up @@ -101,7 +100,6 @@ def __init__(
"rd_kafka_conf_properties": conf_properties,
"rd_kafka_consumer_conf_properties": consumer_properties,
"rd_kafka_producer_conf_properties": producer_properties,
"broker_connect_timeout": broker_connect_timeout,
}

if auth:
Expand Down Expand Up @@ -177,6 +175,10 @@ def subscribe(
properties["meta_field_map"] = meta_field_map
properties["adjust_out_of_order_time"] = adjust_out_of_order_time
if extract_timestamp_from_field is not None:
if meta_field_map.get("timestamp") == extract_timestamp_from_field:
raise ValueError(
f"Field '{extract_timestamp_from_field}' cannot be used for both timestamp extraction and meta field mapping"
)
properties["extract_timestamp_from_field"] = extract_timestamp_from_field

return _kafka_input_adapter_def(self, ts_type, properties, push_mode)
Expand Down
20 changes: 20 additions & 0 deletions csp/tests/adapters/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,3 +427,23 @@ def graph_pub():
# With bug this would deadlock
with pytest.raises(RuntimeError):
csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_meta_field_map_extract_timestamp_from_field(self, kafkaadapterkwargs):
    """Subscribing with the same field named in both meta_field_map['timestamp']
    and extract_timestamp_from_field must be rejected with a ValueError."""

    class SubData(csp.Struct):
        msg: str
        dt: datetime

    adapter = KafkaAdapterManager(**kafkaadapterkwargs)

    def build_graph():
        # 'dt' is deliberately used for both the timestamp meta-field mapping
        # and timestamp extraction — the adapter should refuse this combination.
        return adapter.subscribe(
            ts_type=SubData,
            msg_mapper=RawTextMessageMapper(),
            meta_field_map={"timestamp": "dt"},
            topic="foobar",
            extract_timestamp_from_field="dt",
        )

    with pytest.raises(ValueError):
        csp.run(build_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)
3 changes: 1 addition & 2 deletions docs/wiki/api-references/Input-Output-Adapters-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ KafkaAdapterManager(
sasl_kerberos_service_name='kafka',
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5)
poll_timeout: timedelta = timedelta(seconds=1)
):
```

Expand Down

0 comments on commit 787740a

Please sign in to comment.