Skip to content

Commit

Permalink
renaming some function
Browse files Browse the repository at this point in the history
  • Loading branch information
alancolant committed Jul 10, 2024
1 parent d8ad776 commit 0805d12
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
9 changes: 5 additions & 4 deletions _examples/development/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/trigger"
"github.com/quix-labs/flash/drivers/wal_logical"
"github.com/rs/zerolog"
"os"
"os/signal"
Expand All @@ -18,12 +18,13 @@ func main() {
//}
//pprof.StartCPUProfile(f)
//defer pprof.StopCPUProfile()

fmt.Println((flash.OperationTruncate).IncludeAll(flash.OperationDelete))
return
postsListenerConfig := &flash.ListenerConfig{
Table: "public.posts",
MaxParallelProcess: 1, // In most case 1 is ideal because sync between goroutine introduce some delay
Fields: []string{"id", "slug"},
//Conditions: []*flash.ListenerCondition{{Column: "active", Value: true}},
Conditions: []*flash.ListenerCondition{{Column: "active", Value: true}},
}
postsListener, _ := flash.NewListener(postsListenerConfig)

Expand Down Expand Up @@ -106,7 +107,7 @@ func main() {
// Create custom logger
logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).With().Caller().Stack().Timestamp().Logger()

driver := trigger.NewDriver(&trigger.DriverConfig{
driver := wal_logical.NewDriver(&wal_logical.DriverConfig{
//UseStreaming: true,
})

Expand Down
5 changes: 5 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package flash

type DatabaseEvent struct {
ListenerUid string
Event Event
}
type DatabaseEventsChan chan *DatabaseEvent
type Driver interface {
Init(clientConfig *ClientConfig) error
Close() error
Expand Down
29 changes: 15 additions & 14 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ type ListenerConfig struct {

type CreateEventCallback func(event Operation) error
type DeleteEventCallback func(event Operation) error
type EventCallback func(event Event)

type Listener struct {
Config *ListenerConfig

// Internals
sync.Mutex
callbacks map[*EventCallback]Operation
listenedEvents Operation // Use bitwise comparison to check for listened events
semaphore chan struct{}
callbacks map[*EventCallback]Operation
listenedOperations Operation // Use bitwise comparison to check for listened events
semaphore chan struct{}

// Trigger client
_clientCreateEventCallback CreateEventCallback
Expand Down Expand Up @@ -72,8 +73,8 @@ func (l *Listener) On(operation Operation, callback EventCallback) (func() error
l.callbacks[&callback] = operation

removeFunc := func() error {
delete(l.callbacks, &callback) // Important keep before removeListenedEventIfNeeded
if err := l.removeListenedEventIfNeeded(operation); err != nil {
delete(l.callbacks, &callback) // Important keep before removeListenedOperationIfNeeded
if err := l.removeListenedOperationIfNeeded(operation); err != nil {
return err
}
callback = nil
Expand All @@ -84,8 +85,8 @@ func (l *Listener) On(operation Operation, callback EventCallback) (func() error
}

func (l *Listener) Dispatch(event *Event) {
for callback, listens := range l.callbacks {
if listens&(*event).GetOperation() > 0 {
for callback, listenedOperations := range l.callbacks {
if listenedOperations.IncludeOne((*event).GetOperation()) {
if l.Config.MaxParallelProcess == -1 {
go (*callback)(*event)
continue
Expand Down Expand Up @@ -118,7 +119,7 @@ func (l *Listener) Init(_createCallback CreateEventCallback, _deleteCallback Del

// Emit all events for initialization
for targetEvent := Operation(1); targetEvent != 0 && targetEvent <= OperationAll; targetEvent <<= 1 {
if l.listenedEvents&targetEvent == 0 {
if l.listenedOperations&targetEvent == 0 {
continue
}
if err := _createCallback(targetEvent); err != nil {
Expand All @@ -132,11 +133,11 @@ func (l *Listener) Init(_createCallback CreateEventCallback, _deleteCallback Del

func (l *Listener) addListenedEventIfNeeded(event Operation) error {

initialEvents := l.listenedEvents
l.listenedEvents |= event
initialEvents := l.listenedOperations
l.listenedOperations |= event

// Trigger event if change appears
diff := initialEvents ^ l.listenedEvents
diff := initialEvents ^ l.listenedOperations
if diff == 0 {
return nil
}
Expand All @@ -157,17 +158,17 @@ func (l *Listener) addListenedEventIfNeeded(event Operation) error {
return nil
}

func (l *Listener) removeListenedEventIfNeeded(event Operation) error {
func (l *Listener) removeListenedOperationIfNeeded(event Operation) error {

for targetEvent := Operation(1); targetEvent != 0 && targetEvent <= event; targetEvent <<= 1 {
if targetEvent&l.listenedEvents == 0 {
if targetEvent&l.listenedOperations == 0 {
continue
}
if l.hasListenersForEvent(targetEvent) {
continue
}

l.listenedEvents &= ^targetEvent
l.listenedOperations &= ^targetEvent
if l._clientInitialized {
l.Lock()
if err := l._clientDeleteEventCallback(targetEvent); err != nil {
Expand Down

0 comments on commit 0805d12

Please sign in to comment.