• 向量数据库Milvus字符串查询


            因为项目需要,用到了向量数据库Milvus。刚开始一直没有遇到问题,直到有一个表(集合)的主键是字符串(VARCHAR)类型,而查询时又恰好要以该主键作为查询条件,这时出现了异常,特此记录一下。

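            记住:对字符串字段做查询时,构建表达式必须给值加上单引号,即把表达式写成 face_id == '2_0' 的形式(对应下面代码 f-string 中包住 {face_id} 占位符的那对单引号)。虽然 face_id 在 Python 里本来就是字符串类型,但它被拼进表达式后只是一段裸文本(face_id == 2_0),Milvus 的表达式解析器无法把它识别为字符串字面量。如果不加单引号,会出现如下的异常: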
            # pymilvus.exceptions.MilvusException:

      具体看下面的代码(milvus_demo.py),其中exists()函数中构建查询表达式时做了特殊处理:

    1. from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility, Partition
    2. import time
    3. from datetime import datetime
    4. from typing import List
    5. # Demo for testing string (VARCHAR) queries against Milvus
    6. # Address of the Milvus vector database
    7. MILVUS_HOST_ONLINE = '127.0.0.1'
    8. MILVUS_PORT = 19530
    9. # Number of matching entries returned by a vector search
    10. VECTOR_SEARCH_TOP_K = 100
    class MilvusAvatar:
        """Wrapper around one Milvus collection that stores face-avatar rows.

        Constructing an instance connects to Milvus, opens the collection named
        ``table_name`` (creating and indexing it when it does not exist yet),
        makes sure the requested partitions exist and loads the collection.
        The primary key ``face_id`` is a VARCHAR field, so every filter
        expression on it must wrap the value in single quotes.
        """

        def __init__(self, mode, table_name, *, partition_names=None, threshold=1.1, client_timeout=3):
            """Store the configuration and connect through :meth:`init`.

            Args:
                mode: Accepted for interface compatibility; not used by this class.
                table_name: Name of the Milvus collection (the "table").
                partition_names: Partitions to create/use. ``None`` means
                    ``["default"]``.
                threshold: Stored on the instance; not used by the methods here.
                client_timeout: Default timeout passed to query/search calls when
                    the caller does not give one.
            """
            self.table_name = table_name
            # A fresh list per instance: never share a mutable default between
            # instances and never alias the caller's list.
            self.partition_names = ["default"] if partition_names is None else list(partition_names)
            self.host = MILVUS_HOST_ONLINE
            self.port = MILVUS_PORT
            self.client_timeout = client_timeout
            self.threshold = threshold
            self.sess: Collection = None
            self.partitions: List[Partition] = []
            self.top_k = VECTOR_SEARCH_TOP_K
            self.search_params = {"metric_type": "L2", "params": {"nprobe": 256}}
            self.create_params = {"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 2048}}
            self.init()

        @staticmethod
        def _escape_str(value):
            """Return ``str(value)`` made safe for a single-quoted expression literal.

            Backslashes and single quotes are backslash-escaped so a value that
            contains them cannot terminate the literal early.
            """
            return str(value).replace("\\", "\\\\").replace("'", "\\'")

        @property
        def fields(self):
            """Schema of the collection: VARCHAR primary key, metadata, 512-dim vector."""
            return [
                FieldSchema(name='face_id', dtype=DataType.VARCHAR, max_length=640, is_primary=True, auto_id=False),
                FieldSchema(name='media_id', dtype=DataType.INT64),
                FieldSchema(name='file_path', dtype=DataType.VARCHAR, max_length=640),  # path of the original image
                FieldSchema(name='name', dtype=DataType.VARCHAR, max_length=640),  # person name
                FieldSchema(name='count', dtype=DataType.INT64),  # counter
                FieldSchema(name='save_path', dtype=DataType.VARCHAR, max_length=640),  # absolute save path incl. file name
                FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=512),
            ]

        @property
        def output_fields(self):
            """Names of all fields returned by default from queries and searches."""
            return ['face_id', 'media_id', 'file_path', 'name', 'count', 'save_path', 'embedding']

        def init(self):
            """Connect, open or create the collection, ensure partitions, load.

            Best effort: any exception is printed and swallowed, in which case
            ``self.sess`` may still be ``None`` afterwards.
            """
            try:
                connections.connect(host=self.host, port=self.port)  # timeout=3 [cannot set]
                if utility.has_collection(self.table_name):
                    self.sess = Collection(self.table_name)
                    print(f'collection {self.table_name} exists')
                else:
                    schema = CollectionSchema(self.fields)
                    print(f'create collection {self.table_name} {schema}')
                    self.sess = Collection(self.table_name, schema)
                    self.sess.create_index(field_name="embedding", index_params=self.create_params)
                for index in self.partition_names:
                    if not self.sess.has_partition(index):
                        self.sess.create_partition(index)
                self.partitions = [Partition(self.sess, index) for index in self.partition_names]
                # print() does no %-formatting, so interpolate explicitly.
                print(f'partitions: {self.partition_names}')
                self.sess.load()
            except Exception as e:
                print(e)

        def query_expr_sync(self, expr, output_fields=None, client_timeout=None):
            """Run a scalar query and return at most 1000 matching rows.

            Args:
                expr: Milvus boolean filter expression.
                output_fields: Fields to return; falsy means :attr:`output_fields`.
                client_timeout: Per-call timeout; ``None`` means
                    ``self.client_timeout``.

            Returns:
                The result of ``Collection.query``, or ``[]`` when the collection
                reports zero entities.
            """
            if client_timeout is None:
                client_timeout = self.client_timeout
            if not output_fields:
                output_fields = self.output_fields
            print(f"MilvusAvatar query_expr_sync:{expr},output_fields:{output_fields}")
            # Read the entity count once; each access asks the server.
            num_entities = self.sess.num_entities
            print(f"MilvusAvatar num_entities:{num_entities}")
            if num_entities == 0:
                return []
            return self.sess.query(partition_names=self.partition_names,
                                   output_fields=output_fields,
                                   expr=expr,
                                   timeout=client_timeout,
                                   _async=False,
                                   offset=0,
                                   limit=1000)

        def insert_avatar_sync(self, face_id, media_id, file_path, name, save_path, embedding):
            """Insert one row into the first configured partition.

            ``embedding`` is a single face feature vector. ``count`` starts at 1.

            Returns:
                ``True`` on success, ``False`` when the insert raised.
            """
            print(f'now insert_avatar {file_path}')
            # Column-oriented payload, one single-element column per schema field,
            # in the order declared by ``fields``.
            data = [[face_id], [media_id], [file_path], [name], [1], [save_path], [embedding]]
            try:
                print('Inserting into Milvus...')
                self.partitions[0].insert(data=data)
                print(f'{file_path}')
                print(f"MilvusAvatar insert_avatar num_entities:{self.sess.num_entities}")
            except Exception as e:
                print(f'Milvus insert media_id:{media_id}, file_path:{file_path} failed: {e}')
                return False
            return True

        def search_emb_sync(self, embs, expr='', top_k=None, client_timeout=None):
            """Vector-search the configured partitions.

            Args:
                embs: A list of query vectors.
                expr: Optional scalar filter expression.
                top_k: Maximum hits per query vector; falsy means ``self.top_k``.
                client_timeout: Per-call timeout; ``None`` means
                    ``self.client_timeout``.

            Returns:
                The result of ``Collection.search``, or ``None`` when no
                collection is open.
            """
            if self.sess is None:
                return None
            if not top_k:
                top_k = self.top_k
            if client_timeout is None:
                client_timeout = self.client_timeout
            milvus_records = self.sess.search(data=embs, partition_names=self.partition_names,
                                              anns_field="embedding", param=self.search_params,
                                              limit=top_k, output_fields=self.output_fields,
                                              expr=expr, timeout=client_timeout)
            print(f"milvus_records:{milvus_records}")
            return milvus_records

        def exists(self, face_id):
            """Return ``True`` when a row with primary key ``face_id`` exists."""
            print(f"exists:{face_id},{type(face_id)}")
            # Remember: when filtering on a string field the value must be wrapped
            # in single quotes inside the expression, even though face_id is
            # already a Python str; without the quotes the query raises
            # pymilvus.exceptions.MilvusException.
            res = self.query_expr_sync(expr=f"face_id == '{self._escape_str(face_id)}'",
                                       output_fields=['face_id'])
            return len(res) > 0

        def add_count(self, face_id):
            """Increment ``count`` for ``face_id`` by deleting and re-inserting it.

            Returns:
                ``True`` when every matching row was re-inserted with
                ``count + 1``; ``False`` when no row matched or an insert raised.
            """
            expr = f"face_id == '{self._escape_str(face_id)}'"
            res = self.query_expr_sync(expr=expr, output_fields=self.output_fields)
            if not res:
                # Nothing to update, so do not issue a delete.
                return False
            self.sess.delete(expr=expr)
            for result in res:
                media_id = result['media_id']
                file_path = result['file_path']
                count = int(result['count'])
                data = [[face_id], [media_id], [file_path], [result['name']], [count + 1],
                        [result['save_path']], [result['embedding']]]
                print(f"add_count face_id:{face_id},file_path:{file_path}, count:{count}")
                try:
                    print('Inserting into Milvus...')
                    self.partitions[0].insert(data=data)
                except Exception as e:
                    print(f'Milvus insert media_id:{media_id}, file_path:{file_path} failed: {e}')
                    return False
            return True

        def delete_collection(self):
            """Release the collection from memory and drop it."""
            print("delete_collection")
            self.sess.release()
            utility.drop_collection(self.table_name)

        def delete_partition(self, partition_name):
            """Release one partition and drop it from the collection."""
            print("delete_partition")
            part = Partition(self.sess, partition_name)
            part.release()
            self.sess.drop_partition(partition_name)

        def query_all(self, limit=None):
            """Query rows with a non-empty ``face_id`` (all fields but the vector).

            Args:
                limit: Maximum number of rows; ``None`` passes no explicit limit.
            """
            res = self.sess.query(partition_names=self.partition_names,
                                  output_fields=["face_id", "media_id", "name", "count", "save_path"],
                                  expr="face_id != ''",
                                  _async=False,
                                  offset=0,
                                  limit=limit)
            print(res)
            return res
    if __name__ == "__main__":
        # Smoke test: insert one avatar row, then look it up again through its
        # VARCHAR primary key.
        media_id, index = 2, 0
        face_id = f"{media_id}_{index}"
        file_path = "/home/data/bbh.jpg"
        save_path = "/home/data/bbh_avatar.jpg"
        # Deterministic 512-dimensional dummy vector: 0.000, 0.001, ...
        embedding = [i / 1000 for i in range(512)]

        milvus_avatar = MilvusAvatar("local", "avatar", partition_names=["avatar"])
        milvus_avatar.insert_avatar_sync(
            face_id, media_id, file_path, "bbh", save_path, embedding
        )
        # To dump every stored row instead, call milvus_avatar.query_all().
        found = milvus_avatar.exists(face_id)
        print(found)

    执行:python milvus_demo.py

    如果是针对非字符串字段(例如 INT64 类型的 media_id)进行查询,则无需做上面的特殊处理,直接写成 media_id == 2 这样的表达式即可。

  • 相关阅读:
    数据库系统原理题-期末
    COMSOL Multiphysics在复合砌块热湿传递仿真中的应用
    LeetCode 环形链表 II(C语言)
    CSS 的盒子Day03(2)
    NLP - 使用 transformers 翻译
    【Docker】redis分片集群搭建:3主3从,容错迁移,扩缩容
    Fast way to filter a file line by line and save as new format in a new file
    『FastTunnel』荣获GVP的开源内网穿透工具,动手搭建属于自己的内网穿透平台
    react +antd table 滚动事件实现防抖
    H12-821_29
  • 原文地址:https://blog.csdn.net/happyweb/article/details/136220250