distances.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. from enum import Enum
  2. from typing import Optional, Union
  3. import numpy as np
  4. from qdrant_client.conversions import common_types as types
  5. from qdrant_client.http import models
# Smallest positive step of a 32-bit float (value of Rust's `f32::EPSILON`).
# Used in this module as the fallback divisor for zero norms and as the margin
# subtracted in context scoring.
EPSILON = 1.1920929e-7  # https://doc.rust-lang.org/std/f32/constant.EPSILON.html
# Reference in the Qdrant core sources:
# https://github.com/qdrant/qdrant/blob/7164ac4a5987d28f1c93f5712aef8e09e7d93555/lib/segment/src/spaces/simple_avx.rs#L99C10-L99C10
  8. class DistanceOrder(str, Enum):
  9. BIGGER_IS_BETTER = "bigger_is_better"
  10. SMALLER_IS_BETTER = "smaller_is_better"
  11. class RecoQuery:
  12. def __init__(
  13. self,
  14. positive: Optional[list[list[float]]] = None,
  15. negative: Optional[list[list[float]]] = None,
  16. strategy: Optional[models.RecommendStrategy] = None,
  17. ):
  18. assert strategy is not None, "Recommend strategy must be provided"
  19. self.strategy = strategy
  20. positive = positive if positive is not None else []
  21. negative = negative if negative is not None else []
  22. self.positive: list[types.NumpyArray] = [np.array(vector) for vector in positive]
  23. self.negative: list[types.NumpyArray] = [np.array(vector) for vector in negative]
  24. assert not np.isnan(self.positive).any(), "Positive vectors must not contain NaN"
  25. assert not np.isnan(self.negative).any(), "Negative vectors must not contain NaN"
  26. class ContextPair:
  27. def __init__(self, positive: list[float], negative: list[float]):
  28. self.positive: types.NumpyArray = np.array(positive)
  29. self.negative: types.NumpyArray = np.array(negative)
  30. assert not np.isnan(self.positive).any(), "Positive vector must not contain NaN"
  31. assert not np.isnan(self.negative).any(), "Negative vector must not contain NaN"
  32. class DiscoveryQuery:
  33. def __init__(self, target: list[float], context: list[ContextPair]):
  34. self.target: types.NumpyArray = np.array(target)
  35. self.context = context
  36. assert not np.isnan(self.target).any(), "Target vector must not contain NaN"
  37. class ContextQuery:
  38. def __init__(self, context_pairs: list[ContextPair]):
  39. self.context_pairs = context_pairs
# Any of the structured (non-raw-vector) query types defined in this module
DenseQueryVector = Union[
    DiscoveryQuery,
    ContextQuery,
    RecoQuery,
]
  45. def distance_to_order(distance: models.Distance) -> DistanceOrder:
  46. """
  47. Convert distance to order
  48. Args:
  49. distance: distance to convert
  50. Returns:
  51. order
  52. """
  53. if distance == models.Distance.EUCLID:
  54. return DistanceOrder.SMALLER_IS_BETTER
  55. elif distance == models.Distance.MANHATTAN:
  56. return DistanceOrder.SMALLER_IS_BETTER
  57. return DistanceOrder.BIGGER_IS_BETTER
  58. def cosine_similarity(query: types.NumpyArray, vectors: types.NumpyArray) -> types.NumpyArray:
  59. """
  60. Calculate cosine distance between query and vectors
  61. Args:
  62. query: query vector
  63. vectors: vectors to calculate distance with
  64. Returns:
  65. distances
  66. """
  67. vectors_norm = np.linalg.norm(vectors, axis=-1)[:, np.newaxis]
  68. vectors /= np.where(vectors_norm != 0.0, vectors_norm, EPSILON)
  69. if len(query.shape) == 1:
  70. query_norm = np.linalg.norm(query)
  71. query /= np.where(query_norm != 0.0, query_norm, EPSILON)
  72. return np.dot(vectors, query)
  73. query_norm = np.linalg.norm(query, axis=-1)[:, np.newaxis]
  74. query /= np.where(query_norm != 0.0, query_norm, EPSILON)
  75. return np.dot(query, vectors.T)
  76. def dot_product(query: types.NumpyArray, vectors: types.NumpyArray) -> types.NumpyArray:
  77. """
  78. Calculate dot product between query and vectors
  79. Args:
  80. query: query vector.
  81. vectors: vectors to calculate distance with
  82. Returns:
  83. distances
  84. """
  85. if len(query.shape) == 1:
  86. return np.dot(vectors, query)
  87. else:
  88. return np.dot(query, vectors.T)
  89. def euclidean_distance(query: types.NumpyArray, vectors: types.NumpyArray) -> types.NumpyArray:
  90. """
  91. Calculate euclidean distance between query and vectors
  92. Args:
  93. query: query vector.
  94. vectors: vectors to calculate distance with
  95. Returns:
  96. distances
  97. """
  98. if len(query.shape) == 1:
  99. return np.linalg.norm(vectors - query, axis=-1)
  100. else:
  101. return np.linalg.norm(vectors - query[:, np.newaxis], axis=-1)
  102. def manhattan_distance(query: types.NumpyArray, vectors: types.NumpyArray) -> types.NumpyArray:
  103. """
  104. Calculate manhattan distance between query and vectors
  105. Args:
  106. query: query vector.
  107. vectors: vectors to calculate distance with
  108. Returns:
  109. distances
  110. """
  111. if len(query.shape) == 1:
  112. return np.sum(np.abs(vectors - query), axis=-1)
  113. else:
  114. return np.sum(np.abs(vectors - query[:, np.newaxis]), axis=-1)
  115. def calculate_distance(
  116. query: types.NumpyArray, vectors: types.NumpyArray, distance_type: models.Distance
  117. ) -> types.NumpyArray:
  118. assert not np.isnan(query).any(), "Query vector must not contain NaN"
  119. if distance_type == models.Distance.COSINE:
  120. return cosine_similarity(query, vectors)
  121. elif distance_type == models.Distance.DOT:
  122. return dot_product(query, vectors)
  123. elif distance_type == models.Distance.EUCLID:
  124. return euclidean_distance(query, vectors)
  125. elif distance_type == models.Distance.MANHATTAN:
  126. return manhattan_distance(query, vectors)
  127. else:
  128. raise ValueError(f"Unknown distance type {distance_type}")
  129. def calculate_distance_core(
  130. query: types.NumpyArray, vectors: types.NumpyArray, distance_type: models.Distance
  131. ) -> types.NumpyArray:
  132. """
  133. Calculate same internal distances as in core, rather than the final displayed distance
  134. """
  135. assert not np.isnan(query).any(), "Query vector must not contain NaN"
  136. if distance_type == models.Distance.EUCLID:
  137. return -np.square(vectors - query, dtype=np.float32).sum(axis=1, dtype=np.float32)
  138. if distance_type == models.Distance.MANHATTAN:
  139. return -np.abs(vectors - query, dtype=np.float32).sum(axis=1, dtype=np.float32)
  140. else:
  141. return calculate_distance(query, vectors, distance_type)
  142. def fast_sigmoid(x: np.float32) -> np.float32:
  143. if np.isnan(x) or np.isinf(x):
  144. # To avoid divisions on NaNs or inf, which gets: RuntimeWarning: invalid value encountered in scalar divide
  145. return x
  146. return x / np.add(1.0, abs(x))
  147. def scaled_fast_sigmoid(x: np.float32) -> np.float32:
  148. return 0.5 * (np.add(fast_sigmoid(x), 1.0))
  149. def calculate_recommend_best_scores(
  150. query: RecoQuery, vectors: types.NumpyArray, distance_type: models.Distance
  151. ) -> types.NumpyArray:
  152. def get_best_scores(examples: list[types.NumpyArray]) -> types.NumpyArray:
  153. vector_count = vectors.shape[0]
  154. # Get scores to all examples
  155. scores: list[types.NumpyArray] = []
  156. for example in examples:
  157. score = calculate_distance_core(example, vectors, distance_type)
  158. scores.append(score)
  159. # Keep only max for each vector
  160. if len(scores) == 0:
  161. scores.append(np.full(vector_count, -np.inf))
  162. best_scores = np.array(scores, dtype=np.float32).max(axis=0)
  163. return best_scores
  164. pos = get_best_scores(query.positive)
  165. neg = get_best_scores(query.negative)
  166. # Choose from best positive or best negative,
  167. # in in both cases we apply sigmoid and then negate depending on the order
  168. return np.where(
  169. pos > neg,
  170. np.fromiter((scaled_fast_sigmoid(xi) for xi in pos), pos.dtype),
  171. np.fromiter((-scaled_fast_sigmoid(xi) for xi in neg), neg.dtype),
  172. )
  173. def calculate_recommend_sum_scores(
  174. query: RecoQuery, vectors: types.NumpyArray, distance_type: models.Distance
  175. ) -> types.NumpyArray:
  176. def get_sum_scores(examples: list[types.NumpyArray]) -> types.NumpyArray:
  177. vector_count = vectors.shape[0]
  178. scores: list[types.NumpyArray] = []
  179. for example in examples:
  180. score = calculate_distance_core(example, vectors, distance_type)
  181. scores.append(score)
  182. if len(scores) == 0:
  183. scores.append(np.zeros(vector_count))
  184. sum_scores = np.array(scores, dtype=np.float32).sum(axis=0)
  185. return sum_scores
  186. pos = get_sum_scores(query.positive)
  187. neg = get_sum_scores(query.negative)
  188. return pos - neg
  189. def calculate_discovery_ranks(
  190. context: list[ContextPair],
  191. vectors: types.NumpyArray,
  192. distance_type: models.Distance,
  193. ) -> types.NumpyArray:
  194. overall_ranks = np.zeros(vectors.shape[0], dtype=np.int32)
  195. for pair in context:
  196. # Get distances to positive and negative vectors
  197. pos = calculate_distance_core(pair.positive, vectors, distance_type)
  198. neg = calculate_distance_core(pair.negative, vectors, distance_type)
  199. pair_ranks = np.array(
  200. [
  201. 1 if is_bigger else 0 if is_equal else -1
  202. for is_bigger, is_equal in zip(pos > neg, pos == neg)
  203. ]
  204. )
  205. overall_ranks += pair_ranks
  206. return overall_ranks
  207. def calculate_discovery_scores(
  208. query: DiscoveryQuery, vectors: types.NumpyArray, distance_type: models.Distance
  209. ) -> types.NumpyArray:
  210. ranks = calculate_discovery_ranks(query.context, vectors, distance_type)
  211. # Get distances to target
  212. distances_to_target = calculate_distance_core(query.target, vectors, distance_type)
  213. sigmoided_distances = np.fromiter(
  214. (scaled_fast_sigmoid(xi) for xi in distances_to_target), np.float32
  215. )
  216. return ranks + sigmoided_distances
  217. def calculate_context_scores(
  218. query: ContextQuery, vectors: types.NumpyArray, distance_type: models.Distance
  219. ) -> types.NumpyArray:
  220. overall_scores = np.zeros(vectors.shape[0], dtype=np.float32)
  221. for pair in query.context_pairs:
  222. # Get distances to positive and negative vectors
  223. pos = calculate_distance_core(pair.positive, vectors, distance_type)
  224. neg = calculate_distance_core(pair.negative, vectors, distance_type)
  225. difference = pos - neg - EPSILON
  226. pair_scores = np.fromiter(
  227. (fast_sigmoid(xi) for xi in np.minimum(difference, 0.0)), np.float32
  228. )
  229. overall_scores += pair_scores
  230. return overall_scores