5
5
import aiohttp
6
6
import asyncio
7
7
import concurrent .futures
8
+ import json
8
9
from collections import Counter
9
10
from typing import Any , Optional , Dict , List , IO
10
11
30
31
)
31
32
32
33
33
- def parse_feed_df (df : DataFrame , include_id : bool , id_field = "id" ) -> List [Dict [str , Any ]]:
34
+ def parse_feed_df (df : DataFrame , include_id : bool , id_field = "id" , id_prefix = "" ) -> List [Dict [str , Any ]]:
34
35
"""
35
36
Convert a df into batch format for feeding
36
37
37
38
:param df: DataFrame with the following required columns ["id"]. Additional columns are assumed to be fields.
38
39
:param include_id: Include id on the fields to be fed.
39
40
:param id_field: Name of the column containing the id field.
41
+ :param id_prefix: Add a string prefix to ID field, e.g. "id:namespace:schema::"
40
42
:return: List of Dict containing 'id' and 'fields'.
41
43
"""
42
44
required_columns = [id_field ]
@@ -46,7 +48,7 @@ def parse_feed_df(df: DataFrame, include_id: bool, id_field="id") -> List[Dict[s
46
48
records = df .to_dict (orient = "records" )
47
49
batch = [
48
50
{
49
- "id" : record [id_field ],
51
+ "id" : record [id_field ] if id_prefix == "" else id_prefix + str ( record [ id_field ]) ,
50
52
"fields" : record
51
53
if include_id
52
54
else {k : v for k , v in record .items () if k not in [id_field ]},
@@ -56,6 +58,21 @@ def parse_feed_df(df: DataFrame, include_id: bool, id_field="id") -> List[Dict[s
56
58
return batch
57
59
58
60
61
+ def df_to_vespafeed (df : DataFrame , schema_name : str , id_field = "id" , namespace = "" ) -> str :
62
+ """
63
+ Convert a df into a string in Vespa JSON feed format,
64
+ see https://docs.vespa.ai/en/reference/document-json-format.html
65
+
66
+ :param df: DataFrame with the following required columns ["id"]. Additional columns are assumed to be fields.
67
+ :param schema_name: Schema name
68
+ :param id_field: Name of the column containing the id field.
69
+ :param namespace: Set if namespace != schema_name
70
+ :return: JSON string in Vespa feed format
71
+ """
72
+ return json .dumps (parse_feed_df (df , True , id_field ,
73
+ "id:{}:{}::" .format (schema_name if namespace == "" else namespace , schema_name )))
74
+
75
+
59
76
def raise_for_status (response : Response ) -> None :
60
77
"""
61
78
Raises an appropriate error if necessary.
@@ -418,7 +435,7 @@ def feed_batch(
418
435
:return: List of HTTP POST responses
419
436
"""
420
437
mini_batches = [
421
- batch [i : i + batch_size ] for i in range (0 , len (batch ), batch_size )
438
+ batch [i : i + batch_size ] for i in range (0 , len (batch ), batch_size )
422
439
]
423
440
batch_http_responses = []
424
441
for idx , mini_batch in enumerate (mini_batches ):
0 commit comments