Skip to content

Commit

Permalink
dpdk: add packet distribution analysis for ethdev
Browse files Browse the repository at this point in the history
Introduce packet distribution analysis for PMD threads based on ethdev library.
This analysis computes the distribution of packets retrieved in a single
rte_eth_rx_burst() call, on a per-queue basis.

Signed-off-by: Adel Belkhiri <[email protected]>
  • Loading branch information
adel-belkhiri committed Oct 15, 2024
1 parent 759d826 commit 1a11873
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ Require-Bundle: org.eclipse.ui,
org.eclipse.tracecompass.tmf.ctf.core,
org.eclipse.tracecompass.analysis.os.linux.core,
org.eclipse.jdt.annotation;bundle-version="2.2.400",
com.google.guava
com.google.guava,
org.eclipse.tracecompass.analysis.lami.core
Export-Package: org.eclipse.tracecompass.incubator.dpdk.core.trace,
org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.spin.analysis,
org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.throughput.analysis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
</tracetype>
</module>
</extension>
<extension
point="org.eclipse.tracecompass.tmf.core.analysis.ondemand">
<analysis
class="org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis.DpdkPollDistributionAnalysis"
id="org.eclipse.tracecompass.incubator.dpdk.core.ethdev.poll.distribution">
</analysis>
</extension>
<extension
point="org.eclipse.tracecompass.tmf.core.dataprovider">
<dataProviderFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*******************************************************************************
* Copyright (c) 2024 École Polytechnique de Montréal
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License 2.0 which
* accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/
package org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis;

import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.SubMonitor;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.tracecompass.incubator.dpdk.core.trace.DpdkTrace;
import org.eclipse.tracecompass.incubator.internal.dpdk.core.analysis.DpdkEthdevEventLayout;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.aspect.LamiGenericAspect;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.aspect.LamiTableEntryAspect;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiAnalysis;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiResultTable;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiTableClass;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.module.LamiTableEntry;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiData;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiLongNumber;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiTimeRange;
import org.eclipse.tracecompass.internal.provisional.analysis.lami.core.types.LamiTimestamp;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
import org.eclipse.tracecompass.tmf.core.event.aspect.TmfContentFieldAspect;
import org.eclipse.tracecompass.tmf.core.filter.model.TmfFilterMatchesNode;
import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
import org.eclipse.tracecompass.tmf.core.request.TmfEventRequest;
import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;

/**
* Dpdk polls distribution analysis is an on-demand analysis that calculates the
* number of packets retrieved in a single call to rte_eth_rx_burst(). The poll
* distribution is calculated per port queue only.
*
* @author Adel Belkhiri
*
*/
public class DpdkPollDistributionAnalysis extends LamiAnalysis {

private static final long PROGRESS_INTERVAL = (1 << 10) - 1L;
private static final int MEMORY_SANITY_LIMIT = 40000;

/**
* Constructor
*/
public DpdkPollDistributionAnalysis() {
super(Messages.getMessage(Messages.EthdevPollDistribution_AnalysisName), false, trace -> true, Collections.emptyList());
}

@Override
protected synchronized void initialize() {
// do nothing
}

@Override
public boolean canExecute(ITmfTrace trace) {
if (trace instanceof DpdkTrace) {
return ((DpdkTrace) trace).validate(null, trace.getPath()).isOK() ? true : false;
}
return false;
}

private static int workRemaining(ITmfTrace trace) {
return (int) Math.min(trace.getNbEvents() / (PROGRESS_INTERVAL + 1), Integer.MAX_VALUE);
}

@Override
public List<LamiResultTable> execute(ITmfTrace trace, @Nullable TmfTimeRange timeRange, String extraParamsString, IProgressMonitor monitor) throws CoreException {
AtomicLong done = new AtomicLong();
Map<String, Map<Integer, Long>> pollCountPerQueue = new TreeMap<>();
TmfTimeRange adjustedTimeRange = timeRange == null ? TmfTimeRange.ETERNITY : timeRange;
SubMonitor subMonitor = SubMonitor.convert(monitor, Messages.getMessage(Messages.EthdevPollDistribution_AnalysisName), workRemaining(trace));

/*
* Handle the filter in case the user indicates a specific port to
* process its events
*/
TmfFilterMatchesNode filter = new TmfFilterMatchesNode(null);
filter.setEventAspect(new TmfContentFieldAspect(Messages.getMessage(Messages.EthdevPollDistribution_CountLabel), DpdkEthdevEventLayout.fieldPortId()));
filter.setRegex(extraParamsString);
Predicate<ITmfEvent> filterPred = (event -> extraParamsString.isEmpty() || filter.matches(event));

// create and send the event request
TmfEventRequest eventRequest = createEventRequest(trace, adjustedTimeRange, filterPred,
pollCountPerQueue, subMonitor, done);
trace.sendRequest(eventRequest);

try {
eventRequest.waitForCompletion();
return convertToLamiTables(adjustedTimeRange, pollCountPerQueue);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}

private static TmfEventRequest createEventRequest(ITmfTrace trace, TmfTimeRange timeRange, Predicate<ITmfEvent> filterPredicate, Map<String, Map<Integer, Long>> pollAspectCounts, SubMonitor monitor, AtomicLong nbProcessevents) {
return new TmfEventRequest(ITmfEvent.class, timeRange, 0, Integer.MAX_VALUE, ExecutionType.BACKGROUND) {
@Override
public void handleData(ITmfEvent event) {
if (monitor.isCanceled()) {
cancel();
return;
}

// process events to compute RX polls distribution
processEvent(event, filterPredicate, pollAspectCounts);

if ((nbProcessevents.incrementAndGet() & PROGRESS_INTERVAL) == 0) {
monitor.setWorkRemaining(workRemaining(trace));
monitor.worked(1);
monitor.setTaskName(String.format("DPDK Polls Distribution Analysis (%s events processed)", //$NON-NLS-1$
NumberFormat.getInstance().format(nbProcessevents.get())));
}
}
};
}

private static void processEvent(ITmfEvent event, Predicate<ITmfEvent> filterPredicate,
Map<String, Map<Integer, Long>> pollAspectCounts) {

if (event.getName().equals(DpdkEthdevEventLayout.eventEthdevRxBurstNonEmpty())
&& filterPredicate.test(event)) {
Integer nbRxPkts = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldNbRxPkts());
Integer portId = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldPortId());
Integer queueId = event.getContent().getFieldValue(Integer.class, DpdkEthdevEventLayout.fieldQueueId());

if (nbRxPkts != null && portId != null && queueId != null) {
String queueName = "P" + portId + "/Q" + queueId; //$NON-NLS-1$ //$NON-NLS-2$
Map<Integer, Long> dataSet = pollAspectCounts.computeIfAbsent(queueName, k -> new HashMap<>());
if (dataSet.size() < MEMORY_SANITY_LIMIT) {
dataSet.merge(nbRxPkts, 1L, Long::sum);
}
}
}
}

private @NonNull List<LamiResultTable> convertToLamiTables(TmfTimeRange timeRange,
Map<String, Map<Integer, Long>> pollCountPerQueue) {
List<LamiResultTable> results = new ArrayList<>();
for (Map.Entry<String, Map<Integer, Long>> entry : pollCountPerQueue.entrySet()) {
String queueName = entry.getKey();
Map<Integer, Long> dataSet = entry.getValue();

List<LamiTableEntry> tableEntries = dataSet.entrySet().stream()
.map(e -> new LamiTableEntry(Arrays.asList(
new LamiString(e.getKey().toString()),
new LamiLongNumber(e.getValue()))))
.collect(Collectors.toList());

List<@NonNull LamiTableEntryAspect> tableAspects = Arrays.asList(
new LamiCategoryAspect(Messages.EthdevPollDistribution_NumberOfPacketsLabel, 0),
new LamiCountAspect(Messages.EthdevPollDistribution_CountLabel, 1));

LamiTableClass tableClass = new LamiTableClass(queueName, queueName, tableAspects, Collections.emptySet());
results.add(new LamiResultTable(createTimeRange(timeRange), tableClass, tableEntries));
}
return results;
}

/**
* Count aspect, generic
*
*/
private final class LamiCountAspect extends LamiGenericAspect {
private LamiCountAspect(String name, int column) {
super(name, null, column, true, false);
}
}

/**
* Category aspect, generic
*
*/
private final class LamiCategoryAspect extends LamiGenericAspect {
private LamiCategoryAspect(String name, int column) {
super(name, null, column, false, false);
}
}

/**
* TODO: move to LAMI
*/
private static LamiTimeRange createTimeRange(TmfTimeRange timeRange) {
return new LamiTimeRange(new LamiTimestamp(timeRange.getStartTime().toNanos()), new LamiTimestamp(timeRange.getEndTime().toNanos()));
}

/**
* TODO: LamiString in LAMI is private
*/
private final class LamiString extends LamiData {
private final String fElement;

private LamiString(String element) {
fElement = element;
}

@Override
public @NonNull String toString() {
return fElement;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*******************************************************************************
* Copyright (c) 2024 École Polytechnique de Montréal
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License 2.0 which
* accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.osgi.util.NLS;

/**
* Messages for the {@link DpdkPollDistributionAnalysis} on-demand analysis
*
* @author Adel Belkhiri
*/
@SuppressWarnings("javadoc")
public class Messages extends NLS {
private static final String BUNDLE_NAME = "org.eclipse.tracecompass.incubator.internal.dpdk.core.ethdev.poll.distribution.analysis.messages"; //$NON-NLS-1$

public static @Nullable String EthdevPollDistribution_AnalysisName;
public static @Nullable String EthdevPollDistribution_NumberOfPacketsLabel;
public static @Nullable String EthdevPollDistribution_CountLabel;

static @NonNull String getMessage(@Nullable String msg) {
if (msg == null) {
return ""; //$NON-NLS-1$
}
return msg;
}

static {
// initialize resource bundle
NLS.initializeMessages(BUNDLE_NAME, Messages.class);
}

private Messages() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
###############################################################################
# Copyright (c) 2024 École Polytechnique de Montréal
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License 2.0
# which accompanies this distribution, and is available at
# https://www.eclipse.org/legal/epl-2.0
#
# SPDX-License-Identifier: EPL-2.0
###############################################################################

EthdevPollDistribution_AnalysisName=DPDK Polls Distribution (ethdev)
EthdevPollDistribution_NumberOfPacketsLabel=Number of retrieved packets
EthdevPollDistribution_CountLabel=Count

0 comments on commit 1a11873

Please sign in to comment.