Skip to content

Commit

Permalink
refactor: serverless connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Jan 8, 2025
1 parent 0f94beb commit b413e6c
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 486 deletions.
23 changes: 12 additions & 11 deletions internal/core/plugin_manager/install_to_serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

// InstallToAWSFromPkg installs a plugin to AWS Lambda
func (p *PluginManager) InstallToAWSFromPkg(
originalPackager []byte,
decoder decoder.PluginDecoder,
source string,
meta map[string]any,
Expand All @@ -32,7 +33,7 @@ func (p *PluginManager) InstallToAWSFromPkg(
return nil, err
}

response, err := serverless.UploadPlugin(decoder)
response, err := serverless.LaunchPlugin(originalPackager, decoder)
if err != nil {
return nil, err
}
Expand All @@ -49,17 +50,17 @@ func (p *PluginManager) InstallToAWSFromPkg(
newResponse.Close()
}()

lambdaUrl := ""
lambdaFunctionName := ""
functionUrl := ""
functionName := ""

response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
response.Async(func(r serverless.LaunchFunctionResponse) {
if r.Event == serverless.Info {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventInfo,
Data: "Installing...",
})
} else if r.Event == serverless.Done {
if lambdaUrl == "" || lambdaFunctionName == "" {
if functionUrl == "" || functionName == "" {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Internal server error, failed to get lambda url or function name",
Expand All @@ -76,8 +77,8 @@ func (p *PluginManager) InstallToAWSFromPkg(
serverlessModel := &models.ServerlessRuntime{
Checksum: checksum,
Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
FunctionURL: lambdaUrl,
FunctionName: lambdaFunctionName,
FunctionURL: functionUrl,
FunctionName: functionName,
PluginUniqueIdentifier: uniqueIdentity.String(),
Declaration: declaration,
}
Expand Down Expand Up @@ -106,10 +107,10 @@ func (p *PluginManager) InstallToAWSFromPkg(
Event: PluginInstallEventError,
Data: "Internal server error",
})
} else if r.Event == serverless.LambdaUrl {
lambdaUrl = r.Message
} else if r.Event == serverless.Lambda {
lambdaFunctionName = r.Message
} else if r.Event == serverless.FunctionUrl {
functionUrl = r.Message
} else if r.Event == serverless.Function {
functionName = r.Message
} else {
newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
}
Expand Down
151 changes: 107 additions & 44 deletions internal/core/plugin_manager/serverless_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"net/url"

"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

var ()

type LambdaFunction struct {
type ServerlessFunction struct {
FunctionName string `json:"function_name" validate:"required"`
FunctionARN string `json:"function_arn" validate:"required"`
FunctionDRN string `json:"function_drn" validate:"required"`
FunctionURL string `json:"function_url" validate:"required"`
}

Expand Down Expand Up @@ -47,72 +48,79 @@ func Ping() error {
}

var (
ErrNoLambdaFunction = errors.New("no lambda function found")
ErrFunctionNotFound = errors.New("no function found")
)

// Fetch the lambda function from serverless connector, return error if failed
func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
request := map[string]any{
"plugin": map[string]any{
"config": map[string]any{
"identity": identity,
"checksum": checksum,
},
},
}
// Fetch the function from serverless connector, return error if failed
func FetchFunction(manifest plugin_entities.PluginDeclaration, checksum string) (*ServerlessFunction, error) {
filename := fmt.Sprintf("%s-%s_%s@%s.difypkg", manifest.Author, manifest.Name, manifest.Version, checksum)

url, err := url.JoinPath(baseurl.String(), "/v1/lambda/fetch")
url, err := url.JoinPath(baseurl.String(), "/v1/runner/instances")
if err != nil {
return nil, err
}

response, err := http_requests.PostAndParse[entities.GenericResponse[LambdaFunction]](
response, err := http_requests.GetAndParse[RunnerInstances](
client,
url,
http_requests.HttpHeader(map[string]string{
"Authorization": SERVERLESS_CONNECTOR_API_KEY,
}),
http_requests.HttpPayloadJson(request),
http_requests.HttpParams(map[string]string{
"filename": filename,
}),
)

if err != nil {
return nil, err
}

if response.Code != 0 {
if response.Code == -404 {
return nil, ErrNoLambdaFunction
}
return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
if response.Error != "" {
return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Error)
}

return &response.Data, nil
if len(response.Items) == 0 {
return nil, ErrFunctionNotFound
}

return &ServerlessFunction{
FunctionName: response.Items[0].Name,
FunctionDRN: response.Items[0].ResourceName,
FunctionURL: response.Items[0].Endpoint,
}, nil
}

type LaunchAWSLambdaFunctionEvent string
type LaunchFunctionEvent string

const (
Error LaunchAWSLambdaFunctionEvent = "error"
Info LaunchAWSLambdaFunctionEvent = "info"
Lambda LaunchAWSLambdaFunctionEvent = "lambda"
LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
Done LaunchAWSLambdaFunctionEvent = "done"
Error LaunchFunctionEvent = "error"
Info LaunchFunctionEvent = "info"
Function LaunchFunctionEvent = "function"
FunctionUrl LaunchFunctionEvent = "function_url"
Done LaunchFunctionEvent = "done"
)

type LaunchAWSLambdaFunctionResponse struct {
Event LaunchAWSLambdaFunctionEvent `json:"event"`
Message string `json:"message"`
type LaunchFunctionResponse struct {
Event LaunchFunctionEvent `json:"event"`
Message string `json:"message"`
}

// Launch the lambda function from serverless connector, it will receive the context_tar as the input
// Setup the function from serverless connector, it will receive the context as the input
// and build it a docker image, then run it on serverless platform like AWS Lambda
// it returns a event stream, the caller should consider it as a async operation
func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
func SetupFunction(
manifest plugin_entities.PluginDeclaration,
checksum string,
context io.Reader,
) (*stream.Stream[LaunchFunctionResponse], error) {
url, err := url.JoinPath(baseurl.String(), "/v1/launch")
if err != nil {
return nil, err
}

response, err := http_requests.PostAndParseStream[LaunchAWSLambdaFunctionResponse](
// join a filename
filename := fmt.Sprintf("%s-%s_%s@%s.difypkg", manifest.Author, manifest.Name, manifest.Version, checksum)
serverless_connector_response, err := http_requests.PostAndParseStream[LaunchFunctionResponseChunk](
client,
url,
http_requests.HttpHeader(map[string]string{
Expand All @@ -121,20 +129,75 @@ func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*str
http_requests.HttpReadTimeout(240000),
http_requests.HttpWriteTimeout(240000),
http_requests.HttpPayloadMultipart(
map[string]string{
"identity": identity,
"checksum": checksum,
},
map[string]io.Reader{
"context": context_tar,
map[string]string{},
map[string]http_requests.HttpPayloadMultipartFile{
"context": {
Filename: filename,
Reader: context,
},
},
),
http_requests.HttpRaiseErrorWhenStreamDataNotMatch(true),
)

if err != nil {
return nil, err
}

response := stream.NewStream[LaunchFunctionResponse](10)

routine.Submit(map[string]string{
"module": "serverless_connector",
"func": "SetupFunction",
}, func() {
defer response.Close()
serverless_connector_response.Async(func(chunk LaunchFunctionResponseChunk) {
if chunk.State == LAUNCH_STATE_FAILED {
response.Write(LaunchFunctionResponse{
Event: Error,
Message: chunk.Message,
})
return
}

switch chunk.Stage {
case LAUNCH_STAGE_START, LAUNCH_STAGE_BUILD:
response.Write(LaunchFunctionResponse{
Event: Info,
Message: "Building plugin...",
})
case LAUNCH_STAGE_RUN:
if chunk.State == LAUNCH_STATE_SUCCESS {
data, err := parser.ParserCommaSeparatedValues[LaunchFunctionFinalStageMessage]([]byte(chunk.Message))
if err != nil {
response.Write(LaunchFunctionResponse{
Event: Error,
Message: err.Error(),
})
return
}

response.Write(LaunchFunctionResponse{
Event: Function,
Message: data.Name,
})
response.Write(LaunchFunctionResponse{
Event: FunctionUrl,
Message: data.Endpoint,
})
} else {
response.Write(LaunchFunctionResponse{
Event: Info,
Message: "Launching plugin...",
})
}
case LAUNCH_STAGE_END:
response.Write(LaunchFunctionResponse{
Event: Done,
Message: "Plugin launched",
})
}
})
})

return response, nil
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package serverless

import (
"os"
"bytes"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
Expand All @@ -13,9 +13,9 @@ var (
AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
)

// UploadPlugin uploads the plugin to the AWS Lambda
// return the lambda url and name
func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
// LaunchPlugin uploads the plugin to specific serverless connector
// return the function url and name
func LaunchPlugin(originPackage []byte, decoder decoder.PluginDecoder) (*stream.Stream[LaunchFunctionResponse], error) {
checksum, err := decoder.Checksum()
if err != nil {
return nil, err
Expand All @@ -32,41 +32,31 @@ func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambda
return nil, err
}

identity := manifest.Identity()
function, err := FetchLambda(identity, checksum)
function, err := FetchFunction(manifest, checksum)
if err != nil {
if err != ErrNoLambdaFunction {
if err != ErrFunctionNotFound {
return nil, err
}
} else {
// found, return directly
response := stream.NewStream[LaunchAWSLambdaFunctionResponse](3)
response.Write(LaunchAWSLambdaFunctionResponse{
Event: LambdaUrl,
response := stream.NewStream[LaunchFunctionResponse](3)
response.Write(LaunchFunctionResponse{
Event: FunctionUrl,
Message: function.FunctionURL,
})
response.Write(LaunchAWSLambdaFunctionResponse{
Event: Lambda,
response.Write(LaunchFunctionResponse{
Event: Function,
Message: function.FunctionName,
})
response.Write(LaunchAWSLambdaFunctionResponse{
response.Write(LaunchFunctionResponse{
Event: Done,
Message: "",
})
response.Close()
return response, nil
}

// create lambda function
packager := NewPackager(decoder)
context, err := packager.Pack()
if err != nil {
return nil, err
}
defer os.Remove(context.Name())
defer context.Close()

response, err := LaunchLambda(identity, checksum, context)
response, err := SetupFunction(manifest, checksum, bytes.NewReader(originPackage))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit b413e6c

Please sign in to comment.