Skip to content

Commit

Permalink
Merge pull request #525 from refly-ai/sprint/v0.3.1
Browse files Browse the repository at this point in the history
Sprint/v0.3.1
  • Loading branch information
mrcfps authored Feb 21, 2025
2 parents 676a60f + 8798553 commit 3ca7d11
Show file tree
Hide file tree
Showing 17 changed files with 784 additions and 27 deletions.
18 changes: 17 additions & 1 deletion apps/api/src/utils/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,23 @@ export class ResultAggregator {
break;
case 'structured_data':
if (event.structuredData) {
step.structuredData = { ...step.structuredData, ...event.structuredData };
const structuredData = event.structuredData;
if (structuredData?.isPartial !== undefined) {
const existingData = step.structuredData || {};
const existingSources = (existingData.sources || []) as any[];
step.structuredData = {
...existingData,
sources: [
...existingSources,
...(Array.isArray(structuredData.sources) ? structuredData.sources : []),
],
isPartial: structuredData.isPartial,
chunkIndex: structuredData.chunkIndex,
totalChunks: structuredData.totalChunks,
};
} else {
step.structuredData = { ...step.structuredData, ...event.structuredData };
}
}
break;
case 'log':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export const CustomEdge = memo(
/>
) : (
label && (
<div className="nowheel px-2 py-1 text-[10px] text-gray-700 bg-[#effcfa] rounded cursor-pointer break-all">
<div className="nowheel px-2 py-1 text-[10px] text-center text-gray-700 bg-opacity-0 rounded cursor-pointer break-all">
{label}
</div>
)
Expand Down
33 changes: 29 additions & 4 deletions packages/ai-workspace-common/src/hooks/canvas/use-invoke-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,35 @@ export const useInvokeAction = () => {
}

const updatedStep: ActionStep = findOrCreateStep(result.steps ?? [], step);
updatedStep.structuredData = {
...updatedStep.structuredData,
...structuredData,
};

// Handle chunked sources data
if (structuredData.sources && Array.isArray(structuredData.sources)) {
const existingData = updatedStep.structuredData || {};
const existingSources = (existingData.sources || []) as any[];

// If this is a chunk of sources, merge it with existing sources
if (structuredData.isPartial !== undefined) {
updatedStep.structuredData = {
...existingData,
sources: [...existingSources, ...structuredData.sources],
isPartial: structuredData.isPartial,
chunkIndex: structuredData.chunkIndex,
totalChunks: structuredData.totalChunks,
};
} else {
// Handle non-chunked data as before
updatedStep.structuredData = {
...existingData,
...structuredData,
};
}
} else {
// Handle non-sources structured data
updatedStep.structuredData = {
...updatedStep.structuredData,
...structuredData,
};
}

const updatedResult = {
...result,
Expand Down
4 changes: 2 additions & 2 deletions packages/ai-workspace-common/src/utils/sse-post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const ssePost = async ({
try {
const response = await makeSSERequest(payload, controller);

const baseResp = await extractBaseResp(response);
const baseResp = await extractBaseResp(response, { success: true });
if (!baseResp.success) {
onSkillError?.({ error: baseResp, event: 'error' });
return;
Expand Down Expand Up @@ -107,7 +107,7 @@ export const ssePost = async ({
message: message.substring(6),
error: err,
});
return;
// return;
}

if (skillEvent?.event === 'start') {
Expand Down
4 changes: 4 additions & 0 deletions packages/i18n/src/en-US/skill-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const translations = {
title: 'Select Related Results',
description: 'Total of {{totalResults}} results, completed in {{duration}}ms',
},
generateAnswer: {
title: 'Generate Answer',
description: 'Start to generate answer...',
},
};

export default translations;
4 changes: 4 additions & 0 deletions packages/i18n/src/zh-Hans/skill-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const translations = {
title: '选择关联结果',
description: '总共 {{totalResults}} 个结果, 耗时 {{duration}} 毫秒',
},
generateAnswer: {
title: '生成答案',
description: '开始生成答案...',
},
};

export default translations;
70 changes: 70 additions & 0 deletions packages/skill-template/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,76 @@ export abstract class BaseSkill extends StructuredTool {
emitter.emit(eventData.event, eventData);
}

/**
* Emit large data in chunks with delay to prevent overwhelming the event system
* @param data The data to emit
* @param config The skill runnable config
* @param options Options for chunking and delay
*/
async emitLargeDataEvent<T>(
data: {
event?: string;
data: T[];
buildEventData: (
chunk: T[],
meta: { isPartial: boolean; chunkIndex: number; totalChunks: number },
) => Partial<SkillEvent>;
},
config: SkillRunnableConfig,
options: {
maxChunkSize?: number;
delayBetweenChunks?: number;
} = {},
): Promise<void> {
const { maxChunkSize = 500, delayBetweenChunks = 10 } = options;

// If no data or emitter, return early
if (!data.data?.length || !config?.configurable?.emitter) {
return;
}

// Split data into chunks based on size
const chunks: T[][] = [];
let currentChunk: T[] = [];
let currentSize = 0;

for (const item of data.data) {
const itemSize = JSON.stringify(item).length;

if (currentSize + itemSize > maxChunkSize && currentChunk.length > 0) {
chunks.push(currentChunk);
currentChunk = [];
currentSize = 0;
}

currentChunk.push(item);
currentSize += itemSize;
}

// Push the last chunk if not empty
if (currentChunk.length > 0) {
chunks.push(currentChunk);
}

// Emit chunks with delay
const emitPromises = chunks.map(
(chunk, i) =>
new Promise<void>((resolve) => {
setTimeout(() => {
const eventData = data.buildEventData(chunk, {
isPartial: i < chunks.length - 1,
chunkIndex: i,
totalChunks: chunks.length,
});
this.emitEvent(eventData, config);
resolve();
}, i * delayBetweenChunks);
}),
);

await Promise.all(emitPromises);
}

async _call(
input: typeof this.graphState,
_runManager?: CallbackManagerForToolRun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ export const callMultiLingualWebSearch = async (
// Keep original results if deduplication fails
}

ctxThis.emitEvent({ structuredData: { multiLingualSearchResult: finalResults } }, config);

// Return results with analysis
return {
sources: finalResults,
Expand Down
16 changes: 10 additions & 6 deletions packages/skill-template/src/scheduler/utils/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ export const buildFinalRequestMessages = ({
...chatHistory,
...messages,
...contextMessages,
new HumanMessage({
content: [
{ type: 'text', text: userPrompt },
...(images?.map((image) => ({ type: 'image_url', image_url: { url: image } })) || []),
],
}),
new HumanMessage(
images?.length
? {
content: [
{ type: 'text', text: userPrompt },
...(images.map((image) => ({ type: 'image_url', image_url: { url: image } })) || []),
],
}
: userPrompt,
),
];

return requestMessages;
Expand Down
17 changes: 16 additions & 1 deletion packages/skill-template/src/skills/common-qna.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,22 @@ export class CommonQnA extends BaseSkill {
config.metadata.step = { name: 'answerQuestion' };

if (sources.length > 0) {
this.emitEvent({ structuredData: { sources: truncateSource(sources) } }, config);
// Truncate sources before emitting
const truncatedSources = truncateSource(sources);
await this.emitLargeDataEvent(
{
data: truncatedSources,
buildEventData: (chunk, { isPartial, chunkIndex, totalChunks }) => ({
structuredData: {
sources: chunk,
isPartial,
chunkIndex,
totalChunks,
},
}),
},
config,
);
}

const model = this.engine.chatModel({ temperature: 0.1 });
Expand Down
18 changes: 17 additions & 1 deletion packages/skill-template/src/skills/edit-doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,23 @@ export class EditDoc extends BaseSkill {
this.engine.logger.log(`context: ${safeStringifyJSON(context)}`);

if (sources.length > 0) {
this.emitEvent({ structuredData: { sources: truncateSource(sources) } }, config);
// Split sources into smaller chunks based on size and emit them separately
const truncatedSources = truncateSource(sources);
await this.emitLargeDataEvent(
{
data: truncatedSources,
buildEventData: (chunk, { isPartial, chunkIndex, totalChunks }) => ({
structuredData: {
// Build your event data here
sources: chunk,
isPartial,
chunkIndex,
totalChunks,
},
}),
},
config,
);
}
}

Expand Down
20 changes: 19 additions & 1 deletion packages/skill-template/src/skills/generate-doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ ${recentHistory.map((msg) => `${(msg as HumanMessage)?.getType?.()}: ${msg.conte
buildContextUserPrompt: generateDocument.buildGenerateDocumentContextUserPrompt,
};

const { optimizedQuery, requestMessages, context, usedChatHistory } =
const { optimizedQuery, requestMessages, context, sources, usedChatHistory } =
await this.commonPreprocess(state, config, module);

// Generate title first
Expand Down Expand Up @@ -263,6 +263,24 @@ ${recentHistory.map((msg) => `${(msg as HumanMessage)?.getType?.()}: ${msg.conte
config,
);

if (sources.length > 0) {
const truncatedSources = truncateSource(sources);
await this.emitLargeDataEvent(
{
data: truncatedSources,
buildEventData: (chunk, { isPartial, chunkIndex, totalChunks }) => ({
structuredData: {
sources: chunk,
isPartial,
chunkIndex,
totalChunks,
},
}),
},
config,
);
}

const responseMessage = await model.invoke(requestMessages, {
...config,
metadata: {
Expand Down
18 changes: 17 additions & 1 deletion packages/skill-template/src/skills/library-search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,23 @@ export class LibrarySearch extends BaseSkill {
};

if (sources.length > 0) {
this.emitEvent({ structuredData: { sources: truncateSource(sources) } }, config);
// Split sources into smaller chunks based on size and emit them separately
const truncatedSources = truncateSource(sources);
await this.emitLargeDataEvent(
{
data: truncatedSources,
buildEventData: (chunk, { isPartial, chunkIndex, totalChunks }) => ({
structuredData: {
// Build your event data here
sources: chunk,
isPartial,
chunkIndex,
totalChunks,
},
}),
},
config,
);
}

const requestMessages = buildFinalRequestMessages({
Expand Down
Loading

0 comments on commit 3ca7d11

Please sign in to comment.