feat: add docstring to EndpointHandler.__call__; when multiple inputs are sent, the output now also contains a token_list key/value pair for easier human inspection
Browse files
- embed_two_chunks.sh +1 -1
- handler.py +31 -10
- test_endpoint.py +11 -4
embed_two_chunks.sh
CHANGED
@@ -5,5 +5,5 @@ curl \
|
|
5 |
--request POST \
|
6 |
--url http://localhost:4999 \
|
7 |
--header 'Content-Type: application/json' \
|
8 |
-
--data '{"inputs": ["Please embed me", "
|
9 |
-w "\n"
|
|
|
5 |
--request POST \
|
6 |
--url http://localhost:4999 \
|
7 |
--header 'Content-Type: application/json' \
|
8 |
+
--data '{"inputs": ["Please embed me", "En en en mij ook, alsjeblieft !!!"]}' \
|
9 |
-w "\n"
|
handler.py
CHANGED
@@ -7,6 +7,7 @@ import logging
|
|
7 |
|
8 |
logger = logging.getLogger(__name__)
|
9 |
|
|
|
10 |
MODEL = "fdurant/colbert-xm-for-inference-api"
|
11 |
|
12 |
class EndpointHandler():
|
@@ -18,11 +19,25 @@ class EndpointHandler():
|
|
18 |
nbits=2, # The number of bits that each dimension encodes to.
|
19 |
kmeans_niters=4, # Number of iterations for k-means clustering during quantization.
|
20 |
nranks=-1, # Number of ranks (processors) to use for distributed computing; -1 uses all available CPUs/GPUs.
|
21 |
-
checkpoint=MODEL,
|
22 |
)
|
23 |
self._checkpoint = Checkpoint(self._config.checkpoint, colbert_config=self._config, verbose=3)
|
24 |
|
25 |
def __call__(self, data: Any) -> List[Dict[str, Any]]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
26 |
inputs = data["inputs"]
|
27 |
texts = []
|
28 |
if isinstance(inputs, str):
|
@@ -35,31 +50,37 @@ class EndpointHandler():
|
|
35 |
|
36 |
if len(texts) == 1:
|
37 |
# It's a query
|
38 |
-
logger.
|
39 |
embedding = self._checkpoint.queryFromText(
|
40 |
queries=texts,
|
41 |
full_length_search=False, # Indicates whether to encode the query for a full-length search.
|
42 |
)
|
43 |
-
logger.
|
44 |
return [
|
45 |
{"input": inputs, "query_embedding": embedding.tolist()[0]}
|
46 |
]
|
47 |
elif len(texts) > 1:
|
48 |
# It's a batch of chunks
|
49 |
logger.info(f"Batch of chunks: {texts}")
|
50 |
-
embeddings,
|
51 |
docs=texts,
|
52 |
bsize=self._config.bsize, # Batch size
|
53 |
keep_dims=True, # Do NOT flatten the embeddings
|
54 |
return_tokens=True, # Return the tokens as well
|
55 |
)
|
56 |
-
|
57 |
-
|
58 |
-
logger.
|
59 |
-
logger.
|
|
|
|
|
|
|
|
|
|
|
|
|
60 |
return [
|
61 |
-
{"input": _input, "chunk_embedding": embedding.tolist(), "
|
62 |
-
for _input, embedding,
|
63 |
]
|
64 |
else:
|
65 |
raise ValueError("No data to process")
|
|
|
7 |
|
8 |
logger = logging.getLogger(__name__)
|
9 |
|
10 |
+
# NOTE: the model identifier is hardcoded here rather than read from configuration.
|
11 |
MODEL = "fdurant/colbert-xm-for-inference-api"
|
12 |
|
13 |
class EndpointHandler():
|
|
|
19 |
nbits=2, # The number of bits that each dimension encodes to.
|
20 |
kmeans_niters=4, # Number of iterations for k-means clustering during quantization.
|
21 |
nranks=-1, # Number of ranks (processors) to use for distributed computing; -1 uses all available CPUs/GPUs.
|
22 |
+
checkpoint=MODEL, # Path to the model checkpoint.
|
23 |
)
|
24 |
self._checkpoint = Checkpoint(self._config.checkpoint, colbert_config=self._config, verbose=3)
|
25 |
|
26 |
def __call__(self, data: Any) -> List[Dict[str, Any]]:
|
27 |
+
"""
|
28 |
+
data args:
|
29 |
+
inputs (:obj: `str` or :obj: `list` of `str`) : A single query string, or a batch (= list) of chunk strings.
|
30 |
+
Return:
|
31 |
+
A :obj:`list` : will be serialized and returned.
|
32 |
+
When the input is a single query string, the returned list will contain a single dictionary with:
|
33 |
+
- input (:obj: `str`) : The input query.
|
34 |
+
- query_embedding (:obj: `list`) : The query embedding of shape (32, 128); the leading batch dimension of size 1 is dropped before returning.
|
35 |
+
When the input is a batch (= list) of chunk strings, the returned list will contain a dictionary for each chunk:
|
36 |
+
- input (:obj: `str`) : The input chunk.
|
37 |
+
- chunk_embedding (:obj: `list`) : The chunk embedding of shape (num_tokens, 128), i.e. one 128-dimensional vector per token id.
|
38 |
+
- token_ids (:obj: `list`) : The token ids of the chunk, one per vector in chunk_embedding.
|
39 |
+
- token_list (:obj: `list`) : The token strings corresponding to token_ids (same length), for easier human inspection.
|
40 |
+
"""
|
41 |
inputs = data["inputs"]
|
42 |
texts = []
|
43 |
if isinstance(inputs, str):
|
|
|
50 |
|
51 |
if len(texts) == 1:
|
52 |
# It's a query
|
53 |
+
logger.debug(f"Query: {texts}")
|
54 |
embedding = self._checkpoint.queryFromText(
|
55 |
queries=texts,
|
56 |
full_length_search=False, # Indicates whether to encode the query for a full-length search.
|
57 |
)
|
58 |
+
logger.debug(f"Query embedding shape: {embedding.shape}")
|
59 |
return [
|
60 |
{"input": inputs, "query_embedding": embedding.tolist()[0]}
|
61 |
]
|
62 |
elif len(texts) > 1:
|
63 |
# It's a batch of chunks
|
64 |
logger.info(f"Batch of chunks: {texts}")
|
65 |
+
embeddings, token_id_lists = self._checkpoint.docFromText(
|
66 |
docs=texts,
|
67 |
bsize=self._config.bsize, # Batch size
|
68 |
keep_dims=True, # Do NOT flatten the embeddings
|
69 |
return_tokens=True, # Return the tokens as well
|
70 |
)
|
71 |
+
token_lists = []
|
72 |
+
for text, embedding, token_ids in zip(texts, embeddings, token_id_lists):
|
73 |
+
logger.debug(f"Chunk: {text}")
|
74 |
+
logger.debug(f"Chunk embedding shape: {embedding.shape}")
|
75 |
+
logger.debug(f"Chunk token ids: {token_ids}")
|
76 |
+
token_list = self._checkpoint.doc_tokenizer.tok.convert_ids_to_tokens(token_ids)
|
77 |
+
token_lists.append(token_list)
|
78 |
+
logger.debug(f"Chunk tokens: {token_list}")
|
79 |
+
# reconstructed_text = self._checkpoint.doc_tokenizer.tok.decode(token_count)
|
80 |
+
# logger.debug(f"Reconstructed text with special tokens: {reconstructed_text}")
|
81 |
return [
|
82 |
+
{"input": _input, "chunk_embedding": embedding.tolist(), "token_ids": token_ids.tolist(), "token_list": token_list}
|
83 |
+
for _input, embedding, token_ids, token_list in zip(texts, embeddings, token_id_lists, token_lists)
|
84 |
]
|
85 |
else:
|
86 |
raise ValueError("No data to process")
|
test_endpoint.py
CHANGED
@@ -40,7 +40,8 @@ def test_query_returns_expected_result():
|
|
40 |
|
41 |
def test_batch_returns_expected_result():
|
42 |
chunks = ["try me", "try me again and again and again"]
|
43 |
-
|
|
|
44 |
payload = {"inputs": chunks}
|
45 |
|
46 |
response = requests.request("POST", URL, json=payload, headers=HEADERS)
|
@@ -56,12 +57,18 @@ def test_batch_returns_expected_result():
|
|
56 |
|
57 |
# Check chunk embedding (actually a list of embeddings, one per token in the chunk)
|
58 |
chunk_embedding = response_chunk.get("chunk_embedding")
|
59 |
-
|
60 |
assert isinstance(chunk_embedding, list)
|
61 |
-
assert len(chunk_embedding) == len(
|
62 |
-
assert len(
|
|
|
63 |
|
64 |
# Check first of the token embeddings
|
65 |
first_token_embedding = chunk_embedding[0]
|
66 |
assert len(first_token_embedding) == 128
|
67 |
assert all(isinstance(value, float) for value in first_token_embedding)
|
|
|
|
|
|
|
|
|
|
|
|
40 |
|
41 |
def test_batch_returns_expected_result():
|
42 |
chunks = ["try me", "try me again and again and again"]
|
43 |
+
length_of_longest_chunk = 11 # Including special tokens and padding
|
44 |
+
doc_maxlen=512
|
45 |
payload = {"inputs": chunks}
|
46 |
|
47 |
response = requests.request("POST", URL, json=payload, headers=HEADERS)
|
|
|
57 |
|
58 |
# Check chunk embedding (actually a list of embeddings, one per token in the chunk)
|
59 |
chunk_embedding = response_chunk.get("chunk_embedding")
|
60 |
+
token_ids = response_chunk.get("token_ids")
|
61 |
assert isinstance(chunk_embedding, list)
|
62 |
+
assert len(chunk_embedding) == len(token_ids)
|
63 |
+
assert len(token_ids) == length_of_longest_chunk
|
64 |
+
assert len(token_ids) <= doc_maxlen
|
65 |
|
66 |
# Check first of the token embeddings
|
67 |
first_token_embedding = chunk_embedding[0]
|
68 |
assert len(first_token_embedding) == 128
|
69 |
assert all(isinstance(value, float) for value in first_token_embedding)
|
70 |
+
|
71 |
+
# Check token list
|
72 |
+
token_list = response_chunk.get("token_list")
|
73 |
+
assert len(token_ids) == len(token_list)
|
74 |
+
assert all(isinstance(token, str) for token in token_list)
|