Skip to content

Commit

Permalink
Merge pull request #2 from quix-labs/dev
Browse files Browse the repository at this point in the history
Restructuration (BREAKING CHANGES)
  • Loading branch information
alancolant authored Jul 10, 2024
2 parents 27c29b3 + 0805d12 commit 983c8b1
Show file tree
Hide file tree
Showing 36 changed files with 805 additions and 467 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ Thumbs.db
/go/src/
go.sum
*.prof
/development
/development
go.work
go.work.sum
coverage.*
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.PHONY: test test-coverage

test:
go test ./...

test-coverage:
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html
xdg-open coverage.html
55 changes: 34 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ Contributions and feedback are welcome!
To install the library, run:

```bash
go get github.com/quix-labs/flash@main # Actually main is used for development
go get -u github.com/quix-labs/flash@main # Actually main is used for development
go get -u github.com/quix-labs/flash/drivers/trigger@main # Show below to for other drivers

# Write your main package

Expand All @@ -55,34 +56,34 @@ package main

import (
"fmt"
"github.com/quix-labs/flash/pkg/listeners"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/trigger"
"os"
"os/signal"

"github.com/quix-labs/flash/pkg/client"
"github.com/quix-labs/flash/pkg/types"
)

func main() {
// Example with listener and client setup
postsListenerConfig := &types.ListenerConfig{Table: "public.posts"}
postsListener := listeners.NewListener(postsListenerConfig)
postsListener.On(types.OperationAll, func(event types.Event) {
postsListener, _ := flash.NewListener(&flash.ListenerConfig{Table: "public.posts"})

postsListener.On(flash.OperationAll, func(event flash.Event) {
switch typedEvent := event.(type) {
case *types.InsertEvent:
case *flash.InsertEvent:
fmt.Printf("insert - new: %+v\n", typedEvent.New)
case *types.UpdateEvent:
case *flash.UpdateEvent:
fmt.Printf("update - old: %+v - new: %+v\n", typedEvent.Old, typedEvent.New)
case *types.DeleteEvent:
case *flash.DeleteEvent:
fmt.Printf("delete - old: %+v \n", typedEvent.Old)
case *types.TruncateEvent:
case *flash.TruncateEvent:
fmt.Printf("truncate \n")
}
})

// Create client
clientConfig := &types.ClientConfig{DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb"}
flashClient, _ := client.NewClient(clientConfig)
flashClient, _ := flash.NewClient(&flash.ClientConfig{
DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb",
Driver: trigger.NewDriver(&trigger.DriverConfig{}),
})
flashClient.Attach(postsListener)

// Start listening
Expand All @@ -96,15 +97,16 @@ func main() {

fmt.Println("Program terminated.")
}

```

For more detailed examples, check out the following files:

- [Debug queries](examples/debug_trace/debug_trace.go)
- [Trigger insert events on table](examples/trigger_insert/trigger_insert.go)
- [Trigger all events on table](examples/trigger_all/trigger_all.go)
- [Listen for specific fields](examples/specific_fields/specific_fields.go)
- [Parallel Callback](examples/parallel_callback/parallel_callback.go)
- [Debug queries](_examples/debug_trace/main.go)
- [Trigger insert events on table](_examples/trigger_insert/main.go)
- [Trigger all events on table](_examples/trigger_all/main.go)
- [Listen for specific fields](_examples/specific_fields/main.go)
- [Parallel Callback](_examples/parallel_callback/main.go)

## Advanced Features

Expand All @@ -116,7 +118,7 @@ events:
- A delete event with the old value of this column (and other fields).
- An insert event with the new value of this column (and other fields).

### 2. Custom Conditions
### 2. Custom Conditions

You can configure conditions, and if a database row does not match the criteria, you will not receive any event.

Expand All @@ -140,7 +142,18 @@ Check [drivers/README.md](pkg/drivers/README.md) to see if the driver you have c

The following features are planned for future implementation:

- ⏳ Soft-delete support: receive delete events when SQL condition is respected. Example: `deleted_at IS NOT NULL`.
- ⏳ Support for conditional listens.

| Operator | trigger | wal_logical |
|:--------:|:-----------------:|:-----------------:|
| equals |||
| neq |||
| lt |||
| lte |||
| gte |||
| not null |||
| is null | ⚠️ using eq + nil | ⚠️ using eq + nil |

- ⏳ Handling custom primary for fake insert/delete when change appears
- ⬜ Remove client in favor of direct listener start
- ⬜ Support attaching/detaching new listener during runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package main

import (
"fmt"
"github.com/quix-labs/flash/pkg/client"
"github.com/quix-labs/flash/pkg/listeners"
"github.com/quix-labs/flash/pkg/types"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/trigger"
"github.com/rs/zerolog"
"os"
)

func main() {
postsListenerConfig := &types.ListenerConfig{Table: "public.posts"}
postsListener, _ := listeners.NewListener(postsListenerConfig)

postsListenerConfig := &flash.ListenerConfig{Table: "public.posts"}
postsListener, _ := flash.NewListener(postsListenerConfig)

// Registering your callbacks
stop, err := postsListener.On(types.OperationInsert, func(event types.Event) {
typedEvent := event.(*types.InsertEvent)
stop, err := postsListener.On(flash.OperationInsert, func(event flash.Event) {
typedEvent := event.(*flash.InsertEvent)
fmt.Printf("Insert received - new: %+v\n", typedEvent.New)
})
if err != nil {
Expand All @@ -25,13 +25,18 @@ func main() {

// Create custom logger with Level Trace <-> Default is Debug
logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).With().Stack().Timestamp().Logger()

driver := trigger.NewDriver(&trigger.DriverConfig{})
// Create client
clientConfig := &types.ClientConfig{
clientConfig := &flash.ClientConfig{
DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb",
Logger: &logger, // Define your custom zerolog.Logger here
Driver: driver,
}

flashClient, err := flash.NewClient(clientConfig)
if err != nil {
fmt.Println(err)
}
flashClient, _ := client.NewClient(clientConfig)
flashClient.Attach(postsListener)

// Start listening
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"fmt"
"github.com/quix-labs/flash/pkg/client"
"github.com/quix-labs/flash/pkg/drivers/trigger"
"github.com/quix-labs/flash/pkg/listeners"
"github.com/quix-labs/flash/pkg/types"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/wal_logical"
"github.com/rs/zerolog"
"os"
"os/signal"
Expand All @@ -20,40 +18,41 @@ func main() {
//}
//pprof.StartCPUProfile(f)
//defer pprof.StopCPUProfile()

postsListenerConfig := &types.ListenerConfig{
fmt.Println((flash.OperationTruncate).IncludeAll(flash.OperationDelete))
return
postsListenerConfig := &flash.ListenerConfig{
Table: "public.posts",
MaxParallelProcess: 1, // In most case 1 is ideal because sync between goroutine introduce some delay
Fields: []string{"id", "slug"},
Conditions: []*types.ListenerCondition{{Column: "active", Value: true}},
Conditions: []*flash.ListenerCondition{{Column: "active", Value: true}},
}
postsListener, _ := listeners.NewListener(postsListenerConfig)
postsListener, _ := flash.NewListener(postsListenerConfig)

postsListener2Config := &types.ListenerConfig{
postsListener2Config := &flash.ListenerConfig{
Table: "public.posts",
MaxParallelProcess: 1, // In most case 1 is ideal because sync between goroutine introduce some delay
Fields: []string{"active"},
Conditions: []*types.ListenerCondition{{Column: "slug", Value: nil}},
Conditions: []*flash.ListenerCondition{{Column: "slug", Value: nil}},
}
postsListener2, _ := listeners.NewListener(postsListener2Config)
postsListener2, _ := flash.NewListener(postsListener2Config)

// Registering your callbacks
var i = 0
var mutex sync.Mutex

stopAll, err := postsListener.On(types.OperationAll, func(event types.Event) {
stopAll, err := postsListener.On(flash.OperationAll, func(event flash.Event) {
mutex.Lock()
i++
mutex.Unlock()

switch typedEvent := event.(type) {
case *types.InsertEvent:
case *flash.InsertEvent:
fmt.Printf("insert - new: %+v\n", typedEvent.New)
case *types.UpdateEvent:
case *flash.UpdateEvent:
fmt.Printf("update - old: %+v - new: %+v\n", typedEvent.Old, typedEvent.New)
case *types.DeleteEvent:
case *flash.DeleteEvent:
fmt.Printf("delete - old: %+v \n", typedEvent.Old)
case *types.TruncateEvent:
case *flash.TruncateEvent:
fmt.Printf("truncate \n")
}
})
Expand All @@ -68,19 +67,19 @@ func main() {
}
}()

stopAll2, err := postsListener2.On(types.OperationAll, func(event types.Event) {
stopAll2, err := postsListener2.On(flash.OperationAll, func(event flash.Event) {
mutex.Lock()
i++
mutex.Unlock()

switch typedEvent := event.(type) {
case *types.InsertEvent:
case *flash.InsertEvent:
fmt.Printf("2-insert - new: %+v\n", typedEvent.New)
case *types.UpdateEvent:
case *flash.UpdateEvent:
fmt.Printf("2-update - old: %+v - new: %+v\n", typedEvent.Old, typedEvent.New)
case *types.DeleteEvent:
case *flash.DeleteEvent:
fmt.Printf("2-delete - old: %+v \n", typedEvent.Old)
case *types.TruncateEvent:
case *flash.TruncateEvent:
fmt.Printf("2-truncate \n")
}
})
Expand Down Expand Up @@ -108,20 +107,19 @@ func main() {
// Create custom logger
logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).With().Caller().Stack().Timestamp().Logger()

driver := trigger.NewDriver(&trigger.DriverConfig{
driver := wal_logical.NewDriver(&wal_logical.DriverConfig{
//UseStreaming: true,
})

// Create client
clientConfig := &types.ClientConfig{
clientConfig := &flash.ClientConfig{
DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb",
Logger: &logger, // Define your custom zerolog.Logger here
ShutdownTimeout: time.Second * 2,
Driver: driver,
}
flashClient, _ := client.NewClient(clientConfig)
flashClient.Attach(postsListener)
flashClient.Attach(postsListener2)
flashClient, _ := flash.NewClient(clientConfig)
flashClient.Attach(postsListener, postsListener2)

// Start listening
go func() {
Expand Down
37 changes: 37 additions & 0 deletions _examples/parallel_callback/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/trigger"
)

func main() {
postsListener, _ := flash.NewListener(&flash.ListenerConfig{
Table: "public.posts",
MaxParallelProcess: 50, // Default to 1, you can use -1 for infinite goroutine
})

stop, err := postsListener.On(flash.OperationInsert|flash.OperationDelete, func(event flash.Event) {
switch typedEvent := event.(type) {
case *flash.InsertEvent:
fmt.Printf("insert - new: %+v\n", typedEvent.New)
case *flash.DeleteEvent:
fmt.Printf("delete - old: %+v \n", typedEvent.Old)
}
})
if err != nil {
fmt.Println(err)
}
defer stop()

flashClient, _ := flash.NewClient(&flash.ClientConfig{
DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb",
Driver: trigger.NewDriver(&trigger.DriverConfig{}),
})
flashClient.Attach(postsListener)
go flashClient.Start() // Error Handling
defer flashClient.Close()

select {} // Keep process running
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@ package main

import (
"fmt"
"github.com/quix-labs/flash/pkg/client"
"github.com/quix-labs/flash/pkg/listeners"
"github.com/quix-labs/flash/pkg/types"
"github.com/quix-labs/flash"
"github.com/quix-labs/flash/drivers/trigger"
)

func main() {
postsListener, _ := listeners.NewListener(&types.ListenerConfig{
postsListener, _ := flash.NewListener(&flash.ListenerConfig{
Table: "public.posts",
Fields: []string{"id", "slug"},
})
postsListener.On(types.OperationAll, func(event types.Event) {
postsListener.On(flash.OperationAll, func(event flash.Event) {
switch typedEvent := event.(type) {
case *types.InsertEvent:
case *flash.InsertEvent:
fmt.Printf("insert - new: %+v\n", typedEvent.New)
case *types.UpdateEvent:
case *flash.UpdateEvent:
fmt.Printf("update - old: %+v - new: %+v\n", typedEvent.Old, typedEvent.New)
case *types.DeleteEvent:
case *flash.DeleteEvent:
fmt.Printf("delete - old: %+v \n", typedEvent.Old)
case *types.TruncateEvent:
case *flash.TruncateEvent:
fmt.Printf("truncate \n")
}
})

// Create client
clientConfig := &types.ClientConfig{DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb"}
flashClient, _ := client.NewClient(clientConfig)
flashClient, _ := flash.NewClient(&flash.ClientConfig{
DatabaseCnx: "postgresql://devuser:devpass@localhost:5432/devdb",
Driver: trigger.NewDriver(&trigger.DriverConfig{}),
})
flashClient.Attach(postsListener)

go func() {
Expand Down
Loading

0 comments on commit 983c8b1

Please sign in to comment.