Skip to content

Commit

Permalink
Tvs by timeframe;
Browse files Browse the repository at this point in the history
  • Loading branch information
mismirnov committed Jan 16, 2025
1 parent 9cfa870 commit 95df86c
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 44 deletions.
6 changes: 3 additions & 3 deletions cmd/api/handler/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 +361,14 @@ func (handler RollupHandler) Stats(c echo.Context) error {
histogram, err = handler.rollups.Tvl(
c.Request().Context(),
req.Id,
req.Timeframe,
storage.Timeframe(req.Timeframe),
storage.NewSeriesRequest(req.From, req.To),
)
} else {
histogram, err = handler.rollups.Series(
c.Request().Context(),
req.Id,
req.Timeframe,
storage.Timeframe(req.Timeframe),
req.SeriesName,
storage.NewSeriesRequest(req.From, req.To),
)
Expand Down Expand Up @@ -490,7 +490,7 @@ func (handler RollupHandler) Distribution(c echo.Context) error {
c.Request().Context(),
req.Id,
req.SeriesName,
req.Timeframe,
storage.Timeframe(req.Timeframe),
)
if err != nil {
return handleError(c, err, handler.rollups)
Expand Down
39 changes: 38 additions & 1 deletion cmd/api/handler/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (sh StatsHandler) MessagesCount24h(c echo.Context) error {
// @Tags stats
// @ID stats-tvs
// @Produce json
// @Success 200 {integer} uint64
// @Success 200 {integer} float64
// @Failure 500 {object} Error
// @Router /stats/tvs [get]
func (sh StatsHandler) Tvs(c echo.Context) error {
Expand All @@ -562,3 +562,40 @@ func (sh StatsHandler) Tvs(c echo.Context) error {
}
return c.JSON(http.StatusOK, tvs)
}

// TvsSeries godoc
//
// @Summary Get histogram for network TVS
// @Description Get histogram for network TVS by timeframe
// @Tags stats
// @ID stats-tvs-series
// @Param timeframe path string true "Timeframe" Enums(day, month)
// @Produce json
// @Success 200 {array} responses.SeriesItem
// @Failure 400 {object} Error
// @Failure 500 {object} Error
// @Router /stats/tvs/{timeframe} [get]
func (sh StatsHandler) TvsSeries(c echo.Context) error {
req, err := bindAndValidate[tvsSeriesRequest](c)
if err != nil {
return badRequestError(c, err)
}

tvs, err := sh.repo.TvsSeries(c.Request().Context(), storage.Timeframe(req.Timeframe))
if err != nil {
return handleError(c, err, sh.nsRepo)
}
if len(tvs) == 0 {
return c.JSON(http.StatusOK, []any{})
}

response := make([]responses.SeriesItem, len(tvs))
for i := range tvs {
response[i] = responses.NewSeriesItem(tvs[i])
}
return returnArray(c, response)
}

type tvsSeriesRequest struct {
Timeframe string `example:"hour" param:"timeframe" swaggertype:"string" validate:"required,oneof=day month"`
}
1 change: 1 addition & 0 deletions cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func initHandlers(ctx context.Context, e *echo.Echo, cfg Config, db postgres.Sto
stats.GET("/square_size", statsHandler.SquareSize)
stats.GET("/messages_count_24h", statsHandler.MessagesCount24h)
stats.GET("/tvs", statsHandler.Tvs)
stats.GET("/tvs/:timeframe", statsHandler.TvsSeries)

price := stats.Group("/price")
{
Expand Down
4 changes: 2 additions & 2 deletions database/views/25_rollup_tvl_by_month.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
select
time_bucket('1 month'::interval, logs.time) AS time,
logs.rollup_id as rollup_id,
sum(logs.value) as value
last(logs.value, time) as value
from tvl as logs
group by 1, 2
with no data;
CALL add_view_refresh_job('rollup_tvl_by_month', NULL, INTERVAL '1 hour');
CALL add_view_refresh_job('rollup_tvl_by_month', NULL, INTERVAL '12 hours');
9 changes: 9 additions & 0 deletions database/views/26_tvs_by_day.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS tvs_by_day
WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
select
time_bucket('1 day'::interval, logs.time) AS time,
sum(logs.value) as value
from tvl as logs
group by 1
with no data;
CALL add_view_refresh_job('tvs_by_day', NULL, INTERVAL '4 hours');
11 changes: 11 additions & 0 deletions database/views/27_tvs_by_month.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS tvs_by_month
WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
select
time_bucket('1 month'::interval, logs.time) AS time,
min(logs.value)::DOUBLE PRECISION AS min_value,
max(logs.value)::DOUBLE PRECISION AS max_value,
last(logs.value, time)::DOUBLE PRECISION AS value
from tvl as logs
group by 1
with no data;
CALL add_view_refresh_job('tvs_by_month', NULL, INTERVAL '12 hours');
1 change: 1 addition & 0 deletions internal/storage/mock/namespace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions internal/storage/mock/rollup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions internal/storage/mock/stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 11 additions & 17 deletions internal/storage/postgres/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ import (
"github.com/uptrace/bun"
)

const (
Hour = string(storage.TimeframeHour)
Day = string(storage.TimeframeDay)
Month = string(storage.TimeframeMonth)
)

// Rollup -
type Rollup struct {
*postgres.Table[*storage.Rollup]
Expand Down Expand Up @@ -132,7 +126,7 @@ func (r *Rollup) RollupsByNamespace(ctx context.Context, namespaceId uint64, lim
return
}

func (r *Rollup) Series(ctx context.Context, rollupId uint64, timeframe, column string, req storage.SeriesRequest) (items []storage.HistogramItem, err error) {
func (r *Rollup) Series(ctx context.Context, rollupId uint64, timeframe storage.Timeframe, column string, req storage.SeriesRequest) (items []storage.HistogramItem, err error) {
providers, err := r.Providers(ctx, rollupId)
if err != nil {
return nil, err
Expand All @@ -145,11 +139,11 @@ func (r *Rollup) Series(ctx context.Context, rollupId uint64, timeframe, column
query := r.DB().NewSelect().Order("time desc").Limit(100).Group("time")

switch timeframe {
case Hour:
case storage.TimeframeHour:
query = query.Table("rollup_stats_by_hour")
case Day:
case storage.TimeframeDay:
query = query.Table("rollup_stats_by_day")
case Month:
case storage.TimeframeMonth:
query = query.Table("rollup_stats_by_month")
default:
return nil, errors.Errorf("invalid timeframe: %s", timeframe)
Expand Down Expand Up @@ -226,19 +220,19 @@ func (r *Rollup) ById(ctx context.Context, rollupId uint64) (rollup storage.Roll
return
}

func (r *Rollup) Tvl(ctx context.Context, rollupId uint64, timeframe string, req storage.SeriesRequest) (items []storage.HistogramItem, err error) {
func (r *Rollup) Tvl(ctx context.Context, rollupId uint64, timeframe storage.Timeframe, req storage.SeriesRequest) (items []storage.HistogramItem, err error) {
query := r.DB().NewSelect().
ColumnExpr("value, time as bucket").
Where("rollup_id = ?", rollupId).
Limit(100).
Order("time desc")

switch timeframe {
case Hour:
case storage.TimeframeHour:
return nil, errors.Errorf("unavailable data for this timeframe: %s", timeframe)
case Day:
case storage.TimeframeDay:
query = query.Table("tvl")
case Month:
case storage.TimeframeMonth:
query = query.Table(storage.ViewRollupTvlByMonth)
default:
return nil, errors.Errorf("invalid timeframe: %s", timeframe)
Expand All @@ -256,7 +250,7 @@ func (r *Rollup) Tvl(ctx context.Context, rollupId uint64, timeframe string, req
return
}

func (r *Rollup) Distribution(ctx context.Context, rollupId uint64, series, groupBy string) (items []storage.DistributionItem, err error) {
func (r *Rollup) Distribution(ctx context.Context, rollupId uint64, series string, groupBy storage.Timeframe) (items []storage.DistributionItem, err error) {
providers, err := r.Providers(ctx, rollupId)
if err != nil {
return
Expand All @@ -280,11 +274,11 @@ func (r *Rollup) Distribution(ctx context.Context, rollupId uint64, series, grou
}

switch groupBy {
case Day:
case storage.TimeframeDay:
cte = cte.Table("rollup_stats_by_day").
ColumnExpr("extract(isodow from time) as name").
Where("time >= ?", time.Now().AddDate(0, -3, 0).UTC())
case Hour:
case storage.TimeframeHour:
cte = cte.Table("rollup_stats_by_hour").
ColumnExpr("extract(hour from time) as name").
Where("time >= ?", time.Now().AddDate(0, -1, 0).UTC())
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/postgres/rollup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (s *StorageTestSuite) TestRollupSeries() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

for _, tf := range []string{
"day", "hour", "month",
for _, tf := range []storage.Timeframe{
storage.TimeframeDay, storage.TimeframeHour, storage.TimeframeMonth,
} {
for _, column := range []string{
"size", "blobs_count", "size_per_blob", "fee",
Expand Down Expand Up @@ -239,8 +239,8 @@ func (s *StorageTestSuite) TestRollupDistribution() {
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ctxCancel()

for _, groupBy := range []string{
"day", "hour",
for _, groupBy := range []storage.Timeframe{
storage.TimeframeDay, storage.TimeframeHour,
} {
for _, series := range []string{
"size", "blobs_count", "size_per_blob", "fee_per_blob",
Expand Down
30 changes: 25 additions & 5 deletions internal/storage/postgres/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,42 @@ func (s Stats) MessagesCount24h(ctx context.Context) (response []storage.CountIt

func (s Stats) Tvs(ctx context.Context) (tvs float64, err error) {
var lastTs time.Time
errTs := s.db.DB().
var query = s.db.DB().
NewSelect().
Table(storage.ViewRollupTvlByMonth).
Table(storage.ViewRollupTvlByMonth)
errTs := query.
ColumnExpr("MAX(time) AS time").
Scan(ctx, &lastTs)

if errTs != nil {
return 0, errTs
}

err = s.db.DB().
NewSelect().
Table(storage.ViewRollupTvlByMonth).
err = query.
ColumnExpr("sum(value) as value").
Where("time = ?", lastTs).
Scan(ctx, &tvs)

return
}

func (s Stats) TvsSeries(ctx context.Context, timeframe storage.Timeframe) (response []storage.SeriesItem, err error) {
query := s.db.DB().NewSelect()

switch timeframe {
case storage.TimeframeDay:
query = query.Table(storage.ViewTvsByDay).
ColumnExpr("value, time as ts")
case storage.TimeframeMonth:
query = query.Table(storage.ViewTvsByMonth).
ColumnExpr("value, min_value as min, max_value as max, time as ts")
default:
return nil, errors.Errorf("unexpected timeframe %s", timeframe)
}

err = query.Order("time desc").
Limit(100).
Scan(ctx, &response)

return
}
Loading

0 comments on commit 95df86c

Please sign in to comment.