-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpacket_assembly.go
105 lines (94 loc) · 3.53 KB
/
packet_assembly.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
	"strings"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/tcpassembly"
	"github.com/julsemaan/garin/base"
	GarinUtil "github.com/julsemaan/garin/util"
)
// sniffStreamFactory implements tcpassembly.StreamFactory. It carries no
// state of its own; its New method builds one sniffStream per followed stream.
type sniffStreamFactory struct{}
// sniffStream accumulates the reassembled payload and a few counters for a
// single TCP stream so that it can be decoded once reassembly completes.
type sniffStream struct {
	// net and transport are the network- and transport-layer flows the
	// stream was created for.
	net       gopacket.Flow
	transport gopacket.Flow

	// bytesLen is the total number of payload bytes received.
	bytesLen int64
	// packets is the number of reassembly chunks received.
	packets int64
	// outOfOrder counts chunks whose Seen time precedes the latest time
	// recorded in end.
	outOfOrder int64
	// skipped is the sum of all positive Skip values reported by chunks.
	skipped int64

	// start is the wall-clock time at which the stream object was created.
	start time.Time
	// end is the most recent Seen time among the chunks received so far
	// (initially equal to start).
	end time.Time

	// sawStart and sawEnd record whether any chunk was flagged as the start
	// or the end of the stream.
	sawStart bool
	sawEnd   bool

	// bytes is the concatenation of every chunk's payload, in delivery order.
	bytes []byte
}
// New builds the sniffStream for a flow pair. The assembler calls it whenever
// it encounters a stream it is not already following. Both the start and end
// times of the new stream are initialised to the current wall-clock time.
func (factory *sniffStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
	// log.Printf("new stream %v:%v started", net, transport)
	createdAt := time.Now()

	// *sniffStream provides Reassembled and ReassemblyComplete, so it can be
	// returned directly as a tcpassembly.Stream.
	return &sniffStream{
		net:       net,
		transport: transport,
		start:     createdAt,
		end:       createdAt,
	}
}
// Reassembled receives newly available stream data from the assembler; the
// chunks arrive in stream order. Each chunk updates the stream's counters and
// flags, and its payload is appended to the accumulated byte buffer.
func (s *sniffStream) Reassembled(reassemblies []tcpassembly.Reassembly) {
	for i := range reassemblies {
		chunk := &reassemblies[i]

		// Advance the end time on in-order chunks; anything seen before the
		// latest known time is only counted as out of order.
		if !chunk.Seen.Before(s.end) {
			s.end = chunk.Seen
		} else {
			s.outOfOrder++
		}

		s.bytesLen += int64(len(chunk.Bytes))
		s.packets++

		// Only positive skip counts are accumulated.
		if chunk.Skip > 0 {
			s.skipped += int64(chunk.Skip)
		}

		s.bytes = append(s.bytes, chunk.Bytes...)

		// The flags are sticky: once set they are never cleared.
		if chunk.Start {
			s.sawStart = true
		}
		if chunk.End {
			s.sawEnd = true
		}
	}
}
// ReassemblyComplete is called when the TCP assembler believes a stream has
// finished. It hands the accumulated payload to a background goroutine which:
//
//   - parses it with ParseHTTP when either port of the stream is listed in
//     params.UnencryptedPorts,
//   - parses it with ParseHTTPS when either port is listed in
//     params.EncryptedPorts (this result replaces the HTTP one if both apply),
//   - and, when a destination was recognised, stamps it with the stream's
//     start time, pushes it onto recordingQueue and logs it.
//
// The goroutine is registered on wg before it is launched and releases it
// when done. It also sends on parsingConcurrencyChan before parsing and
// receives from it afterwards (presumably a buffered channel used to bound
// parsing concurrency; it is declared elsewhere in the package).
//
// A panic raised while parsing is recovered and logged instead of crashing
// the process.
func (s *sniffStream) ReassemblyComplete() {
	//diffSecs := float64(s.end.Sub(s.start)) / float64(time.Second)
	// log.Printf("Reassembly of stream %v:%v complete - start:%v end:%v bytes:%v packets:%v ooo:%v bps:%v pps:%v skipped:%v",
	//s.net, s.transport, s.start, s.end, s.bytesLen, s.packets, s.outOfOrder,
	//float64(s.bytesLen)/diffSecs, float64(s.packets)/diffSecs, s.skipped)

	// Register with the wait group BEFORE starting the goroutine. Doing it
	// inside the goroutine races with any concurrent wg.Wait(), which could
	// return before this stream's work has even been counted.
	wg.Add(1)
	go func() {
		parsingConcurrencyChan <- 1
		defer func() {
			if r := recover(); r != nil {
				err, ok := r.(error)
				// Match on the prefix only: newer Go runtimes append the
				// offending index and length to this message, so an exact
				// comparison would never succeed.
				if ok && strings.HasPrefix(err.Error(), "runtime error: index out of range") {
					Logger().Debug("Error decoding packet due to its unknown format. This is likely normal.", err.Error())
				} else {
					Logger().Error("Error decoding packet.", r)
				}
			}
			<-parsingConcurrencyChan
			wg.Done()
		}()

		var destination *base.Destination

		if params.UnencryptedPorts[s.transport.Src().String()] || params.UnencryptedPorts[s.transport.Dst().String()] {
			httpPacket := &GarinUtil.Packet{Hosts: s.net, Ports: s.transport, Payload: s.bytes}
			destination = ParseHTTP(httpPacket)
			if destination != nil {
				destination.Protocol = "HTTP"
			}
		}

		// Checked after the HTTP case on purpose: when a port is configured
		// as both unencrypted and encrypted, the TLS result wins.
		if params.EncryptedPorts[s.transport.Src().String()] || params.EncryptedPorts[s.transport.Dst().String()] {
			httpsPacket := &GarinUtil.Packet{Hosts: s.net, Ports: s.transport, Payload: s.bytes}
			destination = ParseHTTPS(httpsPacket)
			if destination != nil {
				destination.Protocol = "TLS/SSL"
			}
		}

		if destination != nil {
			destination.Timestamp = s.start
			recordingQueue.push(destination)
			Logger().Infof("Destination detected protocol='%s' source_ip='%s' destination_ip='%s' host='%s' packet_timestamp='%s'", destination.Protocol, destination.SourceIp, destination.DestinationIp, destination.ServerName, destination.Timestamp)
		}
	}()
}