Skip to content

Commit

Permalink
PushPullInputAdapter:
Browse files Browse the repository at this point in the history
 - renamed KafkaStartOffset enum to REplayMode and moved up so other push pull adapters can follow similar conventions
- PushPull will now always force ticks in sim mode with time < starttime to replay at starttime.
- Kafka adapter removed logic to drop ticks before starttime ( If user doesnt want data before starttime, they shouldnt be passing EARLIEST )

Signed-off-by: Rob Ambalu <[email protected]>
  • Loading branch information
robambalu committed Feb 12, 2025
1 parent 781b510 commit 4145649
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 23 deletions.
14 changes: 9 additions & 5 deletions cpp/csp/adapters/kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,16 @@ void KafkaConsumer::start( DateTime starttime )
auto & startOffsetProperty = m_mgr -> startOffsetProperty();
if( std::holds_alternative<int64_t>( startOffsetProperty ) )
{
KafkaStartOffset sOffset = ( KafkaStartOffset ) std::get<int64_t>( startOffsetProperty );
switch( sOffset )
ReplayMode replayMode = ( ReplayMode ) std::get<int64_t>( startOffsetProperty );
switch( replayMode )
{
case KafkaStartOffset::EARLIEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_BEGINNING ); break;
case KafkaStartOffset::LATEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_END ); break;
case KafkaStartOffset::START_TIME: m_rebalanceCb -> setStartTime( starttime ); break;
case ReplayMode::EARLIEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_BEGINNING ); break;
case ReplayMode::LATEST: m_rebalanceCb -> setStartOffset( RdKafka::Topic::OFFSET_END ); break;
case ReplayMode::START_TIME: m_rebalanceCb -> setStartTime( starttime ); break;

case ReplayMode::NUM_TYPES:
case ReplayMode::UNKNOWN:
CSP_THROW( ValueError, "start_offset is unset" );
}
}
else if( std::holds_alternative<DateTime>( startOffsetProperty ) )
Expand Down
7 changes: 0 additions & 7 deletions cpp/csp/adapters/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ class KafkaConsumer
void forceReplayCompleted();

private:
//should align with python side enum
enum class KafkaStartOffset
{
EARLIEST = 1,
LATEST = 2,
START_TIME = 3,
};

struct TopicData
{
Expand Down
4 changes: 0 additions & 4 deletions cpp/csp/adapters/kafka/KafkaInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ void KafkaInputAdapter::processMessage( RdKafka::Message* message, bool live, cs
if( ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE )
{
msgTime = DateTime::fromMilliseconds( ts.timestamp );

//If user requested kafka data earlier than engine start, we will force it as live so it makes it into the engine
if( msgTime < rootEngine() -> startTime() )
pushLive = true;
}
else
{
Expand Down
21 changes: 20 additions & 1 deletion cpp/csp/engine/Enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace csp
{

// NOTE this must align with the python side Enum definition ///
// NOTE these must align with the python side Enum definition ///
struct PushModeTraits
{
enum _enum : unsigned char
Expand All @@ -25,6 +25,25 @@ struct PushModeTraits

using PushMode = Enum<PushModeTraits>;

//ReplayMode is used by PushPull adapters
struct ReplayModeTraits
{
enum _enum : unsigned char
{
UNKNOWN = 0,
EARLIEST = 1, //Replay all available data
LATEST = 2, //no replay at all, start from latest
START_TIME = 3, //replay from engine start time

NUM_TYPES
};

protected:
_enum m_value;
};

using ReplayMode = Enum<ReplayModeTraits>;

}

#endif
6 changes: 6 additions & 0 deletions cpp/csp/engine/PushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ PushPullInputAdapter::PullDataEvent * PushPullInputAdapter::nextPullEvent()
auto * event = m_poppedPullEvents.front();
m_poppedPullEvents.pop();

//Always force time before start to start. There are two possibilities:
//- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime
//- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime
if( unlikely( event -> time < rootEngine() -> startTime() ) )
event -> time = rootEngine() -> startTime();

if( m_adjustOutOfOrderTime && event )
event -> time = std::max( event -> time, rootEngine() -> now() );

Expand Down
8 changes: 3 additions & 5 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
RawTextMessageMapper,
hash_mutable,
)
from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def
from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def, ReplayMode
from csp.lib import _kafkaadapterimpl

_ = BytesMessageProtoMapper, DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper
Expand All @@ -32,10 +32,8 @@ class KafkaStatusMessageType(IntEnum):
GENERIC_ERROR = 4


class KafkaStartOffset(csp.Enum):
EARLIEST = 1 # Replay all of history
LATEST = 2 # Start from new msgs
START_TIME = 3 # Start from csp run starttime
# Backward compatible
KafkaStartOffset = ReplayMode


class KafkaAdapterManager:
Expand Down
9 changes: 9 additions & 0 deletions csp/impl/types/common_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ class PushMode(IntEnum):
NON_COLLAPSING = 2
BURST = 3

class ReplayMode(IntEnum):
"""PushPull adapters can take a replay_mode option to specify how to replay data
EARLIEST will replay all available data (Note that data with timestamps before engine start will be forced to playback at starttime )
LATEST only run from latest data ( effectively, no replay )
START_TIME playback all data from engine starttime
"""
EARLIEST = 1
LATEST = 2
START_TIME = 3

class DuplicatePolicy(IntEnum):
"""An 'enum' that specifies the policy for handling the last value in functions like value_at."""
Expand Down
2 changes: 1 addition & 1 deletion csp/impl/wiring/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from csp.impl.mem_cache import csp_memoized_graph_object
from csp.impl.outputadapter import OutputAdapter # noqa: F401
from csp.impl.types import tstype
from csp.impl.types.common_definitions import ArgKind, InputDef, OutputDef, PushMode
from csp.impl.types.common_definitions import ArgKind, InputDef, OutputDef, PushMode, ReplayMode
from csp.impl.types.container_type_normalizer import ContainerTypeNormalizer
from csp.impl.types.tstype import ts
from csp.impl.wiring.signature import Signature
Expand Down

0 comments on commit 4145649

Please sign in to comment.