Skip to content

Commit

Permalink
feat: update webhookBeforeMemberJoinGroup to batch method. (#2459)
Browse files Browse the repository at this point in the history
* update protocol in go mod.

* add debug log in writePongMsg.

* update log level.

* add Warn log in writePongMsg.

* add debug log.

* feat: update webhookBeforeMemberJoinGroup to batch method.

* feat: update version field implement.

* update webhook implement contents.

* update method field and contents.

* update callbackCommand field.

* fix: add correct fields.

* update struct tags.
  • Loading branch information
mo3et authored Jul 30, 2024
1 parent ed0ab58 commit fb68961
Show file tree
Hide file tree
Showing 23 changed files with 122 additions and 81 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

MONGO_IMAGE=mongo:6.0.2
REDIS_IMAGE=redis:7.0.0
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
Expand Down
3 changes: 0 additions & 3 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,9 @@ func (c *Client) writePongMsg(appData string) error {
return nil
}

log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
c.w.Lock()
defer c.w.Unlock()

log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData)
Expand All @@ -412,6 +410,5 @@ func (c *Client) writePongMsg(appData string) error {
log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage)
}

log.ZDebug(c.ctx, "write message is success", "appdata", appData, "closed err", c.closedErr)
return errs.Wrap(err)
}
58 changes: 40 additions & 18 deletions internal/rpc/group/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package group

import (
"context"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
Expand All @@ -27,7 +29,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"time"
)

// CallbackBeforeCreateGroup callback before create group.
Expand Down Expand Up @@ -100,27 +101,45 @@ func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after)
}

func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *model.GroupMember, groupEx string) error {
func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupID string, groupEx string) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand,
GroupID: groupMember.GroupID,
UserID: groupMember.UserID,
Ex: groupMember.Ex,
groupMembersMap := datautil.SliceToMap(groupMembers, func(e *model.GroupMember) string {
return e.UserID
})
var groupMembersCallback []*callbackstruct.CallbackGroupMember

for _, member := range groupMembers {
groupMembersCallback = append(groupMembersCallback, &callbackstruct.CallbackGroupMember{
UserID: member.UserID,
Ex: member.Ex,
})
}

cbReq := &callbackstruct.CallbackBeforeMembersJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMembersJoinGroupCommand,
GroupID: groupID,
MembersList: groupMembersCallback,
GroupEx: groupEx,
}
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
resp := &callbackstruct.CallbackBeforeMembersJoinGroupResp{}

if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}

if resp.MuteEndTime != nil {
groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime)
for _, memberCallbackResp := range resp.MemberCallbackList {
if _, ok := groupMembersMap[(*memberCallbackResp.UserID)]; ok {
if memberCallbackResp.MuteEndTime != nil {
groupMembersMap[(*memberCallbackResp.UserID)].MuteEndTime = time.UnixMilli(*memberCallbackResp.MuteEndTime)
}

datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].FaceURL, memberCallbackResp.FaceURL)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Ex, memberCallbackResp.Ex)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Nickname, memberCallbackResp.Nickname)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].RoleLevel, memberCallbackResp.RoleLevel)
}
}
datautil.NotNilReplace(&groupMember.FaceURL, resp.FaceURL)
datautil.NotNilReplace(&groupMember.Ex, resp.Ex)
datautil.NotNilReplace(&groupMember.Nickname, resp.Nickname)
datautil.NotNilReplace(&groupMember.RoleLevel, resp.RoleLevel)

return nil
})
}
Expand Down Expand Up @@ -244,10 +263,13 @@ func (s *groupServer) webhookBeforeInviteUserToGroup(ctx context.Context, before
return err
}

if len(resp.RefusedMembersAccount) > 0 {
// Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic
}
// Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic

// if len(resp.RefusedMembersAccount) > 0 {
// implement members are refused
// }

return nil
})
}
Expand Down
40 changes: 20 additions & 20 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, err
}

joinGroup := func(userID string, roleLevel int32) error {
joinGroupFunc := func(userID string, roleLevel int32) {
groupMember := &model.GroupMember{
GroupID: group.GroupID,
UserID: userID,
Expand All @@ -258,25 +258,23 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
MuteEndTime: time.UnixMilli(0),
}

if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return err
}
groupMembers = append(groupMembers, groupMember)
return nil
}
if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil {
return nil, err
}

joinGroupFunc(req.OwnerUserID, constant.GroupOwner)

for _, userID := range req.AdminUserIDs {
if err := joinGroup(userID, constant.GroupAdmin); err != nil {
return nil, err
}
joinGroupFunc(userID, constant.GroupAdmin)
}

for _, userID := range req.MemberUserIDs {
if err := joinGroup(userID, constant.GroupOrdinaryUsers); err != nil {
return nil, err
}
joinGroupFunc(userID, constant.GroupOrdinaryUsers)
}

if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}

if err := s.db.CreateGroup(ctx, []*model.Group{group}, groupMembers); err != nil {
return nil, err
}
Expand Down Expand Up @@ -442,12 +440,13 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
MuteEndTime: time.UnixMilli(0),
}

if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
groupMembers = append(groupMembers, member)
}

if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}

if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
Expand Down Expand Up @@ -811,9 +810,9 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
MuteEndTime: time.Unix(0, 0),
InviterUserID: groupRequest.InviterUserID,
OperatorUserID: mcontext.GetOpUserID(ctx),
Ex: groupRequest.Ex,
}
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {

if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{member}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
}
Expand Down Expand Up @@ -882,7 +881,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
MuteEndTime: time.UnixMilli(0),
}

if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{groupMember}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}

Expand All @@ -898,6 +897,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)

return &pbgroup.JoinGroupResp{}, nil
}

groupRequest := model.GroupRequest{
UserID: req.InviterUserID,
ReqMsg: req.ReqMessage,
Expand Down
2 changes: 1 addition & 1 deletion pkg/callbackstruct/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"
CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand"
CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand"
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
)
23 changes: 16 additions & 7 deletions pkg/callbackstruct/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,32 @@ type CallbackAfterCreateGroupResp struct {
CommonCallbackResp
}

type CallbackBeforeMemberJoinGroupReq struct {
type CallbackGroupMember struct {
UserID string `json:"userID"`
Ex string `json:"ex"`
}

type CallbackBeforeMembersJoinGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
Ex string `json:"ex"`
GroupEx string `json:"groupEx"`
GroupID string `json:"groupID"`
MembersList []*CallbackGroupMember `json:"memberList"`
GroupEx string `json:"groupEx"`
}

type CallbackBeforeMemberJoinGroupResp struct {
CommonCallbackResp
type MemberJoinGroupCallBack struct {
UserID *string `json:"userID"`
Nickname *string `json:"nickname"`
FaceURL *string `json:"faceURL"`
RoleLevel *int32 `json:"roleLevel"`
MuteEndTime *int64 `json:"muteEndTime"`
Ex *string `json:"ex"`
}

type CallbackBeforeMembersJoinGroupResp struct {
CommonCallbackResp
MemberCallbackList []*MemberJoinGroupCallBack `json:"memberCallbackList"`
}

type CallbackBeforeSetGroupMemberInfoReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -38,7 +39,7 @@ func NewApiCmd() *ApiCmd {
DiscoveryConfigFilename: &apiConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -40,7 +41,7 @@ func NewAuthRpcCmd() *AuthRpcCmd {
DiscoveryConfigFilename: &authConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -43,7 +44,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
DiscoveryConfigFilename: &conversationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -38,7 +39,7 @@ func NewCronTaskCmd() *CronTaskCmd {
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -44,7 +45,7 @@ func NewFriendRpcCmd() *FriendRpcCmd {
DiscoveryConfigFilename: &friendConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/cmd/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package cmd

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
Expand All @@ -45,7 +46,7 @@ func NewGroupRpcCmd() *GroupRpcCmd {
DiscoveryConfigFilename: &groupConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
Expand Down
Loading

0 comments on commit fb68961

Please sign in to comment.