手把手教你用Python搭建不可篡改的档案溯源系统

一、系统设计核心原理

本系统采用哈希链技术实现档案溯源。核心逻辑是将每一个新生成的档案内容的哈希值,作为下一个档案记录的“前置哈希”字段存储。如果历史档案被篡改,其哈希值必然改变,导致与后续档案中的“前置哈希”不匹配,从而通过算法自动定位被篡改的环节。我们将使用 Python 的 Flask 框架作为 Web 服务,SQLite 作为轻量级数据库,无需任何复杂配置即可运行。

二、环境准备与依赖安装

在开始编写代码前,需要确保你的电脑上已经安装了 Python 环境(建议版本 3.8 及以上)。我们将创建一个独立的项目目录来存放所有文件,保持项目结构清晰。

1. 创建项目目录

打开终端或命令行工具,依次执行以下命令来创建项目文件夹并进入该目录:

```bash mkdir archive_traceability cd archive_traceability ```

2. 安装依赖库

本系统仅依赖 Flask 框架来处理 Web 请求和文件上传。请执行以下命令进行安装:

```bash pip install flask ``>

安装完成后,为了后续能够存储上传的文件,我们需要在项目根目录下创建一个名为 uploads 的文件夹:

```bash mkdir uploads ```

三、后端核心代码实现

在项目根目录下新建一个名为 app.py 的文件。这个文件将包含数据库初始化、哈希计算、文件上传处理以及核心的溯源验证逻辑。请将以下完整代码直接复制到文件中。

```python import os import sqlite3 import hashlib import datetime from flask import Flask, request, render_template, redirect, url_for, flash app = Flask(__name__) app.secret_key = 'secret_key_for_flash_messages' app.config['UPLOAD_FOLDER'] = 'uploads' 数据库初始化函数 def init_db(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() 创建档案表,包含ID、文件名、内容哈希、前置哈希、时间戳 cursor.execute(''' CREATE TABLE IF NOT EXISTS archives ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, content_hash TEXT NOT NULL, prev_hash TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() 计算文件内容的SHA256哈希值 def calculate_hash(file_path): sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() 获取链中最新一个节点的哈希值 def get_last_hash(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('SELECT content_hash FROM archives ORDER BY id DESC LIMIT 1') result = cursor.fetchone() conn.close() if result: return result[0] return "0" 创世区块的前置哈希为0 @app.route('/') def index(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() 查询所有档案记录按时间倒序排列 cursor.execute('SELECT FROM archives ORDER BY id DESC') archives = cursor.fetchall() conn.close() return render_template('index.html', archives=archives) @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: flash('没有文件部分') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件') return redirect(request.url) if file: filename = file.filename file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) 核心溯源逻辑:计算当前文件哈希并获取上一个哈希 current_hash = calculate_hash(file_path) prev_hash = get_last_hash() 存入数据库 conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('INSERT INTO archives (filename, content_hash, prev_hash) VALUES (?, ?, ?)', (filename, current_hash, prev_hash)) conn.commit() conn.close() flash('档案上传并上链成功!') return redirect(url_for('index')) @app.route('/verify') def verify_chain(): conn = sqlite3.connect('traceability.db') cursor = conn.cursor() cursor.execute('SELECT id, content_hash, prev_hash FROM archives ORDER BY id ASC') rows = cursor.fetchall() conn.close() is_valid = True broken_link_id = None prev_hash = "0" 遍历所有记录验证哈希链完整性 for row in rows: curr_id, curr_hash, stored_prev_hash = row 验证当前记录存储的前置哈希是否等于上一条记录的实际哈希 if stored_prev_hash != prev_hash: is_valid = False broken_link_id = curr_id break prev_hash = curr_hash if is_valid: flash('【验证通过】所有档案未被篡改,溯源链完整。') else: flash(f'【验证失败】检测到篡改!断链位置出现在ID为 {broken_link_id} 的档案处。') return redirect(url_for('index')) if __name__ == '__main__': if not os.path.exists('traceability.db'): init_db() app.run(debug=True, port=5000) ```

四、前端页面实现

手把手教你用Python搭建不可篡改的档案溯源系统

为了方便操作,我们需要创建一个简单的 HTML 页面。在项目根目录下创建一个名为 templates 的文件夹,然后在其中新建一个名为 index.html 的文件。注意:Flask 默认会在 templates 文件夹下寻找 HTML 文件。

```html 档案溯源系统 档案溯源管理系统 {% with messages = get_flashed_messages() %} {% if messages %} {% for message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %}

上传新档案

档案列表

{% for archive in archives %} {% endfor %}
ID 文件名 本文件哈希 前置哈希 时间
{{ archive[0] }} {{ archive[1] }} {{ archive[2] }} {{ archive[3] }} {{ archive[4] }}
执行全网完整性校验 ```

五、系统运行与实操测试

代码和页面准备就绪后,我们启动系统并进行真实的防篡改测试。

1. 启动服务

在终端中确保位于项目根目录,执行以下命令启动 Web 服务:

```bash python app.py ```

当看到 Running on http://127.0.0.1:5000 提示时,说明服务已启动。打开浏览器访问该地址。

2. 正常上传测试

  1. 在浏览器页面点击“选择文件”,上传一个任意文件(例如 contract_v1.pdf),点击上传。
  2. 观察列表,你会发现该记录的“前置哈希”为 0,这是创世区块。
  3. 再次上传第二个文件(例如 contract_v2.pdf)。
  4. 观察第二条记录,它的“前置哈希”字段应该完全等于第一条记录的“本文件哈希”。此时链条已形成。

3. 模拟篡改攻击

这是最关键的一步,我们将模拟黑客修改了数据库中的历史文件,看系统是否能发现。

  1. 停止 Python 服务(在终端按 Ctrl+C)。
  2. 使用任意文本编辑器或 SQLite 管理工具打开项目目录下的 traceability.db 文件。如果没有工具,可以使用命令行操作:
```bash sqlite3 traceability.db ```
  1. 进入 SQLite 命令行后,执行以下 SQL 语句来模拟修改第一个文件的哈希值(这是黑客篡改文件后的结果):
```sql UPDATE archives SET content_hash = 'fake_tampered_hash_data_12345' WHERE id = 1; ```
  1. 输入 .quit 退出数据库。
  2. 重新启动 Python 服务:python app.py
  3. 刷新浏览器页面,点击底部的红色按钮 “执行全网完整性校验”

4. 观察验证结果

此时页面顶部应出现红色的警告框:【验证失败】检测到篡改!断链位置出现在ID为 2 的档案处。

这就证明了系统的有效性:虽然我们修改了 ID 为 1 的数据,但系统在检查 ID 为 2 的数据时,发现 ID 为 2 的“前置哈希”与数据库中 ID 为 1 的“本文件哈希”不一致,从而成功定位了安全漏洞。这就是档案溯源技术的核心价值。

AI咨询
热线电话

028-85154420

15388110056

全国售前咨询电话

扫码咨询
安答联动微信公众号二维码

微信扫码关注安答联动

申请试用
热线电话
申请试用

安答联动档案管理系统