手把手教你用Python搭建不可篡改的档案溯源系统
一、系统设计核心原理
本系统采用哈希链技术实现档案溯源。核心逻辑是将每一个新生成的档案内容的哈希值,作为下一个档案记录的“前置哈希”字段存储。如果历史档案被篡改,其哈希值必然改变,导致与后续档案中的“前置哈希”不匹配,从而通过算法自动定位被篡改的环节。我们将使用 Python 的 Flask 框架作为 Web 服务,SQLite 作为轻量级数据库,无需任何复杂配置即可运行。
二、环境准备与依赖安装
在开始编写代码前,需要确保你的电脑上已经安装了 Python 环境(建议版本 3.8 及以上)。我们将创建一个独立的项目目录来存放所有文件,保持项目结构清晰。
1. 创建项目目录
打开终端或命令行工具,依次执行以下命令来创建项目文件夹并进入该目录:
```bash mkdir archive_traceability cd archive_traceability ```2. 安装依赖库
本系统仅依赖 Flask 框架来处理 Web 请求和文件上传。请执行以下命令进行安装:
```bash pip install flask ``>安装完成后,为了后续能够存储上传的文件,我们需要在项目根目录下创建一个名为 uploads 的文件夹:
```bash mkdir uploads ```三、后端核心代码实现
在项目根目录下新建一个名为 app.py 的文件。这个文件将包含数据库初始化、哈希计算、文件上传处理以及核心的溯源验证逻辑。请将以下完整代码直接复制到文件中。
```python import os import sqlite3 import hashlib import datetime from flask import Flask, request, render_template, redirect, url_for, flash app = Flask(__name__) app.secret_key = 'secret_key_for_flash_messages' app.config['UPLOAD_FOLDER'] = 'uploads' 数据库初始化函数 def init_db(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() 创建档案表,包含ID、文件名、内容哈希、前置哈希、时间戳 cursor.execute(''' CREATE TABLE IF NOT EXISTS archives ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, content_hash TEXT NOT NULL, prev_hash TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() 计算文件内容的SHA256哈希值 def calculate_hash(file_path): sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() 获取链中最新一个节点的哈希值 def get_last_hash(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('SELECT content_hash FROM archives ORDER BY id DESC LIMIT 1') result = cursor.fetchone() conn.close() if result: return result[0] return "0" 创世区块的前置哈希为0 @app.route('/') def index(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() 查询所有档案记录按时间倒序排列 cursor.execute('SELECT FROM archives ORDER BY id DESC') archives = cursor.fetchall() conn.close() return render_template('index.html', archives=archives) @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: flash('没有文件部分') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件') return redirect(request.url) if file: filename = file.filename file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) 核心溯源逻辑:计算当前文件哈希并获取上一个哈希 current_hash = calculate_hash(file_path) prev_hash = get_last_hash() 存入数据库 conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('INSERT INTO archives (filename, content_hash, prev_hash) VALUES (?, ?, ?)', (filename, current_hash, prev_hash)) conn.commit() conn.close() flash('档案上传并上链成功!') return redirect(url_for('index')) @app.route('/verify') def verify_chain(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('SELECT id, content_hash, prev_hash FROM archives ORDER BY id ASC') rows = cursor.fetchall() conn.close() is_valid = True broken_link_id = None prev_hash = "0" 遍历所有记录验证哈希链完整性 for row in rows: curr_id, curr_hash, stored_prev_hash = row 验证当前记录存储的前置哈希是否等于上一条记录的实际哈希 if stored_prev_hash != prev_hash: is_valid = False broken_link_id = curr_id break prev_hash = curr_hash if is_valid: flash('【验证通过】所有档案未被篡改,溯源链完整。') else: flash(f'【验证失败】检测到篡改!断链位置出现在ID为 {broken_link_id} 的档案处。') return redirect(url_for('index')) if __name__ == '__main__': if not os.path.exists('traceability.db'): init_db() app.run(debug=True, port=5000) ```四、前端页面实现

为了方便操作,我们需要创建一个简单的 HTML 页面。在项目根目录下创建一个名为 templates 的文件夹,然后在其中新建一个名为 index.html 的文件。注意:Flask 默认会在 templates 文件夹下寻找 HTML 文件。
```html上传新档案
档案列表
| ID | 文件名 | 本文件哈希 | 前置哈希 | 时间 |
|---|---|---|---|---|
| {{ archive[0] }} | {{ archive[1] }} | {{ archive[2] }} | {{ archive[3] }} | {{ archive[4] }} |
五、系统运行与实操测试
代码和页面准备就绪后,我们启动系统并进行真实的防篡改测试。
1. 启动服务
在终端中确保位于项目根目录,执行以下命令启动 Web 服务:
```bash python app.py ```当看到 Running on http://127.0.0.1:5000 提示时,说明服务已启动。打开浏览器访问该地址。
2. 正常上传测试
- 在浏览器页面点击“选择文件”,上传一个任意文件(例如 contract_v1.pdf),点击上传。
- 观察列表,你会发现该记录的“前置哈希”为 0,这是创世区块。
- 再次上传第二个文件(例如 contract_v2.pdf)。
- 观察第二条记录,它的“前置哈希”字段应该完全等于第一条记录的“本文件哈希”。此时链条已形成。
3. 模拟篡改攻击
这是最关键的一步,我们将模拟黑客修改了数据库中的历史文件,看系统是否能发现。
- 停止 Python 服务(在终端按 Ctrl+C)。
- 使用任意文本编辑器或 SQLite 管理工具打开项目目录下的 traceability.db 文件。如果没有工具,可以使用命令行操作:
- 进入 SQLite 命令行后,执行以下 SQL 语句来模拟修改第一个文件的哈希值(这是黑客篡改文件后的结果):
- 输入 .quit 退出数据库。
- 重新启动 Python 服务:
python app.py。 - 刷新浏览器页面,点击底部的红色按钮 “执行全网完整性校验”。
4. 观察验证结果
此时页面顶部应出现红色的警告框:【验证失败】检测到篡改!断链位置出现在ID为 2 的档案处。
这就证明了系统的有效性:虽然我们修改了 ID 为 1 的数据,但系统在检查 ID 为 2 的数据时,发现 ID 为 2 的“前置哈希”与数据库中 ID 为 1 的“本文件哈希”不一致,从而成功定位了安全漏洞。这就是档案溯源技术的核心价值。