// Source file: filter.go (157 lines, 131 loc, 3.58 KB).
// Note: the upstream repository was archived by its owner on Jun 18, 2024 and is now read-only.

package main
import (
"fmt"
"log"
"sync/atomic"
"github.com/gofrs/uuid/v5"
"golang.org/x/exp/slices"
)
// Subscription describes one connected client: its identity, the filters that
// decide which events it receives, and the channels used to reach it.
type Subscription struct {
	// ClientId identifies the subscriber; removeSubscription matches on it.
	ClientId string

	// Filter criteria, evaluated by Filter.Run for every inbound event.
	//
	// TeamId is passed to convertToResponsePostHogEvent to derive PersonId.
	// Token, DistinctId and EventTypes each restrict delivery only when they
	// are non-empty. Geo selects ResponseGeoEvent payloads instead of
	// ResponsePostHogEvent payloads.
	TeamId     int
	Token      string
	DistinctId string
	EventTypes []string
	Geo        bool

	// EventChan receives the matched payloads; Filter.Run sends without
	// blocking, so a payload is dropped when the channel cannot accept it.
	EventChan chan any
	// ShouldClose, once it reports true, makes Filter.Run skip this
	// subscription.
	ShouldClose *atomic.Bool
}
// ResponsePostHogEvent is the JSON payload delivered to non-geo subscribers.
// convertToResponsePostHogEvent fills it from an inbound PostHogEvent.
type ResponsePostHogEvent struct {
	Uuid       string `json:"uuid"`
	Timestamp  string `json:"timestamp"`
	DistinctId string `json:"distinct_id"`
	// PersonId is derived from the team id and DistinctId by
	// uuidFromDistinctId; it is empty when either input is missing.
	PersonId string `json:"person_id"`
	Event    string `json:"event"`
	// Properties is passed through from the inbound event as-is (the map is
	// shared, not copied).
	Properties map[string]any `json:"properties"`
}
// ResponseGeoEvent is the JSON payload delivered to subscribers that asked
// for geo data: a coordinate pair plus an occurrence count.
type ResponseGeoEvent struct {
	// Lat and Lng are copied from the inbound event.
	Lat float64 `json:"lat"`
	Lng float64 `json:"lng"`
	// Count is set to 1 by convertToResponseGeoEvent.
	Count uint `json:"count"`
}
// Filter fans inbound events out to the current set of subscriptions.
// All fields are read and written by Run.
type Filter struct {
	// inboundChan carries the events to be matched against subscriptions.
	inboundChan chan PostHogEvent
	// subChan delivers subscriptions to add; unSubChan delivers
	// subscriptions to remove (matched by ClientId).
	subChan, unSubChan chan Subscription
	// subs is the current list of subscriptions.
	subs []Subscription
}
// NewFilter builds a Filter wired to the given channels. The returned Filter
// starts with an empty, non-nil subscription list; call Run to start
// processing.
func NewFilter(subChan chan Subscription, unSubChan chan Subscription, inboundChan chan PostHogEvent) *Filter {
	f := new(Filter)
	f.inboundChan = inboundChan
	f.subChan = subChan
	f.unSubChan = unSubChan
	f.subs = []Subscription{}
	return f
}
// convertToResponseGeoEvent reduces an inbound event to its coordinates,
// returning a freshly allocated ResponseGeoEvent with Count set to 1.
func convertToResponseGeoEvent(event PostHogEvent) *ResponseGeoEvent {
	geo := new(ResponseGeoEvent)
	geo.Lat, geo.Lng = event.Lat, event.Lng
	geo.Count = 1
	return geo
}
// convertToResponsePostHogEvent builds the subscriber-facing payload for an
// inbound event. Fields are copied across unchanged, and PersonId is derived
// from teamId and the event's DistinctId via uuidFromDistinctId.
func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePostHogEvent {
	personID := uuidFromDistinctId(teamId, event.DistinctId)

	resp := ResponsePostHogEvent{
		Uuid:       event.Uuid,
		Timestamp:  event.Timestamp,
		DistinctId: event.DistinctId,
		PersonId:   personID,
		Event:      event.Event,
		Properties: event.Properties,
	}
	return &resp
}
// personUUIDV5Namespace is the fixed UUIDv5 namespace under which person ids
// are derived.
//
// It is initialised once at package load rather than lazily on first use, so
// concurrent callers of uuidFromDistinctId never race on the assignment.
// uuid.Must turns a malformed literal into an immediate panic instead of
// silently falling back to the zero namespace.
var personUUIDV5Namespace = func() *uuid.UUID {
	ns := uuid.Must(uuid.FromString("932979b4-65c3-4424-8467-0b66ec27bc22"))
	return &ns
}()

// uuidFromDistinctId derives a person id for the given team and distinct id.
//
// The result is the string form of the UUIDv5 of "<teamId>:<distinctId>" in
// personUUIDV5Namespace, so equal inputs always give the same id. It returns
// "" when teamId is 0 or distinctId is empty.
func uuidFromDistinctId(teamId int, distinctId string) string {
	if teamId == 0 || distinctId == "" {
		return ""
	}

	name := fmt.Sprintf("%d:%s", teamId, distinctId)
	return uuid.NewV5(*personUUIDV5Namespace, name).String()
}
// removeSubscription returns subs without any subscription whose ClientId
// equals clientId.
//
// The filtering is done in place: the result reuses the backing array of
// subs, so callers must use the returned slice and stop using the argument.
// When no subscription matches, the input is returned with all its elements
// intact, so unsubscribing an unknown client never drops other subscribers.
// Every matching entry is removed, not just one.
func removeSubscription(clientId string, subs []Subscription) []Subscription {
	kept := subs[:0]
	for _, sub := range subs {
		if sub.ClientId != clientId {
			kept = append(kept, sub)
		}
	}

	// Zero the vacated tail so removed subscriptions (and the channels they
	// reference) are not kept alive by the shared backing array.
	for i := len(kept); i < len(subs); i++ {
		subs[i] = Subscription{}
	}
	return kept
}
// Run is the filter's event loop. It blocks forever, handling whichever of
// the three channels is ready:
//
//   - subChan: the received subscription is appended to the list;
//   - unSubChan: subscriptions with the received ClientId are removed;
//   - inboundChan: the event is dispatched to every matching subscription.
//
// Run has no exit path and does not check whether a channel has been closed,
// so it is meant to be started in its own goroutine with channels that stay
// open. The subscription list is only touched from this loop.
func (c *Filter) Run() {
	for {
		select {
		case newSub := <-c.subChan:
			c.subs = append(c.subs, newSub)
		case unSub := <-c.unSubChan:
			c.subs = removeSubscription(unSub.ClientId, c.subs)
		case event := <-c.inboundChan:
			c.dispatch(event)
		}
	}
}

// dispatch offers event to every subscription whose filters accept it.
//
// A subscription is skipped when its ShouldClose flag is set, or when a
// non-empty Token, DistinctId or EventTypes filter does not match the event.
// Geo subscriptions receive a ResponseGeoEvent, and only for events with a
// non-zero Lat; all others receive a ResponsePostHogEvent. Sends never block:
// if a subscriber's channel cannot take the payload, it is dropped.
func (c *Filter) dispatch(event PostHogEvent) {
	// Payloads are built lazily and shared between subscribers so an event
	// with no matching subscriber costs no conversion.
	var (
		responseEvent    *ResponsePostHogEvent
		responseTeamID   int
		responseGeoEvent *ResponseGeoEvent
	)

	for _, sub := range c.subs {
		// A nil ShouldClose is treated as "still open" rather than
		// panicking the whole loop.
		if sub.ShouldClose != nil && sub.ShouldClose.Load() {
			log.Println("User has unsubscribed, but not been removed from the slice of subs")
			continue
		}

		if sub.Token != "" && event.Token != sub.Token {
			continue
		}
		if sub.DistinctId != "" && event.DistinctId != sub.DistinctId {
			continue
		}
		if len(sub.EventTypes) > 0 && !slices.Contains(sub.EventTypes, event.Event) {
			continue
		}

		var payload interface{}
		switch {
		case !sub.Geo:
			// PersonId depends on the subscriber's TeamId, so the cached
			// payload is only reused for subscribers of the same team.
			if responseEvent == nil || responseTeamID != sub.TeamId {
				responseEvent = convertToResponsePostHogEvent(event, sub.TeamId)
				responseTeamID = sub.TeamId
			}
			payload = *responseEvent
		case event.Lat != 0.0:
			if responseGeoEvent == nil {
				responseGeoEvent = convertToResponseGeoEvent(event)
			}
			payload = *responseGeoEvent
		default:
			// Geo subscriber, but the event has a zero Lat: nothing to send.
			continue
		}

		select {
		case sub.EventChan <- payload:
		default:
			// Don't block
		}
	}
}