Skip to content

Commit

Permalink
Make it clearer how to extend the library
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 22, 2024
1 parent 3b879bc commit 00303f7
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pkg/eventstreams/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
assert.True(t, created)

// Find the second one by topic filter
esList, _, err := mgr.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).Eq("topicfilter", "topic2"))
esList, _, err := mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).Eq("topicfilter", "topic2"))
assert.NoError(t, err)
assert.Len(t, esList, 1)
assert.Equal(t, "stream2", *esList[0].Name)
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
assert.NoError(t, err)

// Check no streams left
esList, _, err = mgr.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).And())
esList, _, err = mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).And())
assert.NoError(t, err)
assert.Empty(t, esList)

Expand Down
2 changes: 1 addition & 1 deletion pkg/eventstreams/eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (es *eventStream[CT, DT]) checkSetStatus(ctx context.Context, targetStatus
}

func (es *eventStream[CT, DT]) persistStatus(ctx context.Context, targetStatus EventStreamStatus) error {
fb := EventStreamFilters.NewUpdate(ctx)
fb := GenericEventStreamFilters.NewUpdate(ctx)
return es.esm.persistence.EventStreams().Update(ctx, es.spec.GetID(), fb.Set("status", targetStatus))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/eventstreams/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error {
const pageSize = 25
var skip uint64
for {
fb := EventStreamFilters.NewFilter(ctx)
fb := GenericEventStreamFilters.NewFilter(ctx)
streams, _, err := esm.persistence.EventStreams().GetMany(ctx, fb.And().Skip(skip).Limit(pageSize))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/eventstreams/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (co
eventStreams: crudmocks.NewCRUD[*GenericEventStream](t),
checkpoints: crudmocks.NewCRUD[*EventStreamCheckpoint](t),
}
mp.eventStreams.On("GetQueryFactory").Return(EventStreamFilters).Maybe()
mp.eventStreams.On("GetQueryFactory").Return(GenericEventStreamFilters).Maybe()

ctx, cancelCtx := context.WithCancel(context.Background())
config.RootConfigReset()
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestListStreamsFail(t *testing.T) {
})
defer done()

_, _, err := esm.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).And())
_, _, err := esm.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).And())
assert.Regexp(t, "pop", err)

}
Expand Down
41 changes: 28 additions & 13 deletions pkg/eventstreams/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ type Persistence[CT EventStreamSpec] interface {
Close()
}

var EventStreamFilters = &ffapi.QueryFields{
"id": &ffapi.StringField{},
"created": &ffapi.TimeField{},
"updated": &ffapi.TimeField{},
"name": &ffapi.StringField{},
"status": &ffapi.StringField{},
"type": &ffapi.StringField{},
"topicfilter": &ffapi.StringField{},
"identity": &ffapi.StringField{},
"initialsequenceid": &ffapi.StringField{},
}

var CheckpointFilters = &ffapi.QueryFields{
"id": &ffapi.StringField{},
"created": &ffapi.TimeField{},
Expand All @@ -53,6 +41,22 @@ var CheckpointFilters = &ffapi.QueryFields{

type IDValidator func(ctx context.Context, idStr string) error

// This is a base object, and set of filters, that you can use if:
// - You are happy exposing all the built-in types of consumer (webhooks/websockets)
// - You do not need to extend the configuration in any way
// - You are happy using UUIDs for your IDs per dbsql.ResourceBase semantics
//
// A pre-built persistence library is provided, and sample migrations, that work
// with this structure.
//
// The design of the generic is such that you can start with the generic structure,
// and then move to your own structure later if you want to add more fields.
//
// When you are ready to extend, you need to:
// 1. Copy the GenericEventStream source into your own repo, and rename it appropriately.
// Then you can add your extra configuration fields to it.
// 2. Copy the EventStreams() and Checkpoints() CRUD factories into your own repo,
// and extend them with additional columns etc. as you see fit.
type GenericEventStream struct {
dbsql.ResourceBase
Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"`
Expand All @@ -62,6 +66,17 @@ type GenericEventStream struct {
Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"`
}

var GenericEventStreamFilters = &ffapi.QueryFields{
"id": &ffapi.StringField{},
"created": &ffapi.TimeField{},
"updated": &ffapi.TimeField{},
"name": &ffapi.StringField{},
"status": &ffapi.StringField{},
"type": &ffapi.StringField{},
"topicfilter": &ffapi.StringField{},
"initialsequenceid": &ffapi.StringField{},
}

func (ges *GenericEventStream) SetID(s string) {
ges.ID = fftypes.MustParseUUID(s)
}
Expand Down Expand Up @@ -157,7 +172,7 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*GenericEventStream] {
ScopedFilter: func() sq.Eq { return sq.Eq{} },
EventHandler: nil, // set below
NameField: "name",
QueryFactory: EventStreamFilters,
QueryFactory: GenericEventStreamFilters,
IDValidator: p.idValidator,
GetFieldPtr: func(inst *GenericEventStream, col string) interface{} {
switch col {
Expand Down

0 comments on commit 00303f7

Please sign in to comment.