Skip to content

Commit

Permalink
Start on cdc stream adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 26, 2024
1 parent 733aa7b commit de98755
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions flow/pua/stream_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,26 @@ func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStr
}()
return output
}

func AttachToCdcStream(ls *lua.LState, lfn *lua.LFunction, stream *model.CDCStream[model.RecordItems]) *model.CDCStream[model.RecordItems] {
// TODO no buffering on output stream
outstream := model.NewCDCStream[model.RecordItems]()
go func() {
// TODO change to latch, fix Close race
if stream.WaitAndCheckEmpty() {
outstream.SignalAsEmpty()
} else {
outstream.SignalAsNotEmpty()
for record := range stream.GetRecords() {
// TODO call lfn
// what to do about errors?
outstream.AddRecord(record)
}
}
<-stream.GetRecords() // TODO needed because empty signal comes before Close
outstream.SchemaDeltas = stream.SchemaDeltas
outstream.UpdateLatestCheckpoint(stream.GetLastCheckpoint())
outstream.Close()
}()
return outstream
}

0 comments on commit de98755

Please sign in to comment.