diff --git a/_examples/development/main.go b/_examples/development/main.go index 818d18b..9e7187e 100644 --- a/_examples/development/main.go +++ b/_examples/development/main.go @@ -3,7 +3,7 @@ package main import ( "fmt" "github.com/quix-labs/flash" - "github.com/quix-labs/flash/drivers/trigger" + "github.com/quix-labs/flash/drivers/wal_logical" "github.com/rs/zerolog" "os" "os/signal" @@ -18,12 +18,13 @@ func main() { //} //pprof.StartCPUProfile(f) //defer pprof.StopCPUProfile() - + fmt.Println((flash.OperationTruncate).IncludeAll(flash.OperationDelete)) + return postsListenerConfig := &flash.ListenerConfig{ Table: "public.posts", MaxParallelProcess: 1, // In most case 1 is ideal because sync between goroutine introduce some delay Fields: []string{"id", "slug"}, - //Conditions: []*flash.ListenerCondition{{Column: "active", Value: true}}, + Conditions: []*flash.ListenerCondition{{Column: "active", Value: true}}, } postsListener, _ := flash.NewListener(postsListenerConfig) @@ -106,7 +107,7 @@ func main() { // Create custom logger logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).With().Caller().Stack().Timestamp().Logger() - driver := trigger.NewDriver(&trigger.DriverConfig{ + driver := wal_logical.NewDriver(&wal_logical.DriverConfig{ //UseStreaming: true, }) diff --git a/driver.go b/driver.go index 6c5eaaa..74c57eb 100644 --- a/driver.go +++ b/driver.go @@ -1,5 +1,10 @@ package flash +type DatabaseEvent struct { + ListenerUid string + Event Event +} +type DatabaseEventsChan chan *DatabaseEvent type Driver interface { Init(clientConfig *ClientConfig) error Close() error diff --git a/listener.go b/listener.go index 8f24df9..3a3cbf1 100644 --- a/listener.go +++ b/listener.go @@ -22,15 +22,16 @@ type ListenerConfig struct { type CreateEventCallback func(event Operation) error type DeleteEventCallback func(event Operation) error +type EventCallback func(event Event) type Listener struct { Config *ListenerConfig // Internals sync.Mutex - callbacks map[*EventCallback]Operation - listenedEvents Operation // Use bitwise comparison to check for listened events - semaphore chan struct{} + callbacks map[*EventCallback]Operation + listenedOperations Operation // Use bitwise comparison to check for listened events + semaphore chan struct{} // Trigger client _clientCreateEventCallback CreateEventCallback @@ -72,8 +73,8 @@ func (l *Listener) On(operation Operation, callback EventCallback) (func() error l.callbacks[&callback] = operation removeFunc := func() error { - delete(l.callbacks, &callback) // Important keep before removeListenedEventIfNeeded - if err := l.removeListenedEventIfNeeded(operation); err != nil { + delete(l.callbacks, &callback) // Important keep before removeListenedOperationIfNeeded + if err := l.removeListenedOperationIfNeeded(operation); err != nil { return err } callback = nil @@ -84,8 +85,8 @@ func (l *Listener) On(operation Operation, callback EventCallback) (func() error } func (l *Listener) Dispatch(event *Event) { - for callback, listens := range l.callbacks { - if listens&(*event).GetOperation() > 0 { + for callback, listenedOperations := range l.callbacks { + if listenedOperations.IncludeOne((*event).GetOperation()) { if l.Config.MaxParallelProcess == -1 { go (*callback)(*event) continue @@ -118,7 +119,7 @@ func (l *Listener) Init(_createCallback CreateEventCallback, _deleteCallback Del // Emit all events for initialization for targetEvent := Operation(1); targetEvent != 0 && targetEvent <= OperationAll; targetEvent <<= 1 { - if l.listenedEvents&targetEvent == 0 { + if l.listenedOperations&targetEvent == 0 { continue } if err := _createCallback(targetEvent); err != nil { @@ -132,11 +133,11 @@ func (l *Listener) Init(_createCallback CreateEventCallback, _deleteCallback Del func (l *Listener) addListenedEventIfNeeded(event Operation) error { - initialEvents := l.listenedEvents - l.listenedEvents |= event + initialEvents := l.listenedOperations + l.listenedOperations |= event // Trigger event if change appears - diff := initialEvents ^ l.listenedEvents + diff := initialEvents ^ l.listenedOperations if diff == 0 { return nil } @@ -157,17 +158,17 @@ func (l *Listener) addListenedEventIfNeeded(event Operation) error { return nil } -func (l *Listener) removeListenedEventIfNeeded(event Operation) error { +func (l *Listener) removeListenedOperationIfNeeded(event Operation) error { for targetEvent := Operation(1); targetEvent != 0 && targetEvent <= event; targetEvent <<= 1 { - if targetEvent&l.listenedEvents == 0 { + if targetEvent&l.listenedOperations == 0 { continue } if l.hasListenersForEvent(targetEvent) { continue } - l.listenedEvents &= ^targetEvent + l.listenedOperations &= ^targetEvent if l._clientInitialized { l.Lock() if err := l._clientDeleteEventCallback(targetEvent); err != nil {