Elasticsearch 工具

# -*- coding:UTF-8 -*-

# author:user
# contact: test@test.com
# datetime:2021/8/16 14:30
# software: PyCharm

"""
文件说明:
    ES工具
"""


import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers

class EsUtil(object):
    """Thin convenience wrapper around an ``elasticsearch.Elasticsearch`` client.

    Each method forwards to the matching client call and returns the client's
    response unchanged, so callers receive the raw Elasticsearch result dicts.
    """

    def __init__(self, ip='127.0.0.1', port=9200, http_auth=None):
        """Create the underlying client.

        Args:
            ip: Host name or IP address of the Elasticsearch node.
            port: HTTP port of the node (defaults to 9200).
            http_auth: Optional ``(username, password)`` tuple. When omitted
                the client connects without credentials.
        """
        self.ip = ip
        self.port = port
        client_options = {'port': port}
        # Only forward credentials when the caller actually supplied them, so
        # the default call is identical to an unauthenticated connection.
        if http_auth is not None:
            client_options['http_auth'] = http_auth
        self.es = Elasticsearch([ip], **client_options)

    def create_index(self, index_name, mapping):
        """Create ``index_name`` with the given settings/mappings body.

        Example ``mapping``::

            {
                "mappings": {
                    "properties": {
                        "question": {
                            "type": "text",
                            "analyzer": "ik_smart",
                            "search_analyzer": "ik_smart",
                            "index": "true"
                        },
                        "answer": {"type": "text", "index": "false"},
                        "id": {"type": "integer"}
                    }
                }
            }

        Args:
            index_name: Name of the index to create.
            mapping: Request body passed to ``indices.create``.

        Raises:
            ValueError: If an index named ``index_name`` already exists.
        """
        if self.exists_index(index_name):
            raise ValueError(index_name+" 已经存在")
        self.es.indices.create(index=index_name, body=mapping)

    def exists_index(self, index_name):
        """Return the client's answer to whether ``index_name`` exists."""
        # Keyword form: newer elasticsearch-py clients reject positional
        # arguments, and it matches every other call in this class.
        return self.es.indices.exists(index=index_name)

    def exit_index(self, index_name):
        """Backward-compatible alias of :meth:`exists_index` (legacy misspelling)."""
        return self.exists_index(index_name)

    def delete_index(self, index_name):
        """Delete the index named ``index_name``."""
        self.es.indices.delete(index=index_name)

    def insert_data(self, index_name, idx, body):
        """Index a single document and return the client response.

        Example ``body``::

            {'question': 'sample question', 'answer': 'sample answer', 'id': 1}

        Args:
            index_name: Target index.
            idx: Document id.
            body: Document source.
        """
        return self.es.index(index=index_name, id=idx, body=body)

    def insert_batch_data(self, action):
        """Bulk-index documents via ``helpers.bulk`` and return its result.

        Example ``action``::

            [
                {"_index": "repu", "_id": "3",
                 "_source": {"question": "q3", "answer": "a3", "id": 3}},
                {"_index": "repu", "_id": "4",
                 "_source": {"question": "q4", "answer": "a4", "id": 4}},
            ]

        Args:
            action: Iterable of bulk action dicts.
        """
        return helpers.bulk(self.es, action)

    def delete_by_query(self, index_name, body):
        """Delete every document matching ``body`` and return the client response.

        Example ``body`` values::

            {'query': {'match_all': {}}}
            {'query': {'match': {'id': 1}}}

        Args:
            index_name: Index to delete from.
            body: Query DSL body selecting the documents to delete.
        """
        # Return the response like the sibling methods do, so callers can
        # inspect the outcome instead of always receiving None.
        return self.es.delete_by_query(index=index_name, body=body)

    def get_data_by_id(self, index_name, idx):
        """Fetch the document with id ``idx`` from ``index_name``."""
        return self.es.get(index=index_name, id=idx)

    def get_data_by_body(self, index_name, body):
        """Run a search with the given query body and return the response.

        Example ``body`` values::

            # 1. terms: exact match against any of several values
            {'query': {'terms': {'question': ['value one', 'value two']}}}
            # 2. multi_match: search one text across several fields
            {'query': {'multi_match': {'query': 'some text',
                                       'fields': ['question', 'id']}}}
            # 3. match: analyzed full-text match on one field
            {'query': {'match': {'question': 'some text'}}}
            # 4. term: exact match against a single value
            {'query': {'term': {'question': 'value'}}}

        Args:
            index_name: Index to search.
            body: Query DSL body.
        """
        return self.es.search(index=index_name, body=body)

    def update_by_id(self, index_name, idx, body):
        """Update the document with id ``idx`` and return the response.

        Example ``body``::

            {"doc": {"question": "new question", "answer": "new answer", "id": 3}}

        Args:
            index_name: Index holding the document.
            idx: Document id.
            body: Update request body.
        """
        return self.es.update(index=index_name, id=idx, body=body)

    def es_info(self):
        """Return the result of the client's ``info()`` call."""
        return self.es.info()



if __name__ == '__main__':
    # Manual smoke check: build a helper with its default arguments and
    # print whatever the client's info() call returns.
    es_util = EsUtil()
    cluster_info = es_util.es_info()
    print(cluster_info)

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐