Skip to content

Commit

Permalink
fix(task): create authorization when using token to create task
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-rushakoff committed Feb 15, 2019
1 parent 05f5af7 commit 0977a65
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 5 deletions.
2 changes: 1 addition & 1 deletion authorizer/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func VerifyPermissions(ctx context.Context, ps []influxdb.Permission) error {
if err := IsAllowed(ctx, p); err != nil {
return &influxdb.Error{
Err: err,
Msg: fmt.Sprintf("cannot create authorization with permission %s", p),
Msg: fmt.Sprintf("permission %s is not allowed", p),
Code: influxdb.EForbidden,
}
}
Expand Down
17 changes: 13 additions & 4 deletions http/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"strings"
"time"

"github.com/influxdata/flux"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/authorizer"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
Expand All @@ -32,6 +34,7 @@ type TaskBackend struct {
UserResourceMappingService platform.UserResourceMappingService
LabelService platform.LabelService
UserService platform.UserService
BucketService platform.BucketService
}

// NewTaskBackend returns a new instance of TaskBackend.
Expand All @@ -44,6 +47,7 @@ func NewTaskBackend(b *APIBackend) *TaskBackend {
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
BucketService: b.BucketService,
}
}

Expand All @@ -58,6 +62,7 @@ type TaskHandler struct {
UserResourceMappingService platform.UserResourceMappingService
LabelService platform.LabelService
UserService platform.UserService
BucketService platform.BucketService
}

const (
Expand Down Expand Up @@ -88,6 +93,7 @@ func NewTaskHandler(b *TaskBackend) *TaskHandler {
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
BucketService: b.BucketService,
}

h.HandlerFunc("GET", tasksPath, h.handleGetTasks)
Expand Down Expand Up @@ -374,9 +380,6 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request) (*getTasksReque
return req, nil
}

// TODO(desa): remove
func getPermissions() ([]platform.Permission, error) { return nil, nil }

func (h *TaskHandler) createTaskAuthorizationIfNotExists(ctx context.Context, a platform.Authorizer, t *platform.TaskCreate) error {
if t.Token != "" {
return nil
Expand All @@ -388,7 +391,13 @@ func (h *TaskHandler) createTaskAuthorizationIfNotExists(ctx context.Context, a
return nil
}

ps, err := getPermissions() // convert task to required permissions here
spec, err := flux.Compile(ctx, t.Flux, time.Now())
if err != nil {
return err
}

preAuthorizer := query.NewPreAuthorizer(h.BucketService)
ps, err := preAuthorizer.RequiredPermissions(ctx, spec)
if err != nil {
return err
}
Expand Down
99 changes: 99 additions & 0 deletions http/task_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

platform "github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
Expand Down Expand Up @@ -944,3 +945,101 @@ func TestService_handlePostTaskLabel(t *testing.T) {
})
}
}

func TestTaskHandler_CreateTaskFromSession(t *testing.T) {
var createdTasks []platform.TaskCreate
ts := &mock.TaskService{
CreateTaskFn: func(_ context.Context, tc platform.TaskCreate) (*platform.Task, error) {
createdTasks = append(createdTasks, tc)
// Task with fake IDs so it can be serialized.
return &platform.Task{ID: 9, OrganizationID: 99, AuthorizationID: 999}, nil
},
}

i := inmem.NewService()
h := NewTaskHandler(&TaskBackend{
Logger: zaptest.NewLogger(t),

TaskService: ts,
AuthorizationService: i,
OrganizationService: i,
UserResourceMappingService: i,
LabelService: i,
UserService: i,
BucketService: i,
})

ctx := context.Background()

// Set up user and org.
u := &platform.User{Name: "u"}
if err := i.CreateUser(ctx, u); err != nil {
t.Fatal(err)
}
o := &platform.Organization{Name: "o"}
if err := i.CreateOrganization(ctx, o); err != nil {
t.Fatal(err)
}

// Map user to org.
if err := i.CreateUserResourceMapping(ctx, &platform.UserResourceMapping{
ResourceType: platform.OrgsResourceType,
ResourceID: o.ID,
UserID: u.ID,
UserType: platform.Owner,
}); err != nil {
t.Fatal(err)
}

// Source and destination buckets for use in task.
bSrc := platform.Bucket{OrganizationID: o.ID, Name: "b-src"}
if err := i.CreateBucket(ctx, &bSrc); err != nil {
t.Fatal(err)
}
bDst := platform.Bucket{OrganizationID: o.ID, Name: "b-dst"}
if err := i.CreateBucket(ctx, &bDst); err != nil {
t.Fatal(err)
}

// Create a session for use in authorizing context.
s := &platform.Session{
UserID: u.ID,
Permissions: platform.OperPermissions(),
ExpiresAt: time.Now().Add(24 * time.Hour),
}

b, err := json.Marshal(platform.TaskCreate{
Flux: `option task = {name:"x", every:1m} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`,
OrganizationID: o.ID,
})
if err != nil {
t.Fatal(err)
}

sessionCtx := pcontext.SetAuthorizer(context.Background(), s)
url := fmt.Sprintf("http://localhost:9999/api/v2/tasks")
r := httptest.NewRequest("POST", url, bytes.NewReader(b)).WithContext(sessionCtx)

w := httptest.NewRecorder()

h.handlePostTask(w, r)

res := w.Result()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if res.StatusCode != http.StatusCreated {
t.Logf("response body: %s", body)
t.Fatalf("expected status created, got %v", res.StatusCode)
}

if len(createdTasks) != 1 {
t.Fatalf("didn't create task; got %#v", createdTasks)
}

// The task should have been created with a valid token.
if _, err := i.FindAuthorizationByToken(ctx, createdTasks[0].Token); err != nil {
t.Fatal(err)
}
}
45 changes: 45 additions & 0 deletions query/preauthorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// for authorization to be denied at runtime even if this check passes.
type PreAuthorizer interface {
PreAuthorize(ctx context.Context, spec *flux.Spec, auth platform.Authorizer) error
RequiredPermissions(ctx context.Context, spec *flux.Spec) ([]platform.Permission, error)
}

// NewPreAuthorizer creates a new PreAuthorizer
Expand Down Expand Up @@ -71,3 +72,47 @@ func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth

return nil
}

// RequiredPermissions returns a slice of permissions required for the query contained in spec.
// This method also validates that the buckets exist.
func (a *preAuthorizer) RequiredPermissions(ctx context.Context, spec *flux.Spec) ([]platform.Permission, error) {
readBuckets, writeBuckets, err := BucketsAccessed(spec)

if err != nil {
return nil, errors.Wrap(err, "could not retrieve buckets for query.Spec")
}

ps := make([]platform.Permission, 0, len(readBuckets)+len(writeBuckets))
for _, readBucketFilter := range readBuckets {
bucket, err := a.bucketService.FindBucket(ctx, readBucketFilter)
if err != nil {
return nil, errors.Wrapf(err, "could not find read bucket with filter: %s", readBucketFilter)
}

if bucket == nil {
return nil, errors.New("bucket service returned nil bucket")
}

reqPerm, err := platform.NewPermissionAtID(bucket.ID, platform.ReadAction, platform.BucketsResourceType, bucket.OrganizationID)
if err != nil {
return nil, errors.Wrapf(err, "could not create read bucket permission")
}

ps = append(ps, *reqPerm)
}

for _, writeBucketFilter := range writeBuckets {
bucket, err := a.bucketService.FindBucket(ctx, writeBucketFilter)
if err != nil {
return nil, errors.Wrapf(err, "could not find write bucket with filter: %s", writeBucketFilter)
}

reqPerm, err := platform.NewPermissionAtID(bucket.ID, platform.WriteAction, platform.BucketsResourceType, bucket.OrganizationID)
if err != nil {
return nil, errors.Wrapf(err, "could not create write bucket permission")
}
ps = append(ps, *reqPerm)
}

return ps, nil
}
44 changes: 44 additions & 0 deletions query/preauthorizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/query"
Expand Down Expand Up @@ -81,3 +82,46 @@ func TestPreAuthorizer_PreAuthorize(t *testing.T) {
t.Errorf("Expected successful authorization, but got error: \"%v\"", err.Error())
}
}

func TestPreAuthorizer_RequiredPermissions(t *testing.T) {
ctx := context.Background()

i := inmem.NewService()

o := platform.Organization{Name: "o"}
if err := i.CreateOrganization(ctx, &o); err != nil {
t.Fatal(err)
}
bFrom := platform.Bucket{Name: "b-from", OrganizationID: o.ID}
if err := i.CreateBucket(ctx, &bFrom); err != nil {
t.Fatal(err)
}
bTo := platform.Bucket{Name: "b-to", OrganizationID: o.ID}
if err := i.CreateBucket(ctx, &bTo); err != nil {
t.Fatal(err)
}

const script = `from(bucket:"b-from") |> range(start:-1m) |> to(bucket:"b-to", org:"o")`
spec, err := flux.Compile(ctx, script, time.Now())
if err != nil {
t.Fatal(err)
}

preAuthorizer := query.NewPreAuthorizer(i)
perms, err := preAuthorizer.RequiredPermissions(ctx, spec)
if err != nil {
t.Fatal(err)
}

pWrite, err := platform.NewPermissionAtID(bTo.ID, platform.WriteAction, platform.BucketsResourceType, o.ID)
if err != nil {
t.Fatal(err)
}

t.Log("WARNING: this test does not validate permissions on the 'from' bucket. Please update after https://github.com/influxdata/flux/issues/114.")

exp := []platform.Permission{*pWrite}
if diff := cmp.Diff(exp, perms); diff != "" {
t.Fatalf("unexpected permissions: %s", diff)
}
}

0 comments on commit 0977a65

Please sign in to comment.