Skip to content

Commit

Permalink
Merge branch '0.2.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dimitri Capitaine committed Mar 30, 2023
2 parents b2d4199 + 3469877 commit 6a8fd8b
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 15 deletions.
2 changes: 1 addition & 1 deletion core/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Multiplexer struct {
outs []chan events.LookatchEvent
}

//NewMultiplexer create a new multiplexer
// NewMultiplexer create a new multiplexer
func NewMultiplexer(in chan events.LookatchEvent, outs []chan events.LookatchEvent) (multiplexer *Multiplexer) {
multiplexer = &Multiplexer{
in: in,
Expand Down
1 change: 0 additions & 1 deletion lookatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ it makes it possible for you to process your data in the same way,
no matter the database backend they come from.
You can then feed any application you may need so that they can react almost
in real time to the changes in your configured source data.
*/
package main

Expand Down
2 changes: 2 additions & 0 deletions package/deb/input-amd64
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
build/lookatch-agent=/usr/bin/
package/deb/config.json=/etc/lookatch/config.json
3 changes: 3 additions & 0 deletions package/rpm/input-amd64
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/lookatch-agent=/usr/bin/
package/rpm/config.json=/etc/lookatch/config.json
package/rpm/lookatch-agent.service=/etc/systemd/system/lookatch-agent.service
2 changes: 1 addition & 1 deletion sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type (
}
)

//sinkCreator sink Creator func
// sinkCreator sink Creator func
type sinkCreator func(*Sink) (SinkI, error)

// Factory sink Factory
Expand Down
11 changes: 6 additions & 5 deletions sources/dbsql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ func (d *DBSQLQuery) Query(database string, query string) (err error) {
}
//parse query
info.Schema, info.Table = d.ExtractDatabaseTable(query)
if info.Schema == "" {
info.Schema = "public"
}

log.WithField("query", query).Debug("Start querying")
// check that the collector is still connected to the database
Expand All @@ -200,14 +197,14 @@ func (d *DBSQLQuery) Query(database string, query string) (err error) {

// retrieve the resultset associated with the query to execute
rows, err := d.db.Query(query)
defer rows.Close()
if err != nil {
log.WithFields(log.Fields{
"query": query,
"error": err,
}).Error("Error when query mysql")
return
}
defer rows.Close()

// Fill in the metadata for each column found in the schema

Expand Down Expand Up @@ -293,7 +290,11 @@ func (d *DBSQLQuery) ProcessLines(columns []string, lines [][]interface{}, info
for i, col := range columns {
switch vr := (*values[i].(*interface{})).(type) {
case []uint8:
colmap[col], err = strconv.ParseFloat(string(vr), 64)
if utils.IsNumDot(string(vr)) {
colmap[col], err = strconv.ParseFloat(string(vr), 64)
} else {
colmap[col], err = strconv.ParseInt(string(vr), 10, 64)
}
if err != nil {
colmap[col] = string(vr)
}
Expand Down
6 changes: 3 additions & 3 deletions sources/mysql_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (m *MysqlCDC) parseEvent(e *canal.RowsEvent) error {
if m.config.ColumnsMetaValue {
columnMeta[column.Name] = events.ColumnsMeta{
Type: column.RawType,
Position: index,
Position: index + 1,
}
}
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (m *MysqlCDC) parseUpdate(e *canal.RowsEvent) error {
if m.config.ColumnsMetaValue {
columnMeta[column.Name] = events.ColumnsMeta{
Type: column.RawType,
Position: index,
Position: index + 1,
}
}
if m.config.OldValue {
Expand Down Expand Up @@ -430,7 +430,7 @@ func (m *MysqlCDC) sendEvent(ts uint32, action string, table *schema.Table, even
Tenant: m.AgentInfo.Tenant,
},
Payload: events.SQLEvent{
Timestamp: fmt.Sprint(ts),
Timestamp: fmt.Sprintf("%d%s", ts, "000000000"),
Environment: m.AgentInfo.Tenant.Env,
Database: table.Schema,
Table: table.Name,
Expand Down
40 changes: 40 additions & 0 deletions sources/pg_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,46 @@ func (p *PostgreSQLCDC) UpdateCommittedLsn() {
}
}

func (p *PostgreSQLCDC) GetOffsetTimeline(tli int32, lsn pglogrepl.LSN) (int32, error) {
mrr := p.conn.Exec(p.ctx, fmt.Sprintf("TIMELINE_HISTORY %d", tli))
results, err := mrr.ReadAll()
if err != nil {
return 0, err
}
if len(results) != 1 {
return 0, fmt.Errorf("expected 1 result set, got %d", len(results))
}

result := results[0]
if len(result.Rows) != 1 {
return 0, fmt.Errorf("expected 1 result set, got %d", len(result.Rows))
}

row := result.Rows[0]
if len(row) != 2 {
return 0, fmt.Errorf("expected 1 result set, got %d", len(row))
}

//split line
timelineList := strings.Split(string(row[1]), "\n\n")
for _, timeline := range timelineList {
current := strings.Split(timeline, "\t")
tLsn, err := pglogrepl.ParseLSN(current[1])
if err != nil {
return 0, fmt.Errorf("expected LSN string, got %s", current[1])
}
if lsn <= tLsn {
currentTli, err := strconv.Atoi(current[0])
if err != nil {
return 0, fmt.Errorf("expected timeline, got %s", current[1])
}
return int32(currentTli), nil
}
}

return tli, nil
}

func NewOffsetCommittedState() *OffsetCommittedState {
return &OffsetCommittedState{
RWMutex: sync.RWMutex{},
Expand Down
4 changes: 2 additions & 2 deletions sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ func (s *Source) GetName() string {
return s.Name
}

//GetOutputChan get output channel
// GetOutputChan get output channel
func (s *Source) GetOutputChan() chan events.LookatchEvent {
return s.OutputChannel
}

//GetCommitChan return commit channel attach to source
// GetCommitChan return commit channel attach to source
func (s *Source) GetCommitChan() chan interface{} {
return s.CommitChannel
}
Expand Down
2 changes: 1 addition & 1 deletion sources/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *Syslog) GetOutputChan() chan events.LookatchEvent {
return s.OutputChannel
}

//GetCommitChan return commit channel attach to source
// GetCommitChan return commit channel attach to source
func (s *Syslog) GetCommitChan() chan interface{} {
return s.CommitChannel
}
Expand Down
2 changes: 1 addition & 1 deletion utils/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"reflect"
)

// Available Task constants
// Available Task constants
const (
AgentStart = "StartAgent"
AgentStop = "StopAgent"
Expand Down
11 changes: 11 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,14 @@ func InSlice(slice []string, val string) bool {
}
return false
}

func IsNumDot(s string) bool {
for _, v := range s {
if v == '.' {
return true
} else if v < '0' || v > '9' {
return false
}
}
return false
}
14 changes: 14 additions & 0 deletions utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,17 @@ func TestInSlice2(t *testing.T) {
}

}

func TestNumDot(t *testing.T) {
StringMap := map[string]bool{
"254": false,
"32.2": true,
"for": false,
}
for k, v := range StringMap {
if v != IsNumDot(k) {
t.Fail()
}
}

}

0 comments on commit 6a8fd8b

Please sign in to comment.