Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#11 #12 issues fixed #8

Merged
merged 15 commits into from
Dec 5, 2017
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ vendor
build/
configs/

.DS_Store
34 changes: 25 additions & 9 deletions disneyland/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestGRPCJobCRUD(t *testing.T) {
c := NewDisneylandClient(conn)

ctx := context.Background()

//first
createdJob, err := c.CreateJob(ctx, &Job{Status: Job_PENDING})
checkTestErr(err, t)

Expand All @@ -94,8 +94,8 @@ func TestGRPCJobCRUD(t *testing.T) {
}

createdJob.Status = Job_PENDING
createdJob.Metadata = "meta_test"
createdJob.Output = "output_test"
createdJob.Metadata = "updated_test"
createdJob.Output = "updated_test"
createdJob.Kind = "docker"

updatedJob, err := c.ModifyJob(ctx, createdJob)
Expand All @@ -104,25 +104,41 @@ func TestGRPCJobCRUD(t *testing.T) {
if !checkJobsEqual(createdJob, updatedJob) {
t.Fail()
}
//second
createdJob, err = c.CreateJob(ctx, &Job{Kind: "docker"})
checkTestErr(err, t)

createdJob, err = c.CreateJob(ctx, &Job{})
allJobs, err := c.ListJobs(ctx, &ListJobsRequest{HowMany: 0})
checkTestErr(err, t)

allJobs, err := c.ListJobs(ctx, &ListJobsRequest{HowMany: 2})
if len(allJobs.Jobs) != 2 {
t.Fail()
}

allJobs, err = c.ListJobs(ctx, &ListJobsRequest{HowMany: 1})
checkTestErr(err, t)

if len(allJobs.Jobs) < 1 {
if len(allJobs.Jobs) != 1 {
t.Fail()
}

pulledJobs, err := c.PullPendingJobs(ctx, &ListJobsRequest{HowMany: 2})
allJobs, err = c.ListJobs(ctx, &ListJobsRequest{Kind: "docker", HowMany:2})
checkTestErr(err, t)

if len(pulledJobs.Jobs) < 1 {
if len(allJobs.Jobs) != 2 {
t.Fail()
}

_, err = c.DeleteJob(ctx, &RequestWithId{Id: 2})
pulledJobs, err := c.PullPendingJobs(ctx, &ListJobsRequest{HowMany: 1})
checkTestErr(err, t)

if len(pulledJobs.Jobs) != 1 {
t.Fail()
}
//third
createdJob, err = c.CreateJob(ctx, &Job{Kind: "remove"})
checkTestErr(err, t)

_, err = c.DeleteJob(ctx, &RequestWithId{Id: createdJob.Id})
checkTestErr(err, t)
}
76 changes: 28 additions & 48 deletions disneyland/disneyland.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions disneyland/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (s *Server) PullPendingJobs(ctx context.Context, in *ListJobsRequest) (*Lis
if user.IsUser() {
in.Project = user.ProjectAccess
}

pts, err := s.Storage.PullJobs(in.HowMany, in.Project, in.Kind)

if err != nil {
Expand Down
134 changes: 112 additions & 22 deletions disneyland/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,27 @@ package disneyland
import (
"database/sql"
_ "github.com/lib/pq"
"strconv"
)

const PULLINGSTRQ_1 = `WITH updatedPts AS (
WITH pulledPts AS (
SELECT id, project, kind
FROM jobs
WHERE status=$1`
const PULLINGSTRQ_2 = ` FOR UPDATE SKIP LOCKED)
UPDATE jobs pts
SET status=$2
FROM pulledPts
WHERE pulledPts.id=pts.id AND pulledPts.project=pts.project AND pulledPts.kind=pts.kind
RETURNING pts.id, pts.project, pts.status, pts.metadata, pts.input, pts.output, pts.kind)
SELECT *
FROM updatedPts
ORDER BY id ASC;`
const LISTSTRQ_1 = `SELECT id, project, status, metadata, input, output, kind
FROM jobs
WHERE`

type DisneylandStorageConfig struct {
DatabaseURI string `json:"db_uri"`
}
Expand Down Expand Up @@ -92,6 +111,7 @@ func (storage *DisneylandStorage) GetJob(id uint64) (*Job, error) {
}
return job, err
}

func queryJobs(rows *sql.Rows) (*ListOfJobs, error) {
ret := &ListOfJobs{Jobs: []*Job{}}
var err error
Expand All @@ -118,12 +138,56 @@ func queryJobs(rows *sql.Rows) (*ListOfJobs, error) {
}

func (storage *DisneylandStorage) ListJobs(howmany uint32, project string, kind string) (*ListOfJobs, error) {
strQuery := `SELECT id, project, status, metadata, input, output, kind
FROM jobs
WHERE project LIKE '%' || $1 || '%' AND kind LIKE '%' || $2 || '%'
LIMIT $3;`
tx, err := storage.db.Begin()
if err != nil {
return nil, err
}
var rows *sql.Rows
projectFlag := false
kindFlag := false
limitFlag := false

strQuery := LISTSTRQ_1
inc := 1
if project != "" {
strQuery += " project=$"
strQuery += strconv.Itoa(inc)
inc++
projectFlag = true
}
if kind != "" {
if projectFlag {
strQuery += " AND kind=$"
} else {
strQuery += " kind=$"
}
strQuery += strconv.Itoa(inc)
inc++
kindFlag = true
}
if howmany != 0 {
strQuery += " LIMIT $"
strQuery += strconv.Itoa(inc)
inc++
limitFlag = true
}
strQuery += `;`

rows, err := storage.db.Query(strQuery, project, kind, howmany)
if projectFlag {
if kindFlag {
if limitFlag {
rows, err = tx.Query(strQuery, project, kind, howmany)
} else {
rows, err = tx.Query(strQuery, project, kind)
}
} else if limitFlag {
rows, err = tx.Query(strQuery, project, howmany)
} else {
rows, err = tx.Query(strQuery, project)
}
} else if limitFlag {
rows, err = tx.Query(strQuery, kind, howmany)
}

if err != nil {
return nil, err
Expand Down Expand Up @@ -180,32 +244,58 @@ func (storage *DisneylandStorage) UpdateJob(job *Job) (*Job, error) {
return resultJob, err
}

func (storage *DisneylandStorage) PullJobs(how_many uint32, project string, kind string) (*ListOfJobs, error) {
func (storage *DisneylandStorage) PullJobs(howmany uint32, project string, kind string) (*ListOfJobs, error) {
tx, err := storage.db.Begin()
if err != nil {
return nil, err
}
strQuery := `WITH updatedPts AS (
WITH pulledPts AS (
SELECT id, project, kind
FROM jobs
WHERE status=$1 AND project LIKE '%' || $2 || '%' AND kind LIKE '%' || $3 || '%'
LIMIT $4
FOR UPDATE SKIP LOCKED)
UPDATE jobs pts
SET status=$5
FROM pulledPts
WHERE pulledPts.id=pts.id AND pulledPts.project=pts.project AND pulledPts.kind=pts.kind
RETURNING pts.id, pts.project, pts.status, pts.metadata, pts.input, pts.output, pts.kind)
SELECT *
FROM updatedPts
ORDER BY id ASC;`
var rows *sql.Rows
projectFlag := false
kindFlag := false
limitFlag := false

strQuery := PULLINGSTRQ_1
inc := 3
if project != "" {
strQuery += " AND project=$"
strQuery += strconv.Itoa(inc)
inc++
projectFlag = true
}
if kind != "" {
strQuery += " AND kind=$"
strQuery += strconv.Itoa(inc)
inc++
kindFlag = true
}
if howmany != 0 {
strQuery += " LIMIT $"
strQuery += strconv.Itoa(inc)
inc++
limitFlag = true
}
strQuery += PULLINGSTRQ_2

rows, err := tx.Query(strQuery, Job_PENDING, project, kind, how_many, Job_PULLED)
if projectFlag {
if kindFlag {
if limitFlag {
rows, err = tx.Query(strQuery, Job_PENDING, Job_PULLED, project, kind, howmany)
} else {
rows, err = tx.Query(strQuery, Job_PENDING, Job_PULLED, project, kind)
}
} else if limitFlag {
rows, err = tx.Query(strQuery, Job_PENDING, Job_PULLED, project, howmany)
} else {
rows, err = tx.Query(strQuery, Job_PENDING, Job_PULLED, project)
}
} else if limitFlag {
rows, err = tx.Query(strQuery, Job_PENDING, Job_PULLED, kind, howmany)
}

if err != nil {
return nil, err
}

ret, err := queryJobs(rows)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion disneyland/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type User struct {

func (u *User) IsUser() bool {
// if user
if u.ProjectAccess != "ANY" && u.KindAccess == "ANY" {
if u.ProjectAccess != "ANY" {
return true
}
return false
Expand Down