Skip to content

Commit

Permalink
Work on Custom Message Type. Add required inputs and structures.
Browse files Browse the repository at this point in the history
  • Loading branch information
KGronek-Pubnub committed Nov 25, 2024
1 parent f246c39 commit 959ef8b
Show file tree
Hide file tree
Showing 22 changed files with 269 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ void UPubnubJsonUtilities::FetchHistoryJsonToData(FString ResponseJson, bool& Er
MessageValue->AsObject()->TryGetStringField(ANSI_TO_TCHAR("uuid"), CurrentMessage.UserID);
MessageValue->AsObject()->TryGetStringField(ANSI_TO_TCHAR("timetoken"), CurrentMessage.Timetoken);
MessageValue->AsObject()->TryGetStringField(ANSI_TO_TCHAR("message_type"), CurrentMessage.MessageType);
MessageValue->AsObject()->TryGetStringField(ANSI_TO_TCHAR("custom_message_type"), CurrentMessage.MessageType);
if(!MessageValue->AsObject()->TryGetStringField(ANSI_TO_TCHAR("meta"), CurrentMessage.Meta))
{
UE_LOG(LogTemp, Warning, TEXT("Reading Meta as Json Object"));
Expand Down
10 changes: 10 additions & 0 deletions Source/PubnubLibrary/Private/FunctionLibraries/PubnubUtilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ FString UPubnubUtilities::AddQuotesToString(const FString InString, bool SkipIfH

return InString;
}

FString UPubnubUtilities::PubnubCharMemBlockToString(const pubnub_char_mem_block PnChar)
{
if(!PnChar.ptr)
{
return "";
}

return FString::ConstructFromPtrSize(PnChar.ptr, PnChar.size);
}
80 changes: 56 additions & 24 deletions Source/PubnubLibrary/Private/PubnubSubsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ void UPubnubSubsystem::PublishMessage(FString Channel, FString Message, FPubnubP
});
}

void UPubnubSubsystem::Signal(FString Channel, FString Message)
void UPubnubSubsystem::Signal(FString Channel, FString Message, FPubnubSignalSettings SignalSettings)
{
if(!CheckIsPubnubInitialized() || !CheckQuickActionThreadValidity())
{return;}

QuickActionThread->AddFunctionToQueue( [this, Channel, Message]
QuickActionThread->AddFunctionToQueue( [this, Channel, Message, SignalSettings]
{
PublishMessage_priv(Channel, Message);
Signal_priv(Channel, Message, SignalSettings);
});
}

Expand Down Expand Up @@ -804,16 +804,22 @@ void UPubnubSubsystem::StartPubnubSubscribeLoop()
if(SubscribedChannels.IsEmpty() && SubscribedGroups.IsEmpty())
{return;}

//Subscribe to channels - this is blocking function
pubnub_subscribe(ctx_sub, TCHAR_TO_ANSI(*StringArrayToCommaSeparated(SubscribedChannels)), TCHAR_TO_ANSI(*StringArrayToCommaSeparated(SubscribedGroups)));

pubnub_subscribe_v2_options PubnubOptions = pubnub_subscribe_v2_defopts();
auto CharConverter = StringCast<ANSICHAR>(*StringArrayToCommaSeparated(SubscribedGroups));
PubnubOptions.channel_group = CharConverter.Get();

//Subscribe to channels - this is blocking function
pubnub_res SubscribeResult = pubnub_subscribe_v2(ctx_sub, TCHAR_TO_ANSI(*StringArrayToCommaSeparated(SubscribedChannels)), PubnubOptions);

//If context was released on deinitializing subsystem it should just return
if(!IsInitialized)
{return;}

//Check for subscribe result
pubnub_res SubscribeResult = pubnub_await(ctx_sub);
if (SubscribeResult != PNR_OK)
//pubnub_res SubscribeResult = pubnub_await(ctx_sub);
pubnub_await(ctx_sub);
if (SubscribeResult != PNR_OK && SubscribeResult != PNR_STARTED )
{
PubnubResponseError(SubscribeResult, "Failed to subscribe to channel.");
{return;}
Expand All @@ -822,28 +828,24 @@ void UPubnubSubsystem::StartPubnubSubscribeLoop()
//Check once again, as subsystem could be deinitialized during await
if(!IsInitialized)
{return;}

//At this stage we received messages, so read them and get channel from where they were sent
const char* MessageChar = pubnub_get(ctx_sub);
const char* ChannelChar = pubnub_get_channel(ctx_sub);
while(MessageChar != NULL)
pubnub_v2_message message = pubnub_get_v2(ctx_sub);
while(message.payload.ptr)
{
FString Message(MessageChar);
FString Channel(ChannelChar);
FPubnubMessageData MessageData = UEMessageFromPubnub(message);

//Skip system messages, we don't need to display them to user
if(Message != SystemPublishMessage)
if(MessageData.Message != SystemPublishMessage)
{
//Broadcast callback with message content
//Message needs to be called back on Game Thread
AsyncTask(ENamedThreads::GameThread, [this, Message, Channel]()
AsyncTask(ENamedThreads::GameThread, [this, MessageData]()
{
OnMessageReceived.Broadcast(Message, Channel);
OnMessageReceived.Broadcast(MessageData);
});
}

MessageChar = pubnub_get(ctx_sub);
ChannelChar = pubnub_get_channel(ctx_sub);
message = pubnub_get_v2(ctx_sub);
}
});
}
Expand Down Expand Up @@ -1145,10 +1147,13 @@ void UPubnubSubsystem::PublishMessage_priv(FString Channel, FString Message, FPu
pubnub_publish_options PubnubOptions;
auto CharConverter = StringCast<ANSICHAR>(*PublishSettings.MetaData);
PubnubOptions.meta = CharConverter.Get();

PublishUESettingsToPubnubPublishOptions(PublishSettings, PubnubOptions);

auto TypeCharConverter = StringCast<ANSICHAR>(*PublishSettings.CustomMessageType);
PubnubOptions.custom_message_type = TypeCharConverter.Get();

PublishUESettingsToPubnubPublishOptions(PublishSettings, PubnubOptions);
pubnub_publish_ex(ctx_pub, TCHAR_TO_ANSI(*Channel), TCHAR_TO_ANSI(*Message), PubnubOptions);

pubnub_res PublishResult = pubnub_await(ctx_pub);

if(PublishResult != PNR_OK)
Expand All @@ -1157,15 +1162,25 @@ void UPubnubSubsystem::PublishMessage_priv(FString Channel, FString Message, FPu
}
}

void UPubnubSubsystem::Signal_priv(FString Channel, FString Message)
void UPubnubSubsystem::Signal_priv(FString Channel, FString Message, FPubnubSignalSettings SignalSettings)
{
if(!CheckIsUserIDSet())
{return;}

if(CheckIsFieldEmpty(Channel, "ChannelName", "Signal") || CheckIsFieldEmpty(Message, "Message", "Signal"))
{return;}

pubnub_signal_options PubnubOptions = pubnub_signal_defopts();
auto CharConverter = StringCast<ANSICHAR>(*SignalSettings.CustomMessageType);
PubnubOptions.custom_message_type = CharConverter.Get();
pubnub_signal_ex(ctx_pub, TCHAR_TO_ANSI(*Channel), TCHAR_TO_ANSI(*Message), PubnubOptions);

pubnub_res PublishResult = pubnub_await(ctx_pub);

pubnub_signal(ctx_pub, TCHAR_TO_ANSI(*Channel), TCHAR_TO_ANSI(*Message));
if(PublishResult != PNR_OK)
{
PubnubPublishError();
}
}

void UPubnubSubsystem::SubscribeToChannel_priv(FString Channel)
Expand Down Expand Up @@ -2429,7 +2444,9 @@ void UPubnubSubsystem::PublishUESettingsToPubnubPublishOptions(FPubnubPublishSet
PubnubPublishOptions.store = PublishSettings.StoreInHistory;
PubnubPublishOptions.replicate = PublishSettings.Replicate;
PubnubPublishOptions.cipher_key = NULL;
PubnubPublishOptions.ttl = PublishSettings.Ttl;
PublishSettings.MetaData.IsEmpty() ? PubnubPublishOptions.meta = NULL : nullptr;
PublishSettings.CustomMessageType.IsEmpty() ? PubnubPublishOptions.custom_message_type = NULL : nullptr;
PubnubPublishOptions.method = (pubnub_method)(uint8)PublishSettings.PublishMethod;
}

Expand All @@ -2455,10 +2472,25 @@ void UPubnubSubsystem::FetchHistoryUESettingsToPbFetchHistoryOptions(FPubnubFetc
PubnubFetchHistoryOptions.include_message_type = FetchHistorySettings.IncludeMessageType;
PubnubFetchHistoryOptions.include_user_id = FetchHistorySettings.IncludeUserID;
PubnubFetchHistoryOptions.include_message_actions = FetchHistorySettings.IncludeMessageActions;
PubnubFetchHistoryOptions.include_custom_message_type = FetchHistorySettings.IncludeCustomMessageType;
FetchHistorySettings.Start.IsEmpty() ? PubnubFetchHistoryOptions.start = NULL : nullptr;
FetchHistorySettings.End.IsEmpty() ? PubnubFetchHistoryOptions.end = NULL : nullptr;
}

FPubnubMessageData UPubnubSubsystem::UEMessageFromPubnub(pubnub_v2_message PubnubMessage)
{
FPubnubMessageData MessageData;
MessageData.Message = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.payload);
MessageData.Channel = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.channel);
MessageData.UserID = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.publisher);
MessageData.Timetoken = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.tt);
MessageData.Metadata = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.metadata);
MessageData.MessageType = (EPubnubMessageType)(PubnubMessage.message_type);
MessageData.CustomMessageType = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.custom_message_type);
MessageData.MatchOrGroup = UPubnubUtilities::PubnubCharMemBlockToString(PubnubMessage.match_or_group);
return MessageData;
}

//This functions assumes that Channels and Permissions are already checked. It means that there is the same amount of permissions as channels or there is exactly one permission
TSharedPtr<FJsonObject> UPubnubSubsystem::AddChannelPermissionsToJson(TArray<FString> Channels, TArray<FPubnubChannelPermissions> ChannelPermissions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#pragma once

#include <pubnub_memory_block.h>
#include "CoreMinimal.h"
#include "Kismet/BlueprintFunctionLibrary.h"
#include "PubnubUtilities.generated.h"
Expand All @@ -16,4 +17,5 @@ class PUBNUBLIBRARY_API UPubnubUtilities : public UBlueprintFunctionLibrary
public:

static FString AddQuotesToString(const FString InString, bool SkipIfHasQuotes = true);
static FString PubnubCharMemBlockToString(const pubnub_char_mem_block PnChar);
};
15 changes: 15 additions & 0 deletions Source/PubnubLibrary/Public/PubnubEnumLibrary.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,18 @@ enum class EPubnubErrorType : uint8
PET_Error UMETA(DisplayName="Error"),
PET_Warning UMETA(DisplayName="Warning")
};

UENUM(BlueprintType)
enum class EPubnubMessageType : uint8
{
/* Indicates that message was received as a signal */
PMT_Signal,
/* Indicates that message was published */
PMT_Published,
/* Indicates action on published message */
PMT_Action,
/* Message about Objects */
PMT_Objects,
/* Message about Files */
PMT_Files,
};
50 changes: 50 additions & 0 deletions Source/PubnubLibrary/Public/PubnubStructLibrary.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ struct FPubnubPublishSettings
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString MetaData = "";
//Defines the method by which publish transaction will be performed. Can be HTTP GET or POST. If using POST, content can be GZIP compressed.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") EPubnubPublishMethod PublishMethod = EPubnubPublishMethod::PPM_SendViaGET;
//For how many hours message should be kept and available with history API. If 0, server default will be used.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") int Ttl = 0;
/** User-specified message type.
Important: String limited by **3**-**50** case-sensitive alphanumeric characters with only `-` and `_` special characters allowed.
*/
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString CustomMessageType = "";
};

USTRUCT(BlueprintType)
struct FPubnubSignalSettings
{
GENERATED_BODY()

/** User-specified message type.
Important: String limited by **3**-**50** case-sensitive alphanumeric characters with only `-` and `_` special characters allowed.
*/
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString CustomMessageType = "";
};

USTRUCT(BlueprintType)
Expand Down Expand Up @@ -128,6 +145,10 @@ struct FPubnubFetchHistorySettings
* false.
*/
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") bool IncludeMessageActions = false;
/** Include messages' custom type flag.
Message / signal and file messages may contain user-provided type.
*/
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") bool IncludeCustomMessageType = false;
};

USTRUCT(BlueprintType)
Expand Down Expand Up @@ -253,6 +274,8 @@ struct FPubnubHistoryMessageData
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Meta = "";
//Type of the message. Refer to Message types for more information.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString MessageType = "";
//User-specified message type.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString CustomMessageType = "";
//An array of FPubnubMessageActionData structs which are message actions that were added to the historical messages.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") TArray<FPubnubMessageActionData> MessageActions;
};
Expand Down Expand Up @@ -343,4 +366,31 @@ struct FPubnubGetChannelMembersWrapper
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Updated = "";
//Version identifier of the membership metadata.
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString ETag = "";
};

USTRUCT(BlueprintType)
struct FPubnubMessageData
{
GENERATED_BODY()

/** The message itself */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Message = "";
/** Channel that message was published to */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Channel = "";
/** The message information about publisher */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString UserID = "";
/** The time token of the message - when it was published. */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Timetoken = "";
/** The message metadata, as published */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString Metadata = "";
/** Indicates the message type: a signal, published, or something else */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") EPubnubMessageType MessageType = EPubnubMessageType::PMT_Published;
/** User-provided message type. */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString CustomMessageType = "";
/** Subscription match or the channel group */
UPROPERTY(BlueprintReadWrite, VisibleAnywhere, Category = "Pubnub") FString MatchOrGroup = "";
/** Region of the message - not interesting in most cases */
int region = 0;
/** Message flags */
int flags = 0;
};
11 changes: 5 additions & 6 deletions Source/PubnubLibrary/Public/PubnubSubsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class FPubnubFunctionThread;
class FPubnubLoopingThread;
class UPubnubChatSystem;

DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(FOnMessageReceived, FString, MessageJson, FString, Channel);
DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnMessageReceived, FPubnubMessageData, Message);
DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(FOnPubnubError, FString, ErrorMessage, EPubnubErrorType, ErrorType);
DECLARE_DYNAMIC_DELEGATE_OneParam(FOnPubnubResponse, FString, JsonResponse);
DECLARE_DYNAMIC_DELEGATE_OneParam(FOnPubnubIntResponse, int, IntValue);
Expand Down Expand Up @@ -113,7 +113,7 @@ class PUBNUBLIBRARY_API UPubnubSubsystem : public UGameInstanceSubsystem
* @param Message The message to send as the signal. This message can be any data type that can be serialized into JSON.
*/
UFUNCTION(BlueprintCallable, Category = "Pubnub|Publish")
void Signal(FString Channel, FString Message);
void Signal(FString Channel, FString Message, FPubnubSignalSettings SignalSettings = FPubnubSignalSettings());

/**
* Subscribes to a specified channel - start listening for messages on that channel.
Expand Down Expand Up @@ -814,7 +814,7 @@ class PUBNUBLIBRARY_API UPubnubSubsystem : public UGameInstanceSubsystem
void SetUserID_priv(FString UserID);
void SetSecretKey_priv();
void PublishMessage_priv(FString Channel, FString Message, FPubnubPublishSettings PublishSettings = FPubnubPublishSettings());
void Signal_priv(FString Channel, FString Message);
void Signal_priv(FString Channel, FString Message, FPubnubSignalSettings SignalSettings = FPubnubSignalSettings());
void SubscribeToChannel_priv(FString Channel);
void SubscribeToGroup_priv(FString GroupName);
void UnsubscribeFromChannel_priv(FString Channel);
Expand Down Expand Up @@ -885,12 +885,11 @@ class PUBNUBLIBRARY_API UPubnubSubsystem : public UGameInstanceSubsystem
void HereNowUESettingsToPubnubHereNowOptions(FPubnubListUsersFromChannelSettings &HereNowSettings, pubnub_here_now_options &PubnubHereNowOptions);
void SetStateUESettingsToPubnubSetStateOptions(FPubnubSetStateSettings &SetStateSettings, pubnub_set_state_options &PubnubSetStateOptions);
void FetchHistoryUESettingsToPbFetchHistoryOptions(FPubnubFetchHistorySettings &FetchHistorySettings, pubnub_fetch_history_options &PubnubFetchHistoryOptions);

FPubnubMessageData UEMessageFromPubnub(pubnub_v2_message PubnubMessage);

/* GRANT TOKEN HELPERS */

TSharedPtr<FJsonObject> AddChannelPermissionsToJson(TArray<FString> Channels, TArray<FPubnubChannelPermissions> ChannelPermissions);
TSharedPtr<FJsonObject> AddChannelGroupPermissionsToJson(TArray<FString> ChannelGroups, TArray<FPubnubChannelGroupPermissions> ChannelGroupPermissions);
TSharedPtr<FJsonObject> AddUserPermissionsToJson(TArray<FString> Users, TArray<FPubnubUserPermissions> UserPermissions);
};


4 changes: 3 additions & 1 deletion Source/ThirdParty/sdk/Include/core/pbcc_fetch_history.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ enum pubnub_res pbcc_fetch_history_prep(struct pbcc_context* pb,
const char* channel,
unsigned int max_per_channel,
enum pubnub_tribool include_meta,
enum pubnub_tribool include_custom_message_type,
enum pubnub_tribool include_message_type,
enum pubnub_tribool include_user_id,
enum pubnub_tribool include_message_actions,
Expand Down Expand Up @@ -59,7 +60,8 @@ enum pubnub_res pbcc_fetch_history_prep(struct pbcc_context* pb,
if (max_per_channel) { ADD_URL_PARAM(qparam, max, max_per_ch_cnt_buf); }

if (include_meta != pbccNotSet) { ADD_URL_PARAM(qparam, include_meta, include_meta == pbccTrue ? "true" : "false"); }
if (include_message_type != pbccNotSet) { ADD_URL_PARAM(qparam, include_message_type, include_meta == pbccTrue ? "true" : "false"); }
if (include_custom_message_type != pbccNotSet) { ADD_URL_PARAM(qparam, include_custom_message_type, include_custom_message_type == pbccTrue ? "true" : "false"); }
if (include_message_type != pbccNotSet) { ADD_URL_PARAM(qparam, include_message_type, include_message_type == pbccTrue ? "true" : "false"); }
if (include_user_id != pbccNotSet) { ADD_URL_PARAM(qparam, include_uuid, include_user_id == pbccTrue ? "true" : "false"); }
#if PUBNUB_CRYPTO_API
if (pb->secret_key == NULL) { ADD_URL_AUTH_PARAM(pb, qparam, auth); }
Expand Down
1 change: 1 addition & 0 deletions Source/ThirdParty/sdk/Include/core/pbcc_fetch_history.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ enum pubnub_res pbcc_fetch_history_prep(struct pbcc_context* pb,
const char* channel,
unsigned int max_per_channel,
enum pubnub_tribool include_meta,
enum pubnub_tribool include_custom_message_type,
enum pubnub_tribool include_message_type,
enum pubnub_tribool include_user_id,
enum pubnub_tribool include_message_actions,
Expand Down
7 changes: 6 additions & 1 deletion Source/ThirdParty/sdk/Include/core/pbcc_subscribe_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
enum pubnub_res pbcc_subscribe_v2_prep(struct pbcc_context* p,
char const* channel,
char const* channel_group,
unsigned* heartbeat,
const unsigned* heartbeat,
char const* filter_expr)
{
char region_str[20];
Expand Down Expand Up @@ -303,6 +303,11 @@ struct pubnub_v2_message pbcc_get_msg_v2(struct pbcc_context* p)
rslt.message_type = pbsbPublished;
}

if (jonmpOK == pbjson_get_object_value(&el, "cmt", &found)) {
rslt.custom_message_type.ptr = (char*)found.start + 1;
rslt.custom_message_type.size = found.end - found.start - 2;
}

jpresult = pbjson_get_object_value(&el, "p", &found);
if (jonmpOK == jpresult) {
struct pbjson_elem titel;
Expand Down
2 changes: 1 addition & 1 deletion Source/ThirdParty/sdk/Include/core/pbcc_subscribe_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct pbcc_context;
enum pubnub_res pbcc_subscribe_v2_prep(struct pbcc_context* p,
char const* channel,
char const* channel_group,
unsigned* heartbeat,
const unsigned* heartbeat,
char const* filter_expr);


Expand Down
Loading

0 comments on commit 959ef8b

Please sign in to comment.