基于Elasticsearch搭建档案检索系统全流程实操

一、环境准备与基础组件安装

在开始构建档案检索系统之前,我们需要准备好运行环境。为了保证环境的纯净和部署的便捷性,本指南采用 Docker 部署 Elasticsearch,使用 Python 作为后端开发语言。

1. 部署 Elasticsearch 服务

打开终端,直接执行以下 Docker 命令。该命令会下载并启动一个单节点的 Elasticsearch 8.11.0 实例,同时关闭了安全认证(仅用于本地开发环境,生产环境严禁关闭),并暴露了 9200 端口。

注意:请确保你的机器已安装 Docker 且内存至少分配给 2GB。

docker run -d \
--name es-archive \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
docker.elastic.co/elasticsearch/elasticsearch:8.11.0

等待约 30 秒让服务完全启动,输入 curl http://localhost:9200,若返回包含 JSON 数据的响应,则说明服务启动成功。

2. 安装 Python 依赖库

在本地项目目录中,我们需要创建一个虚拟环境并安装必要的 Python 库。我们将使用 flask 提供 Web 接口,使用 elasticsearch 官方库连接数据库。

mkdir archive-search && cd archive-search
pip install flask elasticsearch requests

二、定义档案索引结构

档案数据通常包含标题、正文内容、创建时间、文件类型等字段。为了实现高效的全文检索,我们需要在 Elasticsearch 中创建一个明确的索引映射。

创建一个名为 init_index.py 的文件,并写入以下代码。这段代码会连接 Elasticsearch,删除旧索引(如果存在),并创建一个名为 archives 的新索引。

from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
定义索引映射结构
index_mappings = {
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"content": {
"type": "text",
"analyzer": "ik_max_word"
},
"category": {
"type": "keyword"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"file_type": {
"type": "keyword"
}
}
}
}
if es.indices.exists(index='archives'):
es.indices.delete(index='archives')
es.indices.create(index='archives', body=index_mappings)
print("索引 archives 创建成功")

执行初始化:在终端运行 python init_index.py。如果提示 ik_max_word 分析器找不到,请先在 ES 中安装 IK 分词器插件。为简化本指南操作,上述配置使用了标准分词器(若未安装 IK),你可以将 analyzer 字段删除,ES 将默认使用 standard 分析器,虽然中文分词效果稍弱,但足以运行系统。

三、模拟档案数据写入

有了索引结构,接下来我们需要写入一些测试数据。在实际生产中,这些数据通常来自数据库同步或文件解析。这里我们编写一个脚本来模拟生成 20 条档案数据。

创建文件 insert_data.py

import random
import time
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
categories = ["人事档案", "财务报表", "技术文档", "会议纪要"]
file_types = ["PDF", "DOCX", "TXT"]
模拟生成数据
actions = []
base_time = datetime.now()
for i in range(1, 21):
category = random.choice(categories)
file_type = random.choice(file_types)
随机生成过去一年内的时间
create_time = base_time - timedelta(days=random.randint(0, 365))
time_str = create_time.strftime("%Y-%m-%d %H:%M:%S")
doc = {
"title": f"{category}关于{2023+i}年度项目规划的记录_{i}",
"content": f"这是关于{category}的详细记录内容。文档编号是{i},包含了核心数据的分析与总结。需要重点关注后续的执行进度。",
"category": category,
"file_type": file_type,
"create_time": time_str
}
指定索引ID,方便后续管理
es.index(index="archives", id=i, document=doc)
print("20条模拟档案数据写入完成")

执行 python insert_data.py 完成数据填充。

四、开发后端检索接口

基于Elasticsearch搭建档案检索系统全流程实操

现在开始构建核心的检索服务。我们将使用 Flask 创建一个简单的 API,接收前端的查询关键词,并在 Elasticsearch 中执行复合查询(同时匹配标题和内容),并开启高亮显示。

创建文件 app.py

from flask import Flask, request, jsonify, render_template_string
from elasticsearch import Elasticsearch
app = Flask(__name__)
es = Elasticsearch("http://localhost:9200")
@app.route('/')
def index():
简单的前端页面代码,直接返回渲染
return render_template_string(open('search.html', encoding='utf-8').read())
@app.route('/search')
def search():
keyword = request.args.get('q', '').strip()
page = int(request.args.get('page', 1))
page_size = 5
if not keyword:
return jsonify({"total": 0, "data": []})
构建查询体
query_body = {
"query": {
"bool": {
"should": [
{"match": {"title": {"query": keyword, "boost": 2}}},
{"match": {"content": {"query": keyword}}}
]
}
},
"highlight": {
"pre_tags": [""],
"post_tags": [""],
"fields": {
"title": {},
"content": {"fragment_size": 150}
}
},
"from": (page - 1)  page_size,
"size": page_size,
"sort": [{"_score": {"order": "desc"}}]
}
try:
resp = es.search(index="archives", body=query_body)
total = resp['hits']['total']['value']
hits = resp['hits']['hits']
results = []
for hit in hits:
source = hit['_source']
处理高亮结果,如果无高亮则显示原字段
highlight = hit.get('highlight', {})
results.append({
"id": hit['_id'],
"title": highlight.get('title', [source['title']])[0],
"content": highlight.get('content', [source['content']])[0],
"category": source['category'],
"create_time": source['create_time'],
"file_type": source['file_type']
})
return jsonify({"total": total, "data": results})
except Exception as e:
return jsonify({"error": str(e), "total": 0, "data": []})
if __name__ == '__main__':
app.run(debug=True, port=5000)

五、构建前端交互页面

为了让读者能直接看到效果,我们创建一个单页面的 HTML 文件 search.html。这个页面包含一个搜索框、一个结果列表,以及调用后端 API 的 JavaScript 逻辑。

在同级目录下创建 search.html





企业档案检索系统



档案检索系统

六、系统运行与验证

所有代码已准备就绪,现在启动系统并进行验证。

1. 启动 Flask 服务

在终端执行以下命令启动 Web 服务:

python app.py

看到输出 Running on http://127.0.0.1:5000 即表示启动成功。

2. 访问与测试

打开浏览器,访问 http://127.0.0.1:5000。页面加载后会自动执行一次默认搜索(查询“项目规划”)。

  • 测试高亮:在输入框输入“财务”或“会议”,点击搜索。结果列表中,匹配到的文字背景会变红,且会显示包含关键词的上下文片段。
  • 测试分词:输入“项目”或“规划”,验证是否能检索到标题中包含这两个词的文档。
  • 测试空值:清空输入框点击搜索,应不返回任何数据或提示为空。

3. 常见问题排查

如果运行 python app.py 时报错 ConnectionRefusedError,请检查 Docker 容器中的 Elasticsearch 是否已完全启动,可以使用 docker ps 查看容器状态,或使用 docker logs es-archive 查看启动日志。

通过以上步骤,你已经从零开始搭建了一个具备中文分词、高亮显示、关键词匹配功能的轻量级档案检索系统。该架构可直接扩展为连接 MySQL 数据源或集成 PDF 解析器,用于处理真实的文件档案业务。

AI咨询
热线电话

028-85154420

15388110056

全国售前咨询电话

扫码咨询
安答联动微信公众号二维码

微信扫码关注安答联动

申请试用
热线电话
申请试用

安答联动档案管理系统