Skip to content

Commit

Permalink
batch and worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mstgnz committed Dec 29, 2024
1 parent 5222614 commit cb12211
Show file tree
Hide file tree
Showing 4 changed files with 376 additions and 57 deletions.
138 changes: 109 additions & 29 deletions handler/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/minio/minio-go/v7"
"github.com/mstgnz/cdn/pkg/batch"
"github.com/mstgnz/cdn/pkg/worker"
"github.com/mstgnz/cdn/service"
)

Expand All @@ -30,15 +35,59 @@ type Image interface {
}

type image struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
workerPool *worker.Pool
batchProc *batch.BatchProcessor
}

func NewImage(minioService *minio.Client, awsService service.AwsService) Image {
// ImageProcessRequest represents an image processing request
type ImageProcessRequest struct {
File []byte
Width uint
Height uint
ContentType string
Filename string
}

func NewImage(minioClient *minio.Client, awsService service.AwsService) Image {
// Initialize worker pool with 5 workers
wp := worker.NewPool(5)
wp.Start()

// Initialize batch processor with size 10 and 5 second timeout
bp := batch.NewBatchProcessor(10, 5*time.Second, processBatch)
bp.Start()

return &image{
minioService: *minioService,
awsService: awsService,
minioClient: minioClient,
awsService: awsService,
workerPool: wp,
batchProc: bp,
}
}

// processBatch handles batch processing of items
func processBatch(items []batch.BatchItem) []batch.BatchItem {
// Process items in parallel using goroutines
var wg sync.WaitGroup
for i := range items {
wg.Add(1)
go func(item *batch.BatchItem) {
defer wg.Done()

// Process the item based on its type
switch data := item.Data.(type) {
case *ImageProcessRequest:
// Process image
err := processImage(data)
item.Error = err
item.Success = err == nil
}
}(&items[i])
}
wg.Wait()
return items
}

func (i image) GetImage(c *fiber.Ctx) error {
Expand All @@ -57,12 +106,12 @@ func (i image) GetImage(c *fiber.Ctx) error {
}

// Bucket exists
if found, err := i.minioService.BucketExists(ctx, bucket); !found || err != nil {
if found, err := i.minioClient.BucketExists(ctx, bucket); !found || err != nil {
return c.SendFile("./public/notfound.png")
}

// Get Object
object, err := i.minioService.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})
object, err := i.minioClient.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})

if err != nil {
return c.SendFile("./public/notfound.png")
Expand Down Expand Up @@ -145,8 +194,8 @@ func (i image) DeleteImageWithAws(c *fiber.Ctx) error {
return i.deleteObject(c, ctx, bucket, object, true)
}

func (i image) ResizeImage(c *fiber.Ctx) error {
c.Status(http.StatusNotFound)
// ResizeImage handles image resizing using worker pool
func (i *image) ResizeImage(c *fiber.Ctx) error {
resize, width, height := service.GetWidthAndHeight(c, service.FormsType)
file, err := c.FormFile("file")

Expand All @@ -155,30 +204,49 @@ func (i image) ResizeImage(c *fiber.Ctx) error {
}

fileBuffer, err := file.Open()
defer func(fileBuffer multipart.File) {
_ = fileBuffer.Close()
}(fileBuffer)

if err != nil {
return service.Response(c, fiber.StatusBadRequest, false, err.Error(), nil)
}
defer fileBuffer.Close()

fileContent, err := io.ReadAll(fileBuffer)
if err != nil {
return service.Response(c, fiber.StatusInternalServerError, false, "Error reading file content", nil)
}

// Set Content-Length header
c.Set("Content-Length", strconv.Itoa(len(fileContent)))
if !resize || !service.IsImageFile(file.Filename) {
c.Set("Content-Length", strconv.Itoa(len(fileContent)))
c.Set("Content-Type", http.DetectContentType(fileContent))
return c.Send(fileContent)
}

// Create response channel
respChan := make(chan error)

// Create and submit job
job := worker.Job{
ID: uuid.New().String(),
Task: func() error {
req := &ImageProcessRequest{
File: fileContent,
Width: uint(width),
Height: uint(height),
ContentType: file.Header.Get("Content-Type"),
Filename: file.Filename,
}
return processImage(req)
},
Response: respChan,
}

// Set Content-Type header
c.Set("Content-Type", http.DetectContentType(fileContent))
i.workerPool.Submit(job)

if resize && service.IsImageFile(file.Filename) {
return c.Send(service.ImagickResize(fileContent, width, height))
// Wait for response
if err := <-respChan; err != nil {
return service.Response(c, fiber.StatusInternalServerError, false, "Image processing failed", nil)
}
c.Status(http.StatusFound)
return c.Send(fileContent)

return service.Response(c, fiber.StatusOK, true, "Image processed successfully", nil)
}

func (i image) UploadImageWithUrl(c *fiber.Ctx) error {
Expand All @@ -194,10 +262,10 @@ func (i image) UploadImageWithUrl(c *fiber.Ctx) error {
}

// Check to see if already exist bucket
exists, err := i.minioService.BucketExists(ctx, bucket)
exists, err := i.minioClient.BucketExists(ctx, bucket)
if err != nil && !exists {
// Bucket not found so Make a new bucket
err = i.minioService.MakeBucket(ctx, bucket, minio.MakeBucketOptions{})
err = i.minioClient.MakeBucket(ctx, bucket, minio.MakeBucketOptions{})
if err != nil {
return service.Response(c, fiber.StatusBadRequest, false, "Bucket Not Found And Not Created!", nil)
}
Expand All @@ -214,7 +282,7 @@ func (i image) UploadImageWithUrl(c *fiber.Ctx) error {
objectName := path + "/" + randomName + "." + extension

// Upload with PutObject
minioResult, err := i.minioService.PutObject(ctx, bucket, objectName, res.Body, int64(fileSize), minio.PutObjectOptions{ContentType: contentType})
minioResult, err := i.minioClient.PutObject(ctx, bucket, objectName, res.Body, int64(fileSize), minio.PutObjectOptions{ContentType: contentType})

url = service.GetEnv("APP_URL")
url = strings.TrimSuffix(url, "/")
Expand Down Expand Up @@ -243,10 +311,10 @@ func (i image) UploadImageWithUrl(c *fiber.Ctx) error {
// Minio And Aws Upload
func (i image) commonUpload(c *fiber.Ctx, ctx context.Context, path, bucket string, file *multipart.FileHeader, awsUpload bool) error {
// Check to see if the bucket already exists
exists, err := i.minioService.BucketExists(ctx, bucket)
exists, err := i.minioClient.BucketExists(ctx, bucket)
if err != nil && !exists {
// Bucket not found, so create a new one
err = i.minioService.MakeBucket(ctx, bucket, minio.MakeBucketOptions{})
err = i.minioClient.MakeBucket(ctx, bucket, minio.MakeBucketOptions{})
if err != nil {
return service.Response(c, fiber.StatusBadRequest, false, "Bucket Not Found And Not Created!", nil)
}
Expand Down Expand Up @@ -315,7 +383,7 @@ func (i image) commonUpload(c *fiber.Ctx, ctx context.Context, path, bucket stri
}

// Minio Upload
_, err = i.minioService.PutObject(ctx, bucket, objectName, fileBuffer, fileSize, minio.PutObjectOptions{ContentType: contentType})
_, err = i.minioClient.PutObject(ctx, bucket, objectName, fileBuffer, fileSize, minio.PutObjectOptions{ContentType: contentType})
minioResult := "Minio Successfully Uploaded"

if err != nil {
Expand Down Expand Up @@ -355,7 +423,7 @@ func (i image) commonUpload(c *fiber.Ctx, ctx context.Context, path, bucket stri
// Minio And Aws Delete
func (i image) deleteObject(c *fiber.Ctx, ctx context.Context, bucket, object string, awsDelete bool) error {
// Check if the bucket exists on Minio
if found, _ := i.minioService.BucketExists(ctx, bucket); !found {
if found, _ := i.minioClient.BucketExists(ctx, bucket); !found {
return service.Response(c, fiber.StatusBadRequest, false, "Bucket Not Found On Minio!", "")
}

Expand All @@ -365,7 +433,7 @@ func (i image) deleteObject(c *fiber.Ctx, ctx context.Context, bucket, object st
}

// Remove object from Minio
if err := i.minioService.RemoveObject(ctx, bucket, object, minio.RemoveObjectOptions{}); err != nil {
if err := i.minioClient.RemoveObject(ctx, bucket, object, minio.RemoveObjectOptions{}); err != nil {
return service.Response(c, fiber.StatusInternalServerError, false, err.Error(), "")
}

Expand All @@ -378,3 +446,15 @@ func (i image) deleteObject(c *fiber.Ctx, ctx context.Context, bucket, object st

return service.Response(c, fiber.StatusOK, true, "File Successfully Deleted", "")
}

// processImage handles the actual image processing
func processImage(req *ImageProcessRequest) error {
if service.IsImageFile(req.Filename) {
resized := service.ImagickResize(req.File, req.Width, req.Height)
if resized == nil {
return fmt.Errorf("image processing failed")
}
return nil
}
return nil
}
56 changes: 28 additions & 28 deletions handler/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestNewImage(t *testing.T) {

func Test_image_DeleteImage(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -50,8 +50,8 @@ func Test_image_DeleteImage(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.DeleteImage(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("DeleteImage() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -62,8 +62,8 @@ func Test_image_DeleteImage(t *testing.T) {

func Test_image_DeleteImageWithAws(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -79,8 +79,8 @@ func Test_image_DeleteImageWithAws(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.DeleteImageWithAws(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("DeleteImageWithAws() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -91,8 +91,8 @@ func Test_image_DeleteImageWithAws(t *testing.T) {

func Test_image_GetImage(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -108,8 +108,8 @@ func Test_image_GetImage(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.GetImage(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("GetImage() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -120,8 +120,8 @@ func Test_image_GetImage(t *testing.T) {

func Test_image_ResizeImage(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -137,8 +137,8 @@ func Test_image_ResizeImage(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.ResizeImage(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("ResizeImage() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -149,8 +149,8 @@ func Test_image_ResizeImage(t *testing.T) {

func Test_image_UploadImage(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -166,8 +166,8 @@ func Test_image_UploadImage(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.UploadImage(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("UploadImage() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -178,8 +178,8 @@ func Test_image_UploadImage(t *testing.T) {

func Test_image_UploadImageWithAws(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -195,8 +195,8 @@ func Test_image_UploadImageWithAws(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.UploadImageWithAws(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("UploadImageWithAws() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -207,8 +207,8 @@ func Test_image_UploadImageWithAws(t *testing.T) {

func Test_image_UploadImageWithUrl(t *testing.T) {
type fields struct {
minioService minio.Client
awsService service.AwsService
minioClient *minio.Client
awsService service.AwsService
}
type args struct {
c *fiber.Ctx
Expand All @@ -224,8 +224,8 @@ func Test_image_UploadImageWithUrl(t *testing.T) {
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
i := image{
minioService: tt.fields.minioService,
awsService: tt.fields.awsService,
minioClient: tt.fields.minioClient,
awsService: tt.fields.awsService,
}
if err := i.UploadImageWithUrl(tt.args.c); (err != nil) != tt.wantErr {
t.Errorf("UploadImageWithUrl() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
Loading

0 comments on commit cb12211

Please sign in to comment.