From b413e6c96546cc50b1dbc94731d24f5c51e7a455 Mon Sep 17 00:00:00 2001
From: Yeuoly <admin@srmxy.cn>
Date: Wed, 8 Jan 2025 16:57:21 +0800
Subject: [PATCH] refactor: serverless connector

---
 .../plugin_manager/install_to_serverless.go   |  23 +--
 .../serverless_connector/connector.go         | 151 +++++++++++-----
 .../{upload.go => launch.go}                  |  36 ++--
 .../serverless_connector/packager.go          | 161 ------------------
 .../serverless_connector/packager_test.go     | 124 --------------
 .../packager_test_plugin/main.py              |   5 -
 .../packager_test_plugin/manifest.yaml        |  30 ----
 .../packager_test_plugin/provider/jina.yaml   |  82 ---------
 .../packager_test_plugin/requirements.txt     |   0
 .../serverless_connector/types.go             |  46 +++++
 internal/service/install_plugin.go            |   2 +-
 internal/utils/http_requests/http_options.go  |   7 +-
 internal/utils/http_requests/http_request.go  |   8 +-
 internal/utils/parser/comma.go                |  80 +++++++++
 internal/utils/parser/comma_test.go           | 129 ++++++++++++++
 15 files changed, 398 insertions(+), 486 deletions(-)
 rename internal/core/plugin_manager/serverless_connector/{upload.go => launch.go} (55%)
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager.go
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager_test.go
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager_test_plugin/main.py
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager_test_plugin/manifest.yaml
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager_test_plugin/provider/jina.yaml
 delete mode 100644 internal/core/plugin_manager/serverless_connector/packager_test_plugin/requirements.txt
 create mode 100644 internal/core/plugin_manager/serverless_connector/types.go
 create mode 100644 internal/utils/parser/comma.go
 create mode 100644 internal/utils/parser/comma_test.go

diff --git a/internal/core/plugin_manager/install_to_serverless.go b/internal/core/plugin_manager/install_to_serverless.go
index faf7c290..702df436 100644
--- a/internal/core/plugin_manager/install_to_serverless.go
+++ b/internal/core/plugin_manager/install_to_serverless.go
@@ -13,6 +13,7 @@ import (
 
 // InstallToAWSFromPkg installs a plugin to AWS Lambda
 func (p *PluginManager) InstallToAWSFromPkg(
+	originalPackager []byte,
 	decoder decoder.PluginDecoder,
 	source string,
 	meta map[string]any,
@@ -32,7 +33,7 @@ func (p *PluginManager) InstallToAWSFromPkg(
 		return nil, err
 	}
 
-	response, err := serverless.UploadPlugin(decoder)
+	response, err := serverless.LaunchPlugin(originalPackager, decoder)
 	if err != nil {
 		return nil, err
 	}
@@ -49,17 +50,17 @@ func (p *PluginManager) InstallToAWSFromPkg(
 			newResponse.Close()
 		}()
 
-		lambdaUrl := ""
-		lambdaFunctionName := ""
+		functionUrl := ""
+		functionName := ""
 
-		response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
+		response.Async(func(r serverless.LaunchFunctionResponse) {
 			if r.Event == serverless.Info {
 				newResponse.Write(PluginInstallResponse{
 					Event: PluginInstallEventInfo,
 					Data:  "Installing...",
 				})
 			} else if r.Event == serverless.Done {
-				if lambdaUrl == "" || lambdaFunctionName == "" {
+				if functionUrl == "" || functionName == "" {
 					newResponse.Write(PluginInstallResponse{
 						Event: PluginInstallEventError,
 						Data:  "Internal server error, failed to get lambda url or function name",
@@ -76,8 +77,8 @@ func (p *PluginManager) InstallToAWSFromPkg(
 					serverlessModel := &models.ServerlessRuntime{
 						Checksum:               checksum,
 						Type:                   models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
-						FunctionURL:            lambdaUrl,
-						FunctionName:           lambdaFunctionName,
+						FunctionURL:            functionUrl,
+						FunctionName:           functionName,
 						PluginUniqueIdentifier: uniqueIdentity.String(),
 						Declaration:            declaration,
 					}
@@ -106,10 +107,10 @@ func (p *PluginManager) InstallToAWSFromPkg(
 					Event: PluginInstallEventError,
 					Data:  "Internal server error",
 				})
-			} else if r.Event == serverless.LambdaUrl {
-				lambdaUrl = r.Message
-			} else if r.Event == serverless.Lambda {
-				lambdaFunctionName = r.Message
+			} else if r.Event == serverless.FunctionUrl {
+				functionUrl = r.Message
+			} else if r.Event == serverless.Function {
+				functionName = r.Message
 			} else {
 				newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
 			}
diff --git a/internal/core/plugin_manager/serverless_connector/connector.go b/internal/core/plugin_manager/serverless_connector/connector.go
index 1d520a2b..bc1e8393 100644
--- a/internal/core/plugin_manager/serverless_connector/connector.go
+++ b/internal/core/plugin_manager/serverless_connector/connector.go
@@ -7,15 +7,16 @@ import (
 	"net/url"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
+	"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
 )
 
-var ()
-
-type LambdaFunction struct {
+type ServerlessFunction struct {
 	FunctionName string `json:"function_name" validate:"required"`
-	FunctionARN  string `json:"function_arn" validate:"required"`
+	FunctionDRN  string `json:"function_drn" validate:"required"`
 	FunctionURL  string `json:"function_url" validate:"required"`
 }
 
@@ -47,72 +48,79 @@ func Ping() error {
 }
 
 var (
-	ErrNoLambdaFunction = errors.New("no lambda function found")
+	ErrFunctionNotFound = errors.New("no function found")
 )
 
-// Fetch the lambda function from serverless connector, return error if failed
-func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
-	request := map[string]any{
-		"plugin": map[string]any{
-			"config": map[string]any{
-				"identity": identity,
-				"checksum": checksum,
-			},
-		},
-	}
+// Fetch the function from serverless connector, return error if failed
+func FetchFunction(manifest plugin_entities.PluginDeclaration, checksum string) (*ServerlessFunction, error) {
+	filename := fmt.Sprintf("%s-%s_%s@%s.difypkg", manifest.Author, manifest.Name, manifest.Version, checksum)
 
-	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/fetch")
+	url, err := url.JoinPath(baseurl.String(), "/v1/runner/instances")
 	if err != nil {
 		return nil, err
 	}
 
-	response, err := http_requests.PostAndParse[entities.GenericResponse[LambdaFunction]](
+	response, err := http_requests.GetAndParse[RunnerInstances](
 		client,
 		url,
 		http_requests.HttpHeader(map[string]string{
 			"Authorization": SERVERLESS_CONNECTOR_API_KEY,
 		}),
-		http_requests.HttpPayloadJson(request),
+		http_requests.HttpParams(map[string]string{
+			"filename": filename,
+		}),
 	)
+
 	if err != nil {
 		return nil, err
 	}
 
-	if response.Code != 0 {
-		if response.Code == -404 {
-			return nil, ErrNoLambdaFunction
-		}
-		return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
+	if response.Error != "" {
+		return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Error)
 	}
 
-	return &response.Data, nil
+	if len(response.Items) == 0 {
+		return nil, ErrFunctionNotFound
+	}
+
+	return &ServerlessFunction{
+		FunctionName: response.Items[0].Name,
+		FunctionDRN:  response.Items[0].ResourceName,
+		FunctionURL:  response.Items[0].Endpoint,
+	}, nil
 }
 
-type LaunchAWSLambdaFunctionEvent string
+type LaunchFunctionEvent string
 
 const (
-	Error     LaunchAWSLambdaFunctionEvent = "error"
-	Info      LaunchAWSLambdaFunctionEvent = "info"
-	Lambda    LaunchAWSLambdaFunctionEvent = "lambda"
-	LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
-	Done      LaunchAWSLambdaFunctionEvent = "done"
+	Error       LaunchFunctionEvent = "error"
+	Info        LaunchFunctionEvent = "info"
+	Function    LaunchFunctionEvent = "function"
+	FunctionUrl LaunchFunctionEvent = "function_url"
+	Done        LaunchFunctionEvent = "done"
 )
 
-type LaunchAWSLambdaFunctionResponse struct {
-	Event   LaunchAWSLambdaFunctionEvent `json:"event"`
-	Message string                       `json:"message"`
+type LaunchFunctionResponse struct {
+	Event   LaunchFunctionEvent `json:"event"`
+	Message string              `json:"message"`
 }
 
-// Launch the lambda function from serverless connector, it will receive the context_tar as the input
+// Setup the function from serverless connector, it will receive the context as the input
 // and build it a docker image, then run it on serverless platform like AWS Lambda
 // it returns a event stream, the caller should consider it as a async operation
-func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
-	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
+func SetupFunction(
+	manifest plugin_entities.PluginDeclaration,
+	checksum string,
+	context io.Reader,
+) (*stream.Stream[LaunchFunctionResponse], error) {
+	url, err := url.JoinPath(baseurl.String(), "/v1/launch")
 	if err != nil {
 		return nil, err
 	}
 
-	response, err := http_requests.PostAndParseStream[LaunchAWSLambdaFunctionResponse](
+	// join a filename
+	filename := fmt.Sprintf("%s-%s_%s@%s.difypkg", manifest.Author, manifest.Name, manifest.Version, checksum)
+	serverless_connector_response, err := http_requests.PostAndParseStream[LaunchFunctionResponseChunk](
 		client,
 		url,
 		http_requests.HttpHeader(map[string]string{
@@ -121,20 +129,75 @@ func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*str
 		http_requests.HttpReadTimeout(240000),
 		http_requests.HttpWriteTimeout(240000),
 		http_requests.HttpPayloadMultipart(
-			map[string]string{
-				"identity": identity,
-				"checksum": checksum,
-			},
-			map[string]io.Reader{
-				"context": context_tar,
+			map[string]string{},
+			map[string]http_requests.HttpPayloadMultipartFile{
+				"context": {
+					Filename: filename,
+					Reader:   context,
+				},
 			},
 		),
 		http_requests.HttpRaiseErrorWhenStreamDataNotMatch(true),
 	)
-
 	if err != nil {
 		return nil, err
 	}
 
+	response := stream.NewStream[LaunchFunctionResponse](10)
+
+	routine.Submit(map[string]string{
+		"module": "serverless_connector",
+		"func":   "SetupFunction",
+	}, func() {
+		defer response.Close()
+		serverless_connector_response.Async(func(chunk LaunchFunctionResponseChunk) {
+			if chunk.State == LAUNCH_STATE_FAILED {
+				response.Write(LaunchFunctionResponse{
+					Event:   Error,
+					Message: chunk.Message,
+				})
+				return
+			}
+
+			switch chunk.Stage {
+			case LAUNCH_STAGE_START, LAUNCH_STAGE_BUILD:
+				response.Write(LaunchFunctionResponse{
+					Event:   Info,
+					Message: "Building plugin...",
+				})
+			case LAUNCH_STAGE_RUN:
+				if chunk.State == LAUNCH_STATE_SUCCESS {
+					data, err := parser.ParserCommaSeparatedValues[LaunchFunctionFinalStageMessage]([]byte(chunk.Message))
+					if err != nil {
+						response.Write(LaunchFunctionResponse{
+							Event:   Error,
+							Message: err.Error(),
+						})
+						return
+					}
+
+					response.Write(LaunchFunctionResponse{
+						Event:   Function,
+						Message: data.Name,
+					})
+					response.Write(LaunchFunctionResponse{
+						Event:   FunctionUrl,
+						Message: data.Endpoint,
+					})
+				} else {
+					response.Write(LaunchFunctionResponse{
+						Event:   Info,
+						Message: "Launching plugin...",
+					})
+				}
+			case LAUNCH_STAGE_END:
+				response.Write(LaunchFunctionResponse{
+					Event:   Done,
+					Message: "Plugin launched",
+				})
+			}
+		})
+	})
+
 	return response, nil
 }
diff --git a/internal/core/plugin_manager/serverless_connector/upload.go b/internal/core/plugin_manager/serverless_connector/launch.go
similarity index 55%
rename from internal/core/plugin_manager/serverless_connector/upload.go
rename to internal/core/plugin_manager/serverless_connector/launch.go
index 52aaf044..e2f40ade 100644
--- a/internal/core/plugin_manager/serverless_connector/upload.go
+++ b/internal/core/plugin_manager/serverless_connector/launch.go
@@ -1,7 +1,7 @@
 package serverless
 
 import (
-	"os"
+	"bytes"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
@@ -13,9 +13,9 @@ var (
 	AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
 )
 
-// UploadPlugin uploads the plugin to the AWS Lambda
-// return the lambda url and name
-func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
+// LaunchPlugin uploads the plugin to specific serverless connector
+// return the function url and name
+func LaunchPlugin(originPackage []byte, decoder decoder.PluginDecoder) (*stream.Stream[LaunchFunctionResponse], error) {
 	checksum, err := decoder.Checksum()
 	if err != nil {
 		return nil, err
@@ -32,24 +32,23 @@ func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambda
 		return nil, err
 	}
 
-	identity := manifest.Identity()
-	function, err := FetchLambda(identity, checksum)
+	function, err := FetchFunction(manifest, checksum)
 	if err != nil {
-		if err != ErrNoLambdaFunction {
+		if err != ErrFunctionNotFound {
 			return nil, err
 		}
 	} else {
 		// found, return directly
-		response := stream.NewStream[LaunchAWSLambdaFunctionResponse](3)
-		response.Write(LaunchAWSLambdaFunctionResponse{
-			Event:   LambdaUrl,
+		response := stream.NewStream[LaunchFunctionResponse](3)
+		response.Write(LaunchFunctionResponse{
+			Event:   FunctionUrl,
 			Message: function.FunctionURL,
 		})
-		response.Write(LaunchAWSLambdaFunctionResponse{
-			Event:   Lambda,
+		response.Write(LaunchFunctionResponse{
+			Event:   Function,
 			Message: function.FunctionName,
 		})
-		response.Write(LaunchAWSLambdaFunctionResponse{
+		response.Write(LaunchFunctionResponse{
 			Event:   Done,
 			Message: "",
 		})
@@ -57,16 +56,7 @@ func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambda
 		return response, nil
 	}
 
-	// create lambda function
-	packager := NewPackager(decoder)
-	context, err := packager.Pack()
-	if err != nil {
-		return nil, err
-	}
-	defer os.Remove(context.Name())
-	defer context.Close()
-
-	response, err := LaunchLambda(identity, checksum, context)
+	response, err := SetupFunction(manifest, checksum, bytes.NewReader(originPackage))
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/core/plugin_manager/serverless_connector/packager.go b/internal/core/plugin_manager/serverless_connector/packager.go
deleted file mode 100644
index a8715dfd..00000000
--- a/internal/core/plugin_manager/serverless_connector/packager.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package serverless
-
-import (
-	"archive/tar"
-	"compress/gzip"
-	"errors"
-	"io"
-	"io/fs"
-	"os"
-	"path"
-	"strings"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_runtime/dockerfile"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
-)
-
-type Packager struct {
-	decoder decoder.PluginDecoder
-}
-
-func NewPackager(decoder decoder.PluginDecoder) *Packager {
-	return &Packager{
-		decoder: decoder,
-	}
-}
-
-type dockerFileInfo struct {
-	fs.FileInfo
-
-	size int64
-}
-
-func (d *dockerFileInfo) Size() int64 {
-	return d.size
-}
-
-func (d *dockerFileInfo) Name() string {
-	return "Dockerfile"
-}
-
-func (d *dockerFileInfo) Mode() os.FileMode {
-	return 0644
-}
-
-func (d *dockerFileInfo) ModTime() time.Time {
-	return time.Now()
-}
-
-func (d *dockerFileInfo) IsDir() bool {
-	return false
-}
-
-func (d *dockerFileInfo) Sys() any {
-	return nil
-}
-
-// Pack takes a plugin and packs it into a tar file with dockerfile inside
-// returns a *os.File with the tar file
-func (p *Packager) Pack() (*os.File, error) {
-	declaration, err := p.decoder.Manifest()
-	if err != nil {
-		return nil, err
-	}
-
-	// walk through the plugin directory and add it to a tar file
-	// create a tmpfile
-	tmpfile, cleanup, err := tmpfile.CreateTempFile("plugin-aws-tar-*")
-	if err != nil {
-		return nil, err
-	}
-	success := false
-
-	defer func() {
-		if !success {
-			cleanup()
-		}
-	}()
-
-	gzipWriter, err := gzip.NewWriterLevel(tmpfile, gzip.BestCompression)
-	if err != nil {
-		return nil, err
-	}
-	defer gzipWriter.Close()
-
-	tarWriter := tar.NewWriter(gzipWriter)
-	defer tarWriter.Close()
-
-	if err := p.decoder.Walk(func(filename, dir string) error {
-		if strings.ToLower(filename) == "dockerfile" {
-			return errors.New("dockerfile is not allowed to be in the plugin directory")
-		}
-
-		fullFilename := path.Join(dir, filename)
-
-		state, err := p.decoder.Stat(fullFilename)
-		if err != nil {
-			return err
-		}
-
-		tarHeader, err := tar.FileInfoHeader(state, fullFilename)
-		if err != nil {
-			return err
-		}
-		tarHeader.Name = fullFilename
-
-		// write tar header
-		if err := tarWriter.WriteHeader(tarHeader); err != nil {
-			return err
-		}
-
-		// write file content
-		fileReader, err := p.decoder.FileReader(fullFilename)
-		if err != nil {
-			return err
-		}
-		if _, err := io.Copy(tarWriter, fileReader); err != nil {
-			fileReader.Close()
-			return err
-		}
-		// release resources
-		fileReader.Close()
-
-		return nil
-	}); err != nil {
-		return nil, err
-	}
-
-	// add dockerfile
-	dockerfile, err := dockerfile.GenerateDockerfile(&declaration)
-	if err != nil {
-		return nil, err
-	}
-
-	tarHeader, err := tar.FileInfoHeader(&dockerFileInfo{
-		size: int64(len(dockerfile)),
-	}, "Dockerfile")
-	if err != nil {
-		return nil, err
-	}
-
-	// create a fake dockerfile stat
-	if err := tarWriter.WriteHeader(tarHeader); err != nil {
-		return nil, err
-	}
-
-	if _, err := tarWriter.Write([]byte(dockerfile)); err != nil {
-		return nil, err
-	}
-
-	// close writers to flush data
-	tarWriter.Close()
-	gzipWriter.Close()
-
-	tmpfile.Seek(0, io.SeekStart)
-
-	success = true
-
-	return tmpfile, nil
-}
diff --git a/internal/core/plugin_manager/serverless_connector/packager_test.go b/internal/core/plugin_manager/serverless_connector/packager_test.go
deleted file mode 100644
index 0c990733..00000000
--- a/internal/core/plugin_manager/serverless_connector/packager_test.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package serverless
-
-import (
-	"archive/tar"
-	"compress/gzip"
-	"embed"
-	"io"
-	"io/fs"
-	"os"
-	"path"
-	"path/filepath"
-	"testing"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-)
-
-//go:embed packager_test_plugin/*
-var test_plugin embed.FS
-
-func TestPackager_Pack(t *testing.T) {
-	// create a temp dir
-	tmpDir, err := os.MkdirTemp("", "test_plugin")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(tmpDir)
-
-	// copy the test_plugin to the temp dir
-	if err := fs.WalkDir(test_plugin, ".", func(path string, d fs.DirEntry, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if d.IsDir() {
-			// create the dir
-			os.MkdirAll(filepath.Join(tmpDir, path), 0755)
-		} else {
-			// copy the file
-			originFile, err := test_plugin.Open(path)
-			if err != nil {
-				return err
-			}
-			defer originFile.Close()
-
-			content, err := io.ReadAll(originFile)
-			if err != nil {
-				return err
-			}
-
-			if err := os.WriteFile(filepath.Join(tmpDir, path), content, 0644); err != nil {
-				return err
-			}
-		}
-
-		return nil
-	}); err != nil {
-		t.Fatal(err)
-	}
-
-	decoder, err := decoder.NewFSPluginDecoder(path.Join(tmpDir, "packager_test_plugin"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	packager := NewPackager(decoder)
-
-	f, err := packager.Pack()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() {
-		f.Close()
-		os.Remove(f.Name())
-	}()
-
-	gzipReader, err := gzip.NewReader(f)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer gzipReader.Close()
-
-	// Create a new tar reader
-	tarReader := tar.NewReader(gzipReader)
-
-	dockerfileFound := false
-	requirementsFound := false
-	mainPyFound := false
-	jinaYamlFound := false
-	// Iterate through the files in the tar.gz archive
-	for {
-		header, err := tarReader.Next()
-		if err == io.EOF {
-			break // End of archive
-		}
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		switch header.Name {
-		case "Dockerfile":
-			dockerfileFound = true
-		case "requirements.txt":
-			requirementsFound = true
-		case "main.py":
-			mainPyFound = true
-		case "provider/jina.yaml":
-			jinaYamlFound = true
-		}
-	}
-
-	// Check if all required files are present
-	if !dockerfileFound {
-		t.Error("Dockerfile not found in the packed archive")
-	}
-	if !requirementsFound {
-		t.Error("requirements.txt not found in the packed archive")
-	}
-	if !mainPyFound {
-		t.Error("main.py not found in the packed archive")
-	}
-	if !jinaYamlFound {
-		t.Error("jina.yaml not found in the packed archive")
-	}
-}
diff --git a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/main.py b/internal/core/plugin_manager/serverless_connector/packager_test_plugin/main.py
deleted file mode 100644
index 735ae134..00000000
--- a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/main.py
+++ /dev/null
@@ -1,5 +0,0 @@
-def main():
-    print("Hello, World!")
-
-if __name__ == "__main__":
-    main()
\ No newline at end of file
diff --git a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/manifest.yaml b/internal/core/plugin_manager/serverless_connector/packager_test_plugin/manifest.yaml
deleted file mode 100644
index 4d0fbdf2..00000000
--- a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/manifest.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-version: 0.0.1
-type: plugin
-author: "Yeuoly"
-name: "jina"
-label:
-  en_US: "Jina"
-created_at: "2024-07-12T08:03:44.658609186Z"
-resource:
-  memory: 1048576
-  permission:
-    tool:
-      enabled: true
-    model:
-      enabled: true
-      llm: true
-plugins:
-  models:
-    - "provider/jina.yaml"
-execution:
-  install: install.sh
-  launch: launch.sh
-meta:
-  version: 0.0.1
-  arch:
-    - "amd64"
-    - "arm64"
-  runner:
-    language: "python"
-    version: "3.12"
-    entrypoint: "main"
diff --git a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/provider/jina.yaml b/internal/core/plugin_manager/serverless_connector/packager_test_plugin/provider/jina.yaml
deleted file mode 100644
index b9ca9ed6..00000000
--- a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/provider/jina.yaml
+++ /dev/null
@@ -1,82 +0,0 @@
-provider: jina
-label:
-  en_US: Jina
-description:
-  en_US: Embedding and Rerank Model Supported
-icon_small:
-  en_US: icon_s_en.svg
-icon_large:
-  en_US: icon_l_en.svg
-background: "#EFFDFD"
-help:
-  title:
-    en_US: Get your API key from Jina AI
-    zh_Hans: 从 Jina 获取 API Key
-  url:
-    en_US: https://jina.ai/
-supported_model_types:
-  - text-embedding
-  - rerank
-configurate_methods:
-  - predefined-model
-  - customizable-model
-provider_credential_schema:
-  credential_form_schemas:
-    - variable: api_key
-      label:
-        en_US: API Key
-      type: secret-input
-      required: true
-      placeholder:
-        zh_Hans: 在此输入您的 API Key
-        en_US: Enter your API Key
-model_credential_schema:
-  model:
-    label:
-      en_US: Model Name
-      zh_Hans: 模型名称
-    placeholder:
-      en_US: Enter your model name
-      zh_Hans: 输入模型名称
-  credential_form_schemas:
-    - variable: api_key
-      label:
-        en_US: API Key
-      type: secret-input
-      required: true
-      placeholder:
-        zh_Hans: 在此输入您的 API Key
-        en_US: Enter your API Key
-    - variable: base_url
-      label:
-        zh_Hans: 服务器 URL
-        en_US: Base URL
-      type: text-input
-      required: true
-      placeholder:
-        zh_Hans: Base URL, e.g. https://api.jina.ai/v1
-        en_US: Base URL, e.g. https://api.jina.ai/v1
-      default: 'https://api.jina.ai/v1'
-    - variable: context_size
-      label:
-        zh_Hans: 上下文大小
-        en_US: Context size
-      placeholder:
-        zh_Hans: 输入上下文大小
-        en_US: Enter context size
-      required: false
-      type: text-input
-      default: '8192'
-models:
-  rerank:
-    predefined:
-      - "models/**/*.yaml"
-  text-embedding:
-    predefined:
-      - "models/**/*.yaml"
-extra:
-  python:
-    provider_source: provider/jina.py
-    model_sources:
-      - "models/text_embedding/text_embedding.py"
-      - "models/rerank/rerank.py"
diff --git a/internal/core/plugin_manager/serverless_connector/packager_test_plugin/requirements.txt b/internal/core/plugin_manager/serverless_connector/packager_test_plugin/requirements.txt
deleted file mode 100644
index e69de29b..00000000
diff --git a/internal/core/plugin_manager/serverless_connector/types.go b/internal/core/plugin_manager/serverless_connector/types.go
new file mode 100644
index 00000000..bc6868f8
--- /dev/null
+++ b/internal/core/plugin_manager/serverless_connector/types.go
@@ -0,0 +1,46 @@
+package serverless
+
+type RunnerInstance struct {
+	ID           string `json:"ID" validate:"required"`
+	Name         string `json:"Name" validate:"required"`
+	Endpoint     string `json:"Endpoint" validate:"required"`
+	ResourceName string `json:"ResourceName" validate:"required"`
+	Status       struct {
+		State string `json:"State" validate:"required"`
+	} `json:"Status" validate:"required"`
+}
+
+type RunnerInstances struct {
+	Error string           `json:"error"`
+	Items []RunnerInstance `json:"Items"`
+}
+
+type LaunchStage string
+
+const (
+	LAUNCH_STAGE_START LaunchStage = "start"
+	LAUNCH_STAGE_BUILD LaunchStage = "build"
+	LAUNCH_STAGE_RUN   LaunchStage = "run"
+	LAUNCH_STAGE_END   LaunchStage = "end"
+)
+
+type LaunchState string
+
+const (
+	LAUNCH_STATE_SUCCESS LaunchState = "success"
+	LAUNCH_STATE_RUNNING LaunchState = "running"
+	LAUNCH_STATE_FAILED  LaunchState = "failed"
+)
+
+type LaunchFunctionResponseChunk struct {
+	Stage   LaunchStage `json:"Stage"`
+	Obj     string      `json:"Obj"`
+	State   LaunchState `json:"State"`
+	Message string      `json:"Message"`
+}
+
+type LaunchFunctionFinalStageMessage struct {
+	Endpoint string `comma:"endpoint"`
+	Name     string `comma:"name"`
+	ID       string `comma:"id"`
+}
diff --git a/internal/service/install_plugin.go b/internal/service/install_plugin.go
index 57739d28..9dbdb8d5 100644
--- a/internal/service/install_plugin.go
+++ b/internal/service/install_plugin.go
@@ -215,7 +215,7 @@ func InstallPluginRuntimeToTenant(
 					})
 					return
 				}
-				stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, metas[i])
+				stream, err = manager.InstallToAWSFromPkg(pkgFile, zipDecoder, source, metas[i])
 			} else if config.Platform == app.PLATFORM_LOCAL {
 				stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, metas[i])
 			} else {
diff --git a/internal/utils/http_requests/http_options.go b/internal/utils/http_requests/http_options.go
index 3957d238..2b2b5d13 100644
--- a/internal/utils/http_requests/http_options.go
+++ b/internal/utils/http_requests/http_options.go
@@ -41,9 +41,14 @@ func HttpPayloadJson(payload interface{}) HttpOptions {
 	return HttpOptions{"payloadJson", payload}
 }
 
+type HttpPayloadMultipartFile struct {
+	Filename string
+	Reader   io.Reader
+}
+
 // which is used for POST method only
 // payload follows the form data format, and files is a map from filename to file
-func HttpPayloadMultipart(payload map[string]string, files map[string]io.Reader) HttpOptions {
+func HttpPayloadMultipart(payload map[string]string, files map[string]HttpPayloadMultipartFile) HttpOptions {
 	return HttpOptions{"payloadMultipart", map[string]interface{}{
 		"payload": payload,
 		"files":   files,
diff --git a/internal/utils/http_requests/http_request.go b/internal/utils/http_requests/http_request.go
index bab8f296..b8de61b6 100644
--- a/internal/utils/http_requests/http_request.go
+++ b/internal/utils/http_requests/http_request.go
@@ -44,14 +44,14 @@ func buildHttpRequest(method string, url string, options ...HttpOptions) (*http.
 			buffer := new(bytes.Buffer)
 			writer := multipart.NewWriter(buffer)
 
-			files := option.Value.(map[string]any)["files"].(map[string]io.Reader)
-			for filename, reader := range files {
-				part, err := writer.CreateFormFile(filename, filename)
+			files := option.Value.(map[string]any)["files"].(map[string]HttpPayloadMultipartFile)
+			for filename, file := range files {
+				part, err := writer.CreateFormFile(filename, file.Filename)
 				if err != nil {
 					writer.Close()
 					return nil, err
 				}
-				_, err = io.Copy(part, reader)
+				_, err = io.Copy(part, file.Reader)
 				if err != nil {
 					writer.Close()
 					return nil, err
diff --git a/internal/utils/parser/comma.go b/internal/utils/parser/comma.go
new file mode 100644
index 00000000..5be9379b
--- /dev/null
+++ b/internal/utils/parser/comma.go
@@ -0,0 +1,80 @@
+package parser
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"strconv"
+)
+
+// ParserCommaSeparatedValues parses the comma separated values
+// and returns a map of key-value pairs
+// examples:
+// data: a=1,b=2
+//
+//	T: type struct {
+//		A int `comma:"a"`
+//		B string `comma:"b"`
+//	}
+//
+//	return:
+//	T{A: 1, B: "2"}
+func ParserCommaSeparatedValues[T any](data []byte) (T, error) {
+	var result T
+	if len(data) == 0 {
+		return result, nil
+	}
+
+	// Split by comma
+	pairs := bytes.Split(data, []byte(","))
+
+	// Create map to store key-value pairs
+	values := make(map[string]string)
+
+	// Parse each key-value pair
+	for _, pair := range pairs {
+		kv := bytes.Split(pair, []byte("="))
+		if len(kv) != 2 {
+			return result, fmt.Errorf("invalid key-value pair: %s", pair)
+		}
+		key := string(bytes.TrimSpace(kv[0]))
+		value := string(bytes.TrimSpace(kv[1]))
+		values[key] = value
+	}
+
+	// Convert map to struct using reflection
+	resultValue := reflect.ValueOf(&result).Elem()
+	resultType := resultValue.Type()
+
+	for i := 0; i < resultType.NumField(); i++ {
+		field := resultType.Field(i)
+		fieldValue := resultValue.Field(i)
+
+		// Get comma tag value
+		tag := field.Tag.Get("comma")
+		if tag == "" {
+			tag = field.Name
+		}
+
+		if value, ok := values[tag]; ok {
+			switch field.Type.Kind() {
+			case reflect.String:
+				fieldValue.SetString(value)
+			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+				if intVal, err := strconv.ParseInt(value, 10, 64); err == nil {
+					fieldValue.SetInt(intVal)
+				}
+			case reflect.Float32, reflect.Float64:
+				if floatVal, err := strconv.ParseFloat(value, 64); err == nil {
+					fieldValue.SetFloat(floatVal)
+				}
+			case reflect.Bool:
+				if boolVal, err := strconv.ParseBool(value); err == nil {
+					fieldValue.SetBool(boolVal)
+				}
+			}
+		}
+	}
+
+	return result, nil
+}
diff --git a/internal/utils/parser/comma_test.go b/internal/utils/parser/comma_test.go
new file mode 100644
index 00000000..33d3214f
--- /dev/null
+++ b/internal/utils/parser/comma_test.go
@@ -0,0 +1,129 @@
+package parser
+
+import (
+	"testing"
+)
+
+func TestCommaSeparatedValues(t *testing.T) {
+	type testStruct struct {
+		A        int     `comma:"a"`
+		B        string  `comma:"b"`
+		C        bool    `comma:"c"`
+		D        float64 `comma:"d"`
+		E        string  // no tag
+		Endpoint string  `comma:"endpoint"`
+		Name     string  `comma:"name"`
+		ID       string  `comma:"id"`
+	}
+
+	tests := []struct {
+		name    string
+		input   []byte
+		want    testStruct
+		wantErr bool
+	}{
+		{
+			name:  "basic test",
+			input: []byte("a=1,b=hello,c=true,d=3.14,E=world"),
+			want: testStruct{
+				A: 1,
+				B: "hello",
+				C: true,
+				D: 3.14,
+				E: "world",
+			},
+			wantErr: false,
+		},
+		{
+			name:    "empty input",
+			input:   []byte(""),
+			want:    testStruct{},
+			wantErr: false,
+		},
+		{
+			name:  "partial fields",
+			input: []byte("a=42,c=false"),
+			want: testStruct{
+				A: 42,
+				C: false,
+			},
+			wantErr: false,
+		},
+		{
+			name:  "extra fields ignored",
+			input: []byte("a=1,b=test,extra=ignored"),
+			want: testStruct{
+				A: 1,
+				B: "test",
+			},
+			wantErr: false,
+		},
+		{
+			name:    "invalid format - missing value",
+			input:   []byte("a=1,b"),
+			wantErr: true,
+		},
+		{
+			name:    "invalid format - missing equals",
+			input:   []byte("a=1,b:2"),
+			wantErr: true,
+		},
+		{
+			name:  "whitespace handling",
+			input: []byte("a = 1, b = hello "),
+			want: testStruct{
+				A: 1,
+				B: "hello",
+			},
+			wantErr: false,
+		},
+		{
+			name:  "type conversion - invalid int",
+			input: []byte("a=notanumber,b=test"),
+			want: testStruct{
+				B: "test",
+			},
+			wantErr: false,
+		},
+		{
+			name:  "type conversion - invalid bool",
+			input: []byte("c=notabool,b=test"),
+			want: testStruct{
+				B: "test",
+			},
+			wantErr: false,
+		},
+		{
+			name:  "type conversion - invalid float",
+			input: []byte("d=notafloat,b=test"),
+			want: testStruct{
+				B: "test",
+			},
+			wantErr: false,
+		},
+		{
+			name:  "actual example",
+			input: []byte("endpoint=test-plugin.dify.uwu,name=c31a98df2ef139d6532d8da8caa2bb63,id=c31a98df2ef139d6532d8da8caa2bb63"),
+			want: testStruct{
+				Endpoint: "test-plugin.dify.uwu",
+				Name:     "c31a98df2ef139d6532d8da8caa2bb63",
+				ID:       "c31a98df2ef139d6532d8da8caa2bb63",
+			},
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := ParserCommaSeparatedValues[testStruct](tt.input)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ParserCommaSeparatedValues() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !tt.wantErr && got != tt.want {
+				t.Errorf("ParserCommaSeparatedValues() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+
+}