Skip to content

Commit

Permalink
Add RTT statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
zapek committed Jan 3, 2025
1 parent 537f308 commit d246d23
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 by David Gerber - https://zapek.com
* Copyright (c) 2024-2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
Expand All @@ -23,7 +23,9 @@
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.xeres.app.xrs.service.rtt.RttRsService;
import io.xeres.app.xrs.service.turtle.TurtleRsService;
import io.xeres.common.rest.statistics.RttStatisticsResponse;
import io.xeres.common.rest.statistics.TurtleStatisticsResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -39,10 +41,12 @@
public class StatisticsController
{
private final TurtleRsService turtleRsService;
private final RttRsService rttRsService;

public StatisticsController(TurtleRsService turtleRsService)
public StatisticsController(TurtleRsService turtleRsService, RttRsService rttRsService)
{
this.turtleRsService = turtleRsService;
this.rttRsService = rttRsService;
}

@GetMapping("/turtle")
Expand All @@ -52,4 +56,12 @@ public TurtleStatisticsResponse getTurtleStatistics()
{
return toDTO(turtleRsService.getStatistics());
}

@GetMapping("/rtt")
@Operation(summary = "Get RTT statistics")
@ApiResponse(responseCode = "200", description = "Request successful")
public RttStatisticsResponse getRttStatistics()
{
return rttRsService.getStatistics();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 by David Gerber - https://zapek.com
* Copyright (c) 2019-2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
Expand Down Expand Up @@ -176,7 +176,7 @@ protected List<MessageId> onMessageListResponse(GxsId groupId, Set<MessageId> me
.map(GxsMessageItem::getMessageId)
.collect(Collectors.toSet());

messageIds.removeIf(existing::contains);
messageIds.removeAll(existing);

return messageIds.stream().toList();
}
Expand Down
41 changes: 39 additions & 2 deletions app/src/main/java/io/xeres/app/xrs/service/rtt/RttRsService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 by David Gerber - https://zapek.com
* Copyright (c) 2019-2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
Expand Down Expand Up @@ -28,13 +28,20 @@
import io.xeres.app.xrs.service.RsServiceType;
import io.xeres.app.xrs.service.rtt.item.RttPingItem;
import io.xeres.app.xrs.service.rtt.item.RttPongItem;
import io.xeres.common.rest.statistics.RttPeer;
import io.xeres.common.rest.statistics.RttStatisticsResponse;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static io.xeres.app.xrs.service.RsServiceType.RTT;
Expand All @@ -47,6 +54,9 @@ public class RttRsService extends RsService
private final PeerConnectionManager peerConnectionManager;

private static final int KEY_COUNTER = 1;
public static final int KEY_RTT = 2;

private final Map<Long, CircularFifoQueue<Duration>> peers = new ConcurrentHashMap<>(); // XXX: we should clear the queue upon disconnection, but we only have initialize(), not cleanup()

public RttRsService(RsServiceRegistry rsServiceRegistry, PeerConnectionManager peerConnectionManager)
{
Expand Down Expand Up @@ -74,6 +84,9 @@ public void initialize(PeerConnection peerConnection)
0,
10,
TimeUnit.SECONDS);

var means = peers.computeIfAbsent(peerConnection.getLocation().getId(), unused -> new CircularFifoQueue<>(20));
means.clear();
}

private int getCounter(PeerConnection peerConnection)
Expand Down Expand Up @@ -115,7 +128,31 @@ else if (item instanceof RttPongItem pongItem)

log.debug("RTT: {}, offset: {}, peerTime: {}", rtt, offset, peerTime);

// To calculate a mean time offset, keep ~20 of them and compute the average between them. XXX: see how RS does it and store it in the peerConnection has an extra value
sender.putServiceData(this, KEY_RTT, rtt.toMillis());

var means = peers.get(sender.getLocation().getId());
means.add(offset);

if (means.isAtFullCapacity())
{
var mean = means.stream()
.mapToLong(Duration::toMillis)
.average()
.orElse(0.0) / 1000.0;

if (Math.abs(mean) > 120.0)
{
log.warn("Peer {}'s time is drifting ({} seconds)", sender, mean);
}
}
}
}

public RttStatisticsResponse getStatistics()
{
List<RttPeer> rttPeers = new ArrayList<>(peerConnectionManager.getNumberOfPeers());
peerConnectionManager.doForAllPeers(peerConnection -> rttPeers.add(new RttPeer(peerConnection.getLocation().getId(), peerConnection.getLocation().getProfile().getName() + "@" + peerConnection.getLocation().getName(), (long) peerConnection.getServiceData(this, KEY_RTT).orElse(0L))), this);

return new RttStatisticsResponse(rttPeers);
}
}
24 changes: 24 additions & 0 deletions common/src/main/java/io/xeres/common/rest/statistics/RttPeer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
* Xeres is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Xeres is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Xeres. If not, see <http://www.gnu.org/licenses/>.
*/

package io.xeres.common.rest.statistics;

public record RttPeer(long id, String name, long mean)
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
* Xeres is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Xeres is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Xeres. If not, see <http://www.gnu.org/licenses/>.
*/

package io.xeres.common.rest.statistics;

import java.util.List;

public record RttStatisticsResponse(List<RttPeer> peers)
{
}
13 changes: 11 additions & 2 deletions ui/src/main/java/io/xeres/ui/client/StatisticsClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 by David Gerber - https://zapek.com
* Copyright (c) 2024-2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
Expand All @@ -20,6 +20,7 @@
package io.xeres.ui.client;

import io.xeres.common.events.StartupEvent;
import io.xeres.common.rest.statistics.RttStatisticsResponse;
import io.xeres.common.rest.statistics.TurtleStatisticsResponse;
import io.xeres.common.util.RemoteUtils;
import org.springframework.context.event.EventListener;
Expand Down Expand Up @@ -49,11 +50,19 @@ public void init(StartupEvent event)
.build();
}

public Mono<TurtleStatisticsResponse> getStatistics()
public Mono<TurtleStatisticsResponse> getTurtleStatistics()
{
return webClient.get()
.uri("/turtle")
.retrieve()
.bodyToMono(TurtleStatisticsResponse.class);
}

public Mono<RttStatisticsResponse> getRttStatistics()
{
return webClient.get()
.uri("/rtt")
.retrieve()
.bodyToMono(RttStatisticsResponse.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 by David Gerber - https://zapek.com
* Copyright (c) 2024-2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
Expand Down Expand Up @@ -32,6 +32,9 @@ public class StatisticsMainWindowController implements WindowController
@FXML
private StatisticsTurtleController statisticsTurtleController;

@FXML
private StatisticsRttController statisticsRttController;

@Override
public void initialize()
{
Expand All @@ -42,11 +45,13 @@ public void initialize()
public void onShown()
{
statisticsTurtleController.start();
statisticsRttController.start();
}

@Override
public void onHiding()
{
statisticsTurtleController.stop();
statisticsRttController.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2025 by David Gerber - https://zapek.com
*
* This file is part of Xeres.
*
* Xeres is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Xeres is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Xeres. If not, see <http://www.gnu.org/licenses/>.
*/

package io.xeres.ui.controller.statistics;

import io.xeres.common.rest.statistics.RttPeer;
import io.xeres.common.util.ExecutorUtils;
import io.xeres.ui.client.StatisticsClient;
import io.xeres.ui.controller.Controller;
import javafx.application.Platform;
import javafx.fxml.FXML;
import javafx.scene.chart.LineChart;
import javafx.scene.chart.NumberAxis;
import javafx.scene.chart.XYChart;
import net.rgielen.fxweaver.core.FxmlView;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

@Component
@FxmlView(value = "/view/statistics/rtt.fxml")
public class StatisticsRttController implements Controller
{
private static final int UPDATE_IN_SECONDS = 10;
private static final int DATA_WINDOW_SIZE = 12; // 2 minutes of data (one each 10 seconds)

@FXML
private LineChart<Number, Number> lineChart;

@FXML
private NumberAxis xAxis;

private final Map<Long, XYChart.Series<Number, Number>> peerSeries = new HashMap<>();

private ScheduledExecutorService executorService;

private final StatisticsClient statisticsClient;

private final ResourceBundle bundle;

public StatisticsRttController(StatisticsClient statisticsClient, ResourceBundle bundle)
{
this.statisticsClient = statisticsClient;
this.bundle = bundle;
}

@Override
public void initialize() throws IOException
{
xAxis.setTickLabelFormatter(new NumberAxis.DefaultFormatter(xAxis)
{
@Override
public String toString(Number object)
{
return String.valueOf(-object.intValue());
}
});
}

public void start()
{
executorService = ExecutorUtils.createFixedRateExecutor(() -> statisticsClient.getRttStatistics()
.doOnSuccess(rttStatisticsResponse -> Platform.runLater(() -> {
rttStatisticsResponse.peers().forEach(rttPeer -> {
var series = peerSeries.computeIfAbsent(rttPeer.id(), aLong -> createSeries(rttPeer));
updateData(series, rttPeer.mean());
});

var ids = rttStatisticsResponse.peers().stream()
.map(RttPeer::id)
.collect(Collectors.toSet());

peerSeries.entrySet().removeIf(entry -> {
if (!ids.contains(entry.getKey()))
{
lineChart.getData().remove(entry.getValue());
return true;
}
return false;
});
}))
.subscribe(),
0,
UPDATE_IN_SECONDS); // XXX: that period should be shared somewhere
}

public void stop()
{
ExecutorUtils.cleanupExecutor(executorService);
}

private XYChart.Series<Number, Number> createSeries(RttPeer rttPeer)
{
var series = new XYChart.Series<Number, Number>();
series.setName(rttPeer.name());
lineChart.getData().add(series);
return series;
}

private static void updateData(XYChart.Series<Number, Number> series, float value)
{
series.getData().forEach(numberNumberData -> numberNumberData.setXValue(numberNumberData.getXValue().intValue() - UPDATE_IN_SECONDS));
series.getData().addFirst(new XYChart.Data<>(0, value));
if (series.getData().size() > DATA_WINDOW_SIZE + 1)
{
series.getData().removeLast();
}
}
}
Loading

0 comments on commit d246d23

Please sign in to comment.