Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor async command handling #83

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
)

const (
databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate"
databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL"
defaultConfigFilename = "conf.yml"
defaultPort = 8080
defaultHost = ""
Expand Down
11 changes: 4 additions & 7 deletions cmd/serve/front/src/lib/localization/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,11 @@ You may reconsider and try to make the target reachable before deleting it.`,
'jobs.dates': 'Queued at / Not before',
'jobs.error': 'error',
'jobs.payload': 'payload',
'jobs.policy': 'policy',
'jobs.policy.preserve_group_order': 'Preserve group order on error',
'jobs.policy.wait_others_resource_id': 'Wait for others jobs to finish on resource',
'jobs.policy.cancellable': 'Cancellable',
'jobs.policy.mergeable': 'Mergeable',
'jobs.group': 'group',
'jobs.cancel': 'Cancel job',
'jobs.cancel.confirm': 'Are you sure you want to cancel this job?',
'jobs.dismiss': 'Dismiss job',
'jobs.dismiss.confirm': 'Are you sure you want to dismiss this job?',
'jobs.retry': 'Retry job',
'jobs.retry.confirm': 'Are you sure you want to retry this job?',
// Jobs names
'deployment.command.cleanup_app': 'Application cleanup',
'deployment.command.cleanup_target': 'Target cleanup',
Expand Down
11 changes: 4 additions & 7 deletions cmd/serve/front/src/lib/localization/fr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,11 @@ Vous devriez probablement essayer de rendre la cible accessible avant de la supp
'jobs.dates': 'Créée le / Pas avant',
'jobs.error': 'erreur',
'jobs.payload': 'charge utile',
'jobs.policy': 'politique',
'jobs.policy.preserve_group_order': "Préserve l'ordre au sein du groupe en cas d'erreur",
'jobs.policy.wait_others_resource_id': "Attend l'achèvement des tâches sur cette ressource",
'jobs.policy.cancellable': 'Annulable',
'jobs.policy.mergeable': 'Fusionnable',
'jobs.group': 'groupe',
'jobs.cancel': 'Annuler la tâche',
'jobs.cancel.confirm': 'Voulez-vous vraiment annuler la tâche ?',
'jobs.dismiss': 'Ignorer la tâche',
'jobs.dismiss.confirm': 'Voulez-vous vraiment ignorer la tâche ?',
'jobs.retry': 'Relancer la tâche',
'jobs.retry.confirm': 'Voulez-vous vraiment relancer la tâche ?',
// Jobs names
'deployment.command.cleanup_app': "Nettoyage de l'application",
'deployment.command.cleanup_target': 'Nettoyage de la cible',
Expand Down
20 changes: 9 additions & 11 deletions cmd/serve/front/src/lib/resources/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@ import type { Paginated } from '$lib/pagination';

export type Job = {
id: string;
resource_id: string;
group: string;
message_name: string;
message_data: string;
queued_at: string;
not_before: string;
error_code?: string;
policy: number;
retrieved: boolean;
};

export enum JobPolicy {
PreserveOrder = 1,
WaitForOthersResourceID = 2,
Cancellable = 4,
Mergeable = 8
}

export interface JobsService {
delete(id: string): Promise<void>;
dismiss(id: string): Promise<void>;
retry(id: string): Promise<void>;
fetchAll(page: number, options?: FetchOptions): Promise<Paginated<Job>>;
queryAll(page: number): QueryResult<Paginated<Job>>;
}
Expand All @@ -35,12 +27,18 @@ type Options = {
export class RemoteJobsService implements JobsService {
constructor(private readonly _fetcher: FetchService, private readonly _options: Options) {}

delete(id: string): Promise<void> {
dismiss(id: string): Promise<void> {
return this._fetcher.delete(`/api/v1/jobs/${id}`, {
invalidate: ['/api/v1/jobs']
});
}

retry(id: string): Promise<void> {
return this._fetcher.put(`/api/v1/jobs/${id}`, {
invalidate: ['/api/v1/jobs']
});
}

queryAll(page: number): QueryResult<Paginated<Job>> {
return this._fetcher.query('/api/v1/jobs', {
refreshInterval: this._options.pollingInterval,
Expand Down
41 changes: 7 additions & 34 deletions cmd/serve/front/src/routes/(main)/jobs/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,16 @@
import Display from '$components/display.svelte';
import Pagination from '$components/pagination.svelte';
import Stack from '$components/stack.svelte';
import CancelButton from './cancel-button.svelte';
import service, { JobPolicy } from '$lib/resources/jobs';
import service from '$lib/resources/jobs';
import l, { type AppTranslationsString } from '$lib/localization';
import JobActionButton from './job-action-button.svelte';

let page = 1;

function translateMessageName(messageName: string) {
return l.translate(messageName as AppTranslationsString);
}

function translatePolicy(policy: number) {
if (!policy) {
return '-';
}

const policies: string[] = [];

if ((policy & JobPolicy.PreserveOrder) !== 0) {
policies.push(l.translate('jobs.policy.preserve_group_order'));
}

if ((policy & JobPolicy.WaitForOthersResourceID) !== 0) {
policies.push(l.translate('jobs.policy.wait_others_resource_id'));
}

if ((policy & JobPolicy.Cancellable) !== 0) {
policies.push(l.translate('jobs.policy.cancellable'));
}

if ((policy & JobPolicy.Mergeable) !== 0) {
policies.push(l.translate('jobs.policy.mergeable'));
}

return policies.join(', ');
}

$: ({ data } = service.queryAll(page));
</script>

Expand Down Expand Up @@ -74,7 +48,6 @@
{:else if value === 'resource'}
<!-- @ts-ignore -->
<div>{translateMessageName(item.message_name)}</div>
<div class="meta">{item.resource_id}</div>
{/if}
</svelte:fragment>

Expand All @@ -87,15 +60,15 @@
<Display label="jobs.payload">
<code>{item.message_data}</code>
</Display>
<Display label="jobs.policy">
{translatePolicy(item.policy)}
</Display>
<Display label="jobs.error">
{item.error_code ?? '-'}
</Display>
</dl>
{#if (item.policy & JobPolicy.Cancellable) !== 0}
<CancelButton id={item.id} {page} />
{#if item.error_code}
<Stack justify="flex-end">
<JobActionButton id={item.id} mode="dismiss" {page} />
<JobActionButton id={item.id} mode="retry" {page} />
</Stack>
{/if}
</Stack>
</svelte:fragment>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
<script lang="ts">
import Button from '$components/button.svelte';
import Stack from '$components/stack.svelte';
import { submitter } from '$lib/form';
import service from '$lib/resources/jobs';
import Stack from '$components/stack.svelte';
import l from '$lib/localization';

export let id: string;
export let page: number;
export let page: number; // Page number, used to refresh the jobs list
export let mode: 'dismiss' | 'retry';

const { submit, loading } = submitter(
() =>
service.delete(id).then(() =>
service[mode](id).then(() =>
service.fetchAll(page, {
cache: 'no-store' // Force the refresh of jobs list
})
),
{
confirmation: l.translate('jobs.cancel.confirm')
confirmation: l.translate(`jobs.${mode}.confirm`)
}
);
</script>

<Stack direction="row" justify="flex-end">
<Button variant="danger" text="jobs.cancel" on:click={submit} loading={$loading} />
<Button
variant={mode == 'dismiss' ? 'danger' : 'outlined'}
text={`jobs.${mode}`}
on:click={submit}
loading={$loading}
/>
</Stack>
25 changes: 20 additions & 5 deletions cmd/serve/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package serve

import (
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/bus/embedded/dismiss_job"
"github.com/YuukanOO/seelf/pkg/bus/embedded/get_jobs"
"github.com/YuukanOO/seelf/pkg/bus/embedded/retry_job"
"github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-gonic/gin"
)
Expand All @@ -12,13 +15,13 @@ type listJobsFilters struct {

func (s *server) listJobsHandler() gin.HandlerFunc {
return http.Bind(s, func(ctx *gin.Context, request listJobsFilters) error {
var filters bus.GetJobsFilters
var filters get_jobs.Query

if request.Page != 0 {
filters.Page.Set(request.Page)
}

jobs, err := s.scheduledJobsStore.GetAllJobs(ctx.Request.Context(), filters)
jobs, err := bus.Send(s.bus, ctx.Request.Context(), filters)

if err != nil {
return err
Expand All @@ -28,11 +31,23 @@ func (s *server) listJobsHandler() gin.HandlerFunc {
})
}

func (s *server) deleteJobsHandler() gin.HandlerFunc {
func (s *server) dismissJobHandler() gin.HandlerFunc {
return http.Send(s, func(ctx *gin.Context) error {
err := s.scheduledJobsStore.Delete(ctx.Request.Context(), ctx.Param("id"))
if _, err := bus.Send(s.bus, ctx.Request.Context(), dismiss_job.Command{
ID: ctx.Param("id"),
}); err != nil {
return err
}

if err != nil {
return http.NoContent(ctx)
})
}

func (s *server) retryJobHandler() gin.HandlerFunc {
return http.Send(s, func(ctx *gin.Context) error {
if _, err := bus.Send(s.bus, ctx.Request.Context(), retry_job.Command{
ID: ctx.Param("id"),
}); err != nil {
return err
}

Expand Down
14 changes: 9 additions & 5 deletions cmd/serve/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strings"
"time"

"github.com/YuukanOO/seelf/internal/auth/app/api_login"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
httputils "github.com/YuukanOO/seelf/pkg/http"
"github.com/gin-contrib/sessions"
"github.com/gin-gonic/gin"
Expand All @@ -25,8 +27,8 @@ var errUnauthorized = errors.New("unauthorized")
func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return func(ctx *gin.Context) {
// First, try to find a user id in the encrypted session cookie
sess := sessions.Default(ctx)
uid, ok := sess.Get(userSessionKey).(string)
userSession := sessions.Default(ctx)
uid, ok := userSession.Get(userSessionKey).(string)
failed := !ok || uid == ""

// If it failed and api access is not allowed, return early
Expand All @@ -50,15 +52,17 @@ func (s *server) authenticate(withApiAccess bool) gin.HandlerFunc {
return
}

id, err := s.usersReader.GetIDFromAPIKey(ctx.Request.Context(), domain.APIKey(authHeader[apiAuthPrefixLength:]))
id, err := bus.Send(s.bus, ctx.Request.Context(), api_login.Query{
Key: authHeader[apiAuthPrefixLength:],
})

if err != nil {
_ = ctx.AbortWithError(http.StatusUnauthorized, errUnauthorized)
return
}

// Attach the user id to the context passed down in every usecases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), id))
// Attach the user id to the context passed down in every use cases.
ctx.Request = ctx.Request.WithContext(domain.WithUserID(ctx.Request.Context(), domain.UserID(id)))

ctx.Next()
}
Expand Down
24 changes: 10 additions & 14 deletions cmd/serve/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/YuukanOO/seelf/cmd/startup"
"github.com/YuukanOO/seelf/internal/auth/domain"
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/log"
"github.com/gin-contrib/sessions"
Expand All @@ -39,25 +38,21 @@ type (
}

server struct {
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
usersReader domain.UsersReader
scheduledJobsStore bus.ScheduledJobsStore
options ServerOptions
router *gin.Engine
bus bus.Dispatcher
logger log.Logger
}
)

func newHttpServer(options ServerOptions, root startup.ServerRoot) *server {
gin.SetMode(gin.ReleaseMode)

s := &server{
options: options,
router: gin.New(),
usersReader: root.UsersReader(),
scheduledJobsStore: root.ScheduledJobsStore(),
bus: root.Bus(),
logger: root.Logger(),
options: options,
router: gin.New(),
bus: root.Bus(),
logger: root.Logger(),
}

_ = s.router.SetTrustedProxies(nil)
Expand Down Expand Up @@ -87,7 +82,8 @@ func newHttpServer(options ServerOptions, root startup.ServerRoot) *server {
v1secured := v1.Group("", s.authenticate(false))
v1secured.DELETE("/session", s.deleteSessionHandler())
v1secured.GET("/jobs", s.listJobsHandler())
v1secured.DELETE("/jobs/:id", s.deleteJobsHandler())
v1secured.DELETE("/jobs/:id", s.dismissJobHandler())
v1secured.PUT("/jobs/:id", s.retryJobHandler())
v1secured.GET("/profile", s.getProfileHandler())
v1secured.PATCH("/profile", s.updateProfileHandler())
v1secured.PUT("/profile/key", s.refreshProfileKeyHandler())
Expand Down
Loading
Loading