Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for genetlink #480

Merged
merged 20 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/buildtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
${{ runner.os }}-bazel-
- name: Install pcap
run: |
sudo apt-get install libpcap-dev
sudo apt-get install libpcap-dev libnl-genl-3-dev libnl-3-dev
- name: Build Lemming
run: bazel build //...
- name: Save Bazel Cache
Expand All @@ -63,7 +63,7 @@ jobs:
${{ runner.os }}-bazel-
- name: Install pcap
run: |
sudo apt-get install libpcap-dev
sudo apt-get install libpcap-dev libnl-genl-3-dev libnl-3-dev
- name: Test
run: make coverage
- name: Coveralls
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install pcap
run: |
sudo apt-get install libpcap-dev
sudo apt-get install libpcap-dev libnl-genl-3-dev libnl-3-dev
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
Expand Down
4 changes: 2 additions & 2 deletions cloudbuild/buildtest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ steps:
script: |
curl -Lo bazel https://github.com/bazelbuild/bazelisk/releases/download/v1.16.0/bazelisk-linux-amd64 && \
install bazel /usr/local/bin/
apt-get update && apt-get -y install libpcap-dev
apt-get update && apt-get -y install libpcap-dev libnl-genl-3-dev libnl-3-dev
bazel build --remote_cache=https://storage.googleapis.com/lemming-bazel-cache --google_default_credentials //...
- id: test
name: gcr.io/cloud-builders/bazel
script: |
apt-get update && apt-get -y install libpcap-dev
apt-get update && apt-get -y install libpcap-dev libnl-genl-3-dev libnl-3-dev
bazel test --test_output=errors --combined_report=lcov --remote_cache=https://storage.googleapis.com/lemming-bazel-cache --google_default_credentials \
$(bazel query 'attr(size, small, tests("//...")) + attr(size, medium, tests("//..."))')
timeout: 3600s
Expand Down
2 changes: 1 addition & 1 deletion cloudbuild/lemming-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ gopath=$(go env GOPATH)
export PATH=${PATH}:$gopath/bin
curl -Lo bazel https://github.com/bazelbuild/bazelisk/releases/download/v1.16.0/bazelisk-linux-amd64 && \
sudo install bazel /usr/local/bin/
sudo apt-get -y install libpcap-dev
sudo apt-get -y install libpcap-dev libnl-genl-3-dev libnl-3-dev

cd /tmp/workspace
kne deploy ~/kne-internal/deploy/kne/kind-bridge.yaml
Expand Down
2 changes: 1 addition & 1 deletion cloudbuild/operator-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffo
sudo install skaffold /usr/local/bin/
curl -Lo bazel https://github.com/bazelbuild/bazelisk/releases/download/v1.16.0/bazelisk-linux-amd64 && \
sudo install bazel /usr/local/bin/
sudo apt-get -y install libpcap-dev
sudo apt-get -y install libpcap-dev libnl-genl-3-dev libnl-3-dev

cd /tmp/workspace
kne deploy ~/kne-internal/deploy/kne/kind-bridge.yaml
Expand Down
2 changes: 1 addition & 1 deletion cloudbuild/presubmit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffo
sudo install skaffold /usr/local/bin/
curl -Lo bazel https://github.com/bazelbuild/bazelisk/releases/download/v1.16.0/bazelisk-linux-amd64 && \
sudo install bazel /usr/local/bin/
sudo apt-get -y install libpcap-dev
sudo apt-get -y install libpcap-dev libnl-genl-3-dev libnl-3-dev

cd /tmp/workspace
kne deploy ~/kne-internal/deploy/kne/kind-bridge.yaml
Expand Down
1 change: 1 addition & 0 deletions dataplane/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//dataplane/dplaneopts",
"//dataplane/kernel/tap",
"//dataplane/proto/packetio",
"//dataplane/proto/sai",
"//dataplane/protocol",
Expand Down
20 changes: 0 additions & 20 deletions dataplane/forwarding/fwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,6 @@ func (e *Server) UpdateNotification(contextID *fwdpb.ContextId, notification fwd
return nil
}

// UpdatePacketSink updates the packet sink service for a context. If the
// service is set to nil, no packets are delivered externally for the context.
// The address is the address of the packet service (used in queries)
// in the host:port format.
func (e *Server) UpdatePacketSink(contextID *fwdpb.ContextId, packet fwdcontext.PacketCallback) error {
if contextID == nil {
return errors.New("fwd: UpdatePacketSink failed, No context ID")
}

ctx, err := e.FindContext(contextID)
if err != nil {
return fmt.Errorf("fwd: UpdatePacketSink failed, err %v", err)
}

ctx.Lock()
defer ctx.Unlock()
ctx.SetPacketSink(packet)
return nil
}

// ContextCreate creates a new context. Note that if the packet sink and/or
// notification services are specified but not reachable, the context creation
// fails.
Expand Down
2 changes: 1 addition & 1 deletion dataplane/forwarding/fwdport/ports/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ go_test(
"//dataplane/forwarding/protocol/ethernet",
"//dataplane/forwarding/protocol/metadata",
"//dataplane/forwarding/protocol/opaque",
"//dataplane/proto/packetio",
"//proto/forwarding",
"@com_github_go_logr_logr//testr",
"@com_github_google_go_cmp//cmp",
"@com_github_google_gopacket//:gopacket",
"@com_github_google_gopacket//layers",
"@com_github_google_gopacket//pcapgo",
"@com_github_openconfig_gnmi//errdiff",
"@org_golang_google_protobuf//proto",
"@org_uber_go_mock//gomock",
],
)
93 changes: 2 additions & 91 deletions dataplane/forwarding/fwdport/ports/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,31 +98,9 @@ func (p *CPUPort) Update(upd *fwdpb.PortUpdateDesc) error {

// Write applies output actions and writes a packet to the cable.
func (p *CPUPort) Write(packet fwdpacket.Packet) (fwdaction.State, error) {
if p.remote {
if err := p.queue.Write(packet); err != nil {
return fwdaction.DROP, err
}
return fwdaction.CONSUME, nil
}

// After the CPU packet is processed, the output port may change. Rerun the output actions.
outPort, err := fwdport.OutputPort(packet, p.ctx)
if err != nil {
if err := p.queue.Write(packet); err != nil {
return fwdaction.DROP, err
}

// TODO: The types of ports are sent over gRPC should be probably be configurable.
if outPort.Type() == fwdpb.PortType_PORT_TYPE_GENETLINK || outPort.Type() == fwdpb.PortType_PORT_TYPE_CPU_PORT {
if err := p.queue.Write(packet); err != nil {
return fwdaction.DROP, err
}
return fwdaction.CONSUME, nil
}

if err := fwdport.Output(outPort, packet, fwdpb.PortAction_PORT_ACTION_OUTPUT, p.ctx); err != nil {
return fwdaction.DROP, err
}

return fwdaction.CONSUME, nil
}

Expand All @@ -131,97 +109,30 @@ func (p *CPUPort) Write(packet fwdpacket.Packet) (fwdaction.State, error) {
// relock the context. We also do not want to hold the lock when performing
// the gRPC request.
func (p *CPUPort) punt(v any) {
if p.remote {
p.puntRemotePort(v)
return
}
packet, ok := v.(fwdpacket.Packet)
if !ok {
fwdport.Increment(p, 1, fwdpb.CounterId_COUNTER_ID_TX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_TX_ERROR_OCTETS)
return
}

p.ctx.RLock()
var ingressPID *fwdpb.PortId
if port, err := fwdport.InputPort(packet, p.ctx); err == nil {
ingressPID = fwdport.GetID(port)
}
egressPID := fwdport.GetID(p)
if port, err := fwdport.OutputPort(packet, p.ctx); err == nil {
egressPID = fwdport.GetID(port)
}
var parsed []*fwdpb.PacketFieldBytes
for _, f := range p.export {
value, err := packet.Field(fwdpacket.NewFieldID(f))
if err != nil {
continue
}
parsed = append(parsed, &fwdpb.PacketFieldBytes{
FieldId: f,
Bytes: value,
})
}
response := &fwdpb.PacketSinkResponse{
Resp: &fwdpb.PacketSinkResponse_Packet{
Packet: &fwdpb.PacketSinkPacketInfo{
PortId: fwdport.GetID(p),
Egress: egressPID,
Ingress: ingressPID,
Bytes: packet.Frame(),
ParsedFields: parsed,
},
},
}

ps := p.ctx.PacketSink()
p.ctx.RUnlock()
if ps != nil {
timer := deadlock.NewTimer(deadlock.Timeout, fmt.Sprintf("Punting packet from port %v", p))
err := ps(response)
timer.Stop()
if err == nil {
return
}
log.Errorf("ports: Unable to punt packet, request %+v, err %v.", response, err)
}
fwdport.Increment(p, packet.Length(), fwdpb.CounterId_COUNTER_ID_TX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_TX_ERROR_OCTETS)
}

func (p *CPUPort) puntRemotePort(v any) {
packet, ok := v.(fwdpacket.Packet)
if !ok {
fwdport.Increment(p, 1, fwdpb.CounterId_COUNTER_ID_TX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_TX_ERROR_OCTETS)
return
}

p.ctx.RLock()
var ingressPID *fwdpb.PortId
if port, err := fwdport.InputPort(packet, p.ctx); err == nil {
ingressPID = fwdport.GetID(port)
}
egressPID := fwdport.GetID(p)
if port, err := fwdport.OutputPort(packet, p.ctx); err == nil {
egressPID = fwdport.GetID(port)
}
hostPort, err := packet.Field(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID, 0))
if err != nil {
fwdport.Increment(p, packet.Length(), fwdpb.CounterId_COUNTER_ID_TX_ERROR_PACKETS, fwdpb.CounterId_COUNTER_ID_TX_ERROR_OCTETS)
return
}

ingressID, err := strconv.Atoi(ingressPID.GetObjectId().GetId())
if err != nil {
return
}
egressID, err := strconv.Atoi(egressPID.GetObjectId().GetId())
if err != nil {
return
}

response := &pktiopb.PacketOut{
Packet: &pktiopb.Packet{
InputPort: uint64(ingressID),
OutputPort: uint64(egressID),
OutputPort: uint64(0), // TODO: If the packet was punted after the FIB, this not be output port.
HostPort: binary.BigEndian.Uint64(hostPort),
Frame: packet.Frame(),
},
Expand Down
50 changes: 17 additions & 33 deletions dataplane/forwarding/fwdport/ports/cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
package ports

import (
"encoding/binary"
"testing"

"google.golang.org/protobuf/proto"
"github.com/google/go-cmp/cmp"

"github.com/openconfig/lemming/dataplane/forwarding/fwdport"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdcontext"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdobject"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdpacket"
pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
fwdpb "github.com/openconfig/lemming/proto/forwarding"

_ "github.com/openconfig/lemming/dataplane/forwarding/protocol/arp"
Expand All @@ -34,46 +36,33 @@ import (
// A RecordPacketSink records the last injected packet and generates a
// notification on its channel.
type RecordPacketSink struct {
notify chan bool
lastResponse *fwdpb.PacketSinkResponse
notify chan bool
resp *pktiopb.PacketOut
}

// PacketSink records the inject packet request and generates a notification.
func (p *RecordPacketSink) PacketSink(resp *fwdpb.PacketSinkResponse) error {
p.lastResponse = resp
func (p *RecordPacketSink) Send(resp *pktiopb.PacketOut) error {
p.resp = resp
p.notify <- true
return nil
}

// Tests the writes to the CPU port.
func TestCpuWrite(t *testing.T) {
name := "TestPort"
name := "1"
ctx := fwdcontext.New("test", "fwd")
ps := &RecordPacketSink{
notify: make(chan bool),
}
ctx.SetPacketSink(ps.PacketSink)
ctx.SetCPUPortSink(ps.Send, func() {})

// Create a CPU port that exports the ETHER_TYPE and IP_ADDR_DST
desc := &fwdpb.PortDesc{
PortType: fwdpb.PortType_PORT_TYPE_CPU_PORT,
PortId: fwdport.MakeID(fwdobject.NewID(name)),
}
ethertype := &fwdpb.PacketFieldId{
Field: &fwdpb.PacketField{
FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_ETHER_TYPE,
Instance: 0,
},
}
ipaddress := &fwdpb.PacketFieldId{
Field: &fwdpb.PacketField{
FieldNum: fwdpb.PacketFieldNum_PACKET_FIELD_NUM_IP_ADDR_DST,
Instance: 0,
},
}

cpu := &fwdpb.CPUPortDesc{
QueueId: name,
ExportFieldIds: []*fwdpb.PacketFieldId{ethertype, ipaddress},
QueueId: name,
}
desc.Port = &fwdpb.PortDesc_Cpu{
Cpu: cpu,
Expand All @@ -95,6 +84,8 @@ func TestCpuWrite(t *testing.T) {
if err != nil {
t.Fatalf("Unable to create ARP packet, err %v.", err)
}
packet.Update(fwdpacket.NewFieldIDFromNum(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID, 1), fwdpacket.OpSet, binary.BigEndian.AppendUint64(nil, 1))
fwdport.SetInputPort(packet, port)
fwdport.SetOutputPort(packet, port)

// Write the packet out of the cpu port and wait for it to be received
Expand All @@ -103,19 +94,12 @@ func TestCpuWrite(t *testing.T) {
t.Fatalf("Write failed, err %v.", err)
}
<-ps.notify
t.Logf("Got request %+v", ps.lastResponse)
t.Logf("Got request %+v", ps.resp)

// Verify that the packet was received and the parsed fields only have
// the ETHER_TYPE set to ARP.
list := ps.lastResponse.GetPacket().GetParsedFields()
if len(list) != 1 {
t.Fatalf("Write failed to get parsed fields, got %v, want 1.", len(list))
}
want := &fwdpb.PacketFieldBytes{
FieldId: ethertype,
Bytes: []byte{0x08, 0x06},
}
if !proto.Equal(list[0], want) {
t.Fatalf("Write failed to get parsed fields, Got %v, want %v", list[0], want)
got := ps.resp.GetPacket()
if d := cmp.Diff(got.GetFrame(), arp); d != "" {
t.Fatalf("Write failed to get parsed fields, diff: %s", d)
}
}
2 changes: 1 addition & 1 deletion dataplane/forwarding/fwdport/ports/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func TestPortGroupHash(t *testing.T) {
defer ctrl.Finish()

names := []string{"p1", "p2", "p3", "p4"}
ctx := fwdcontext.New("test", "fwd")

hashFn := []fwdpb.AggregateHashAlgorithm{
fwdpb.AggregateHashAlgorithm_AGGREGATE_HASH_ALGORITHM_CRC16,
fwdpb.AggregateHashAlgorithm_AGGREGATE_HASH_ALGORITHM_CRC32,
}

for id, fn := range hashFn {
ctx := fwdcontext.New("test", "fwd")
var ports []fwdport.Port
for _, name := range names {
ports = append(ports, porttestutil.CreateTestPort(t, ctx, fmt.Sprintf("%v-%v", name, id)))
Expand Down
Loading
Loading