Skip to content

Commit

Permalink
wip: time index lookup; shifted topic partition hash into a proto schema
Browse files Browse the repository at this point in the history
  • Loading branch information
felixangell committed Oct 8, 2024
1 parent 9582d0e commit ab7d0cc
Show file tree
Hide file tree
Showing 11 changed files with 468 additions and 79 deletions.
34 changes: 25 additions & 9 deletions api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var DefaultCarnaxConfig = ParseFromProperties(Properties{
//raftMaxPool: 3,
})

// CarnaxController
// CarnaxController ...
type CarnaxController struct {
// raft consensus related bits
RaftBind string
Expand All @@ -123,7 +123,7 @@ type CarnaxController struct {
config CarnaxConfig

// @local
segmentCache map[TopicPartitionHash]*SegmentCache
segmentCache map[*TopicPartitionHash]*SegmentCache

// output
storeBackedLog ObjectStore // @leader only!
Expand All @@ -134,7 +134,7 @@ type CarnaxController struct {
func NewCarnaxControllerWithConfig(store ObjectStore, config CarnaxConfig) *CarnaxController {
return &CarnaxController{
storeBackedLog: store,
segmentCache: map[TopicPartitionHash]*SegmentCache{},
segmentCache: map[*TopicPartitionHash]*SegmentCache{},
state: &sharedMessageLogState{
segment: NewSegmentLog(),
topicConfig: make(map[string]*apiv1.TopicConfig),
Expand All @@ -150,7 +150,7 @@ func (m *CarnaxController) Start(raftInst *raft.Raft) error {
m.raft = raftInst

// NOTE: we are _expecting_ election here
// so we wait until we are marked as a leader.
// we wait until we are marked as a leader.
if m.config.ExecutionMode == SingleNode {
start := time.Now()
for {
Expand All @@ -163,6 +163,8 @@ func (m *CarnaxController) Start(raftInst *raft.Raft) error {
break
}
}
} else if m.config.ExecutionMode == MultiNode {
panic("not yet supported")
} else {
panic("not yet supported")
}
Expand Down Expand Up @@ -343,7 +345,7 @@ func (m *CarnaxController) read(topic string, partitionIndex uint32, offset uint
//
// nit: we should respect the state of the consumer group node here
// e.g. if it's in a rebalance or not.
func (m *CarnaxController) Poll(consumerGroupId string, clientId string, duration time.Duration) (*controllerv1.Poll_Response, error) {
func (m *CarnaxController) Poll(consumerGroupId string, clientId string, _ time.Duration) (*controllerv1.Poll_Response, error) {
if m.getRaft().State() != raft.Leader {
return nil, ErrNotALeader
}
Expand Down Expand Up @@ -380,7 +382,7 @@ func (m *CarnaxController) Poll(consumerGroupId string, clientId string, duratio
continue
}

writtenBytes := calcRecordLen(rec, offset.Address.Offset)
writtenBytes := calcRecordLen(rec)

// NOTE: we can't necessarily foresee when we are in another segment at this point
nextOffset := offset.Address.Offset + writtenBytes + SegmentIncrement
Expand All @@ -404,7 +406,7 @@ func (m *CarnaxController) Poll(consumerGroupId string, clientId string, duratio
}, nil
}

func calcRecordLen(rec *apiv1.Record, offs uint64) uint64 {
func calcRecordLen(rec *apiv1.Record) uint64 {
buf := new(bytes.Buffer)
amt, err := protodelim.MarshalTo(buf, rec)
if err != nil {
Expand Down Expand Up @@ -567,7 +569,7 @@ func (m *CarnaxController) Write(topic string, record *apiv1.Record) (*commandv1
panic("failed to get write result")
}

func (m *CarnaxController) softCommit(id string, clientId string, index uint32, delta uint64) error {
func (m *CarnaxController) softCommit(id string, clientId string, index uint32, offset uint64) error {
if m.getRaft().State() != raft.Leader {
return ErrNotALeader
}
Expand All @@ -579,7 +581,7 @@ func (m *CarnaxController) softCommit(id string, clientId string, index uint32,
ConsumerGroupId: id,
ClientId: clientId,
PartitionIndex: index,
NextOffsetDelta: delta,
Offset: offset,
},
},
})
Expand All @@ -595,3 +597,17 @@ func (m *CarnaxController) softCommit(id string, clientId string, index uint32,
res.Response()
return nil
}

// OffsetsForTimes ...
// default.api.timeout.ms
func (m *CarnaxController) OffsetsForTimes(id string, clientId string, m2 map[*TopicPartitionHash]*apiv1.SeekIndex) {
// todo: shift into apply...

for tph, seekIndex := range m2 {
// for given partition do index seeks by timestamp to an offset.
log.Println(tph.String(), "to", seekIndex)

// soft commit
//m.softCommit(id, clientId, 0, 0)
}
}
20 changes: 10 additions & 10 deletions api/controller_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *CarnaxControllerFSM) Apply(l *raft.Log) interface{} {
return f.applyCommitSync(sc.ConsumerGroupId)
case commandv1.CommandType_COMMAND_TYPE_SOFT_COMMIT:
sc := cmd.GetSoftCommit()
return f.applySoftCommit(sc.ConsumerGroupId, sc.ClientId, sc.PartitionIndex, sc.NextOffsetDelta)
return f.applySoftCommit(sc.ConsumerGroupId, sc.ClientId, sc.PartitionIndex, sc.Offset)
default:
panic("Command is not handled " + cmd.Type.String())
}
Expand Down Expand Up @@ -297,17 +297,17 @@ func (f *CarnaxControllerFSM) applyReadMessage(topic string, address *commandv1.
}

func (f *CarnaxControllerFSM) tryReadWithSegmentCacheHistory(topic string, address *commandv1.Address, point commandv1.ResetPoint) interface{} {
hash := tpHash(topic, address.PartitionIndex)
hash := newTopicHash(topic, address.PartitionIndex)

log.Println("READ:", internal.FormatAddr(address), "RP:", point, hash)
log.Println("READ:", internal.FormatAddr(address), "RP:", point, hash.String())

allSegmentPaths := f.storeBackedLog.List(string(hash))
allSegmentPaths := f.storeBackedLog.List(hash.String())
segmentFolderPath := findLowestSegmentFile(allSegmentPaths, address.Offset)

// 1. find the offsIndex file.
// .index file format is offset:byte_offset
indexFilePath, err := func() (string, error) {
indexSearchKey := fmt.Sprintf("%s/%s.index", string(hash), segmentFolderPath)
indexSearchKey := fmt.Sprintf("%s/%s.index", hash.String(), segmentFolderPath)
res := f.storeBackedLog.List(indexSearchKey)
log.Println("INDEX_LU", indexSearchKey)
if len(res) == 1 {
Expand All @@ -316,7 +316,7 @@ func (f *CarnaxControllerFSM) tryReadWithSegmentCacheHistory(topic string, addre
return "", ErrNoIndexFound
}()
if err == ErrNoIndexFound {
log.Println("no offsIndex found for " + string(hash))
log.Println("no offsIndex found for " + hash.String())
return nil
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func (f *CarnaxControllerFSM) tryReadWithSegmentCacheHistory(topic string, addre

log.Println("Addr", address.Offset, "is offs:", pos.Offset, "byte pos:", pos.Position, "point", point)

logSearchKey := fmt.Sprintf("%s/%s.log", string(hash), segmentFolderPath)
logSearchKey := fmt.Sprintf("%s/%s.log", hash.String(), segmentFolderPath)
log.Println("DataFile:", logSearchKey)
logSegmentFileData, err := f.storeBackedLog.Get(logSearchKey)
if err != nil {
Expand Down Expand Up @@ -558,7 +558,7 @@ func (f *CarnaxControllerFSM) applyCommitSync(id string) interface{} {
}

func (f *CarnaxControllerFSM) tryFindInCache(topic string, address *commandv1.Address) (*apiv1.Record, bool) {
sc, ok := f.segmentCache[tpHash(topic, address.PartitionIndex)]
sc, ok := f.segmentCache[newTopicHash(topic, address.PartitionIndex)]
if !ok {
return nil, false
}
Expand Down Expand Up @@ -595,10 +595,10 @@ func (f *CarnaxControllerFSM) tryFindInCache(topic string, address *commandv1.Ad
}

func (f *CarnaxControllerFSM) cacheSegment(topic string, address *commandv1.Address, reader []byte) {
sc, ok := f.segmentCache[tpHash(topic, address.PartitionIndex)]
sc, ok := f.segmentCache[newTopicHash(topic, address.PartitionIndex)]
if !ok {
sc = newSegmentCache()
f.segmentCache[tpHash(topic, address.PartitionIndex)] = sc
f.segmentCache[newTopicHash(topic, address.PartitionIndex)] = sc
}

sc.cacheSeg(address.Offset, reader)
Expand Down
13 changes: 13 additions & 0 deletions api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,4 +558,17 @@ func (c *carnaxTestSuite) TestIndexByTimestampToOffset() {
// seek to offset by timestamp
// poll message
// done

cg, err := c.controller.Subscribe("group", "id", "some_topic")
assert.NoError(c.T(), err)

c.controller.OffsetsForTimes(cg.ConsumerGroupId, cg.ClientId, map[*TopicPartitionHash]*apiv1.SeekIndex{
newTopicHash("some_topic", 0): {
Whence: &apiv1.SeekIndex_Time{
Time: startTime.Add(2 * day).UnixMilli(),
},
},
})

c.controller.Poll(cg.ConsumerGroupId, cg.ClientId, 15*time.Second)
}
20 changes: 15 additions & 5 deletions api/tp_hash.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package api

import "fmt"
import (
"fmt"
apiv1 "github.com/yarefs/carnax/gen/api/v1"
)

type TopicPartitionHash string
type TopicPartitionHash apiv1.TopicPartition

func tpHash(topic string, partitionIndex uint32) TopicPartitionHash {
rawHash := fmt.Sprintf("%s-%d", topic, partitionIndex)
return TopicPartitionHash(rawHash)
func (t *TopicPartitionHash) String() string {
return fmt.Sprintf("%s-%d", t.Topic, t.PartitionIndex)
}

func newTopicHash(topic string, partitionIndex uint32) *TopicPartitionHash {
tp := &apiv1.TopicPartition{
Topic: topic,
PartitionIndex: partitionIndex,
}
return (*TopicPartitionHash)(tp)
}
11 changes: 11 additions & 0 deletions api/tp_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package api

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestTopicPartitionHash_String(t *testing.T) {
res := newTopicHash("foo", 123)
assert.Equal(t, "foo-123", res.String())
}
Loading

0 comments on commit ab7d0cc

Please sign in to comment.