Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(project): improve huge knowledge base #386

Merged
merged 13 commits into from
Sep 10, 2024
Merged
23 changes: 3 additions & 20 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
models/
repodir/
workdir/
write_toml.py
modeling_internlm2.py
Expand All @@ -9,7 +7,6 @@ logs/
logs/work.txt
server.log
**/__pycache__
pk/
badcase.txt
config.bak
config.ini
Expand All @@ -28,33 +25,19 @@ nohup.out
start-web.sh
web/proxy/config-template.ini
web/env.sh
sft-data
config-alignment.ini
logs/work.txt
web/tools/query.jsonl
query.jsonl
web/tools/groups/
tests/history_recv_send.txt
web/tools/chat.txt
web/tools/filter.jsonl
unittest/token.json
config.test
wkteam/
config-wechat.ini
sft/groups/
evaluation/queries/
candidates/
evaluation/candidates.zip
evaluation/feature_stores.zip
evaluation/query.log
bceodir/
odir/
repodir-full/
web.log
workdir-full/
evaluation/rejection/gt_bad.txt
evaluation/rejection/gt_good.txt
workdir832/
workdir.bak/
workdir-20240729-kg-included/
bm25.pkl
repodir/
repodir.full/
workdir.full/
1 change: 1 addition & 0 deletions config-cpu.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reranker_model_path = "https://api.siliconflow.cn/v1/rerank"
# if using `siliconcloud` API as `embedding_model_path` or `reranker_model_path`, give the token
api_token = ""
api_rpm = 800
api_tpm = 40000
work_dir = "workdir"

[web_search]
Expand Down
1 change: 1 addition & 0 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reranker_model_path = "maidalun1020/bce-reranker-base_v1"
# if using `siliconcloud` API as `embedding_model_path` or `reranker_model_path`, give the token
api_token = ""
api_rpm = 1000
api_tpm = 40000
work_dir = "workdir"

[web_search]
Expand Down
79 changes: 79 additions & 0 deletions evaluation/end2end/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from huixiangdou.service import ParallelPipeline, start_llm_server
from huixiangdou.primitive import Query
import json
import asyncio
import pdb
from typing import List
from rouge import Rouge
from loguru import logger

# Shared pipeline instance used by run() and the evaluation loop below.
# NOTE(review): work_dir/config_path are hard-coded absolute paths for one
# machine — parameterize (CLI arg or env var) before reusing elsewhere.
assistant = ParallelPipeline(work_dir='/home/khj/hxd-ci/workdir', config_path='/home/khj/hxd-ci/config.ini')

def format_refs(refs: List[str]):
    """Render reference files/URLs as a markdown bullet block.

    Deduplicates while preserving first-seen order (``set`` made the output
    order nondeterministic across runs). Returns '' when there is nothing
    to show.
    """
    # dict.fromkeys keeps insertion order, unlike set().
    refs_filter = list(dict.fromkeys(refs))
    if not refs_filter:
        return ''

    text = '**References:**\r\n'
    for file_or_url in refs_filter:
        text += '* {}\r\n'.format(file_or_url)
    text += '\r\n'
    return text

async def run(query_text: str):
    """Drive the assistant pipeline for one query.

    Returns (answer_text, references): the concatenated streamed deltas and
    the references from the first session state that carried them.
    """
    answer = ''
    references = None
    async for sess in assistant.generate(query=Query(query_text),
                                         enable_web_search=False):
        if sess.delta:
            answer += sess.delta
        if references is None:
            references = sess.references
    return answer, references

gts = []  # ground-truth responses
dts = []  # generated responses

output_filepath = 'out.jsonl'

# Resume support: collect queries already answered in a previous run.
# Records in `out.jsonl` are written with indent=2, so each object's final
# line is exactly '}\n'.
# Fix: the original crashed with FileNotFoundError on the very first run;
# a missing output file simply means nothing is finished yet.
# A set gives O(1) membership tests in the loop below.
finished_query = set()
try:
    with open(output_filepath) as fin:
        json_str = ""
        for line in fin:
            json_str += line

            if '}\n' == line:
                print(json_str)
                json_obj = json.loads(json_str)
                finished_query.add(json_obj['query'].strip())
                json_str = ""
except FileNotFoundError:
    # First run: no previous output to resume from.
    pass

# Fetch the event loop once instead of once per query.
loop = asyncio.get_event_loop()

with open('evaluation/end2end/qa.jsonl') as fin:
    for json_str in fin:
        json_obj = json.loads(json_str)
        query = json_obj['query'].strip()
        if query in finished_query:
            continue

        gt = json_obj['resp']
        gts.append(gt)

        dt, refs = loop.run_until_complete(run(query_text=query))
        dts.append(dt)

        # Embedding distance between ground truth and generated answer.
        distance = assistant.retriever.embedder.distance(text1=gt, text2=dt).tolist()

        rouge = Rouge()
        scores = rouge.get_scores(gt, dt)
        json_obj['distance'] = distance
        json_obj['rouge_scores'] = scores
        json_obj['dt'] = dt
        json_obj['dt_refs'] = refs

        out_json_str = json.dumps(json_obj, ensure_ascii=False, indent=2)
        logger.info(out_json_str)

        # Append immediately so progress survives interruption.
        with open(output_filepath, 'a') as fout:
            fout.write(out_json_str)
            fout.write('\n')
3 changes: 1 addition & 2 deletions evaluation/rerank/step1_create_candidates.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ def process(param: tuple):
json_str = json.dumps({
'query': query,
'candidates': candidates
},
ensure_ascii=False)
}, ensure_ascii=False)

with open(os.path.join('candidates', fsid + '.jsonl'), 'a') as f:
f.write(json_str)
Expand Down
2 changes: 1 addition & 1 deletion huixiangdou/primitive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
MarkdownTextRefSplitter,
RecursiveCharacterTextSplitter,
nested_split_markdown, split_python_code)
from .rpm import RPM
from .limitter import RPM, TPM
from .bm250kapi import BM25Okapi
45 changes: 42 additions & 3 deletions huixiangdou/primitive/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import numpy as np
from loguru import logger
from .query import DistanceStrategy
from .rpm import RPM
from .limitter import RPM, TPM
from .chunk import Chunk

class Embedder:
"""Wrap text2vec (multimodal) model."""
Expand Down Expand Up @@ -43,10 +44,13 @@ def __init__(self, model_config: dict):

if 'Bearer' not in api_token:
api_token = 'Bearer ' + api_token
api_rpm = max(1, int(model_config['api_rpm']))
api_rpm = max(1000, int(model_config['api_rpm']))
api_tpm = max(40000, int(model_config['api_tpm']))

self.client = {
'api_token': api_token,
'api_rpm': RPM(api_rpm)
'api_rpm': RPM(api_rpm),
'api_tpm': TPM(api_tpm)
}

else:
Expand Down Expand Up @@ -74,6 +78,15 @@ def token_length(self, text: str) -> int:
else:
return len(text) // 2

def distance(self, text1: str, text2: str) -> float:
    """Return the Euclidean distance between the embeddings of two texts.

    Raises:
        ValueError: if the configured distance strategy is not
            EUCLIDEAN_DISTANCE.
    """
    vec_a = self.embed_query(text=text1)
    vec_b = self.embed_query(text=text2)

    if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
        return np.linalg.norm(vec_a - vec_b)
    raise ValueError('Unsupported distance strategy')

def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
"""Embed input text or image as feature, output np.ndarray with np.float32"""
if 'bge' in self._type:
Expand All @@ -91,6 +104,7 @@ def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
return emb
else:
self.client['api_rpm'].wait(silent=True)
self.client['api_tpm'].wait(silent=True, token_count=len(text))

# siliconcloud bce API
if text is None:
Expand All @@ -115,3 +129,28 @@ def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
emb_list = json_obj['data'][0]['embedding']
emb = np.array(emb_list).astype(np.float32).reshape(1, -1)
return emb

def embed_query_batch_text(self, chunks: 'List[Chunk]' = None) -> np.ndarray:
    """Embed a batch of text chunks, output np.ndarray with np.float32.

    Returns an array of shape (len(chunks), dim).

    Fix: the original used a mutable default argument (``chunks=[]``),
    which is shared across calls; ``None`` sentinel avoids that pitfall.
    """
    if chunks is None:
        chunks = []

    if 'bge' in self._type:
        import torch
        with torch.no_grad():
            features = []
            for c in chunks:
                feature = self.client.encode(text=c.content_or_path)
                features.append(feature.cpu().numpy())
            return np.concatenate(features).reshape(len(chunks), -1).astype(np.float32)

    elif 'bce' in self._type:
        texts = [c.content_or_path for c in chunks]
        emb = self.client.encode(texts, show_progress_bar=False, normalize_embeddings=True)
        return emb.astype(np.float32)

    else:
        # Remote API path: no batch endpoint used here, embed one at a time.
        features = []
        for c in chunks:
            features.append(self.embed_query(text=c.content_or_path))
        return np.concatenate(features).reshape(len(chunks), -1).astype(np.float32)
103 changes: 79 additions & 24 deletions huixiangdou/primitive/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .embedder import Embedder
from .query import Query, DistanceStrategy
from .chunk import Chunk


# heavily modified from langchain
Expand Down Expand Up @@ -118,6 +119,20 @@ def similarity_search_with_query(self,
logger.info('highest score {}, threshold {}'.format(highest_score, threshold))
return ret

@classmethod
def split_by_batchsize(cls, chunks: 'List[Chunk]' = None, batchsize: int = 4):
    """Partition chunks by modality and split each group into batches.

    Returns (block_text, block_image): lists of lists, each inner list
    holding at most ``batchsize`` chunks of the same modality, input
    order preserved.

    Fixes: mutable default argument (``chunks=[]``) replaced with a
    ``None`` sentinel; first parameter of a classmethod renamed to ``cls``.
    """
    if chunks is None:
        chunks = []
    texts = [c for c in chunks if c.modal == 'text']
    images = [c for c in chunks if c.modal == 'image']

    block_text = [texts[i:i + batchsize] for i in range(0, len(texts), batchsize)]
    block_image = [images[i:i + batchsize] for i in range(0, len(images), batchsize)]
    return block_text, block_image

@classmethod
def save_local(self, folder_path: str, chunks: List[Chunk],
embedder: Embedder) -> None:
Expand All @@ -131,32 +146,72 @@ def save_local(self, folder_path: str, chunks: List[Chunk],

faiss = dependable_faiss_import()
index = None
batchsize = 1

for chunk in tqdm(chunks):
np_feature = None
try:
if chunk.modal == 'text':
np_feature = embedder.embed_query(text=chunk.content_or_path)
elif chunk.modal == 'image':
try:
batchsize_str = os.getenv('HUIXIANGDOU_BATCHSIZE')
if batchsize_str is None:
logger.info('`export HUIXIANGDOU_BATCHSIZE=64` for faster feature building.')
else:
batchsize = int(batchsize_str)
except Exception as e:
logger.error(str(e))
batchsize = 1

if batchsize == 1:
for chunk in tqdm(chunks, 'chunks'):
np_feature = None
try:
if chunk.modal == 'text':
np_feature = embedder.embed_query(text=chunk.content_or_path)
elif chunk.modal == 'image':
np_feature = embedder.embed_query(path=chunk.content_or_path)
else:
raise ValueError(f'Unimplement chunk type: {chunk.modal}')
except Exception as e:
logger.error('{}'.format(e))

if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_feature)
else:
# batching
block_text, block_image = self.split_by_batchsize(chunks=chunks, batchsize=batchsize)
for subchunks in tqdm(block_text, 'build_text'):
np_features = embedder.embed_query_batch_text(chunks=subchunks)
if index is None:
dimension = np_features[0].shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_features)

for subchunks in tqdm(block_image, 'build_image'):
for chunk in subchunks:
np_feature = embedder.embed_query(path=chunk.content_or_path)
else:
raise ValueError(f'Unimplement chunk type: {chunk.modal}')
except Exception as e:
logger.error('{}'.format(e))

if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)

index.add(np_feature)
if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_feature)

path = Path(folder_path)
path.mkdir(exist_ok=True, parents=True)
Expand Down
Loading
Loading