构建安全文书档案外包管理系统的技术实操全案

系统架构设计思路

为了实现文书档案管理外包的数字化与安全化,我们将采用 B/S 架构。后端选用 Python 的 FastAPI 框架,因为它具备高性能、异步支持以及自动生成 API 文档的能力,非常适合快速构建企业级接口。数据存储方面,使用 SQLite 数据库,它无需配置独立的服务器环境,文件级存储便于备份与迁移。文件存储将采用操作系统本地目录结构,通过严格的权限校验逻辑实现不同外包商之间的数据隔离。

本系统将实现以下核心功能模块:

  • 档案上传模块:供内部人员上传原始文书档案。
  • 任务分发模块:将档案分配给特定的外包商。
  • 外包工作台:外包商查看任务、下载档案、上传处理结果。
  • 审计日志模块:记录所有关键操作,确保数据流转可追溯。

环境准备与依赖安装

在开始编码前,请确保你的操作系统已安装 Python 3.8 或更高版本。为了避免环境污染,我们首先创建一个独立的项目目录并安装必要的依赖包。

打开终端(Terminal 或 CMD),依次执行以下命令:

1. 创建项目目录并进入

```bash mkdir archive_outsourcing_system cd archive_outsourcing_system ```

2. 创建虚拟环境(推荐操作)

```bash python -m venv venv ```

3. 激活虚拟环境

Windows 系统执行:

```bash venv\Scripts\activate ```

MacOS 或 Linux 系统执行:

```bash source venv/bin/activate ```

4. 安装核心依赖库

我们需要安装 FastAPI 框架、Uvicorn 服务器(ASGI)、Python-Multipart(用于处理文件上传)以及 aiofiles(用于异步文件操作)。

```bash pip install fastapi uvicorn[standard] python-multipart aiofiles ```

数据库模型设计

为了保持系统零门槛且易于部署,我们将数据库逻辑直接封装在启动脚本中。系统需要两张核心表:documents(档案表)和 operation_logs(审计日志表)。

在项目根目录下创建一个名为 main.py 的文件,并输入以下代码。这段代码包含了数据库初始化、Pydantic 模型定义以及辅助函数。

```python import sqlite3 import os import shutil import uuid from datetime import datetime from typing import Optional, List from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Header from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel 配置常量 DATABASE_FILE = "archive_system.db" UPLOAD_DIR = "uploaded_files" PROCESSED_DIR = "processed_files" os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(PROCESSED_DIR, exist_ok=True) app = FastAPI(title="文书档案外包管理系统") 数据库初始化 def init_db(): conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() 档案表:记录文件信息、状态及归属 cursor.execute(''' CREATE TABLE IF NOT EXISTS documents ( id TEXT PRIMARY KEY, original_filename TEXT NOT NULL, file_path TEXT NOT NULL, status TEXT DEFAULT 'pending', assigned_vendor TEXT, upload_time TEXT, processed_path TEXT ) ''') 日志表:记录谁在什么时候做了什么 cursor.execute(''' CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, action TEXT, doc_id TEXT, timestamp TEXT ) ''') conn.commit() conn.close() 启动时自动初始化数据库 @app.on_event("startup") def startup_event(): init_db() 数据模型 class DocumentInfo(BaseModel): id: str original_filename: str status: str assigned_vendor: Optional[str] = None upload_time: str class AssignRequest(BaseModel): doc_id: str vendor_id: str class ProcessResult(BaseModel): doc_id: str result_comment: str 辅助函数 def log_action(user_id: str, action: str, doc_id: str): conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() cursor.execute("INSERT INTO logs (user_id, action, doc_id, timestamp) VALUES (?, ?, ?, ?)", (user_id, action, doc_id, datetime.now().isoformat())) conn.commit() conn.close() ```

核心功能接口实现

接下来,我们将在 main.py 文件中继续添加核心业务逻辑的接口代码。请将以下代码追加到文件末尾。

1. 内部端:档案上传与任务分配

此接口允许内部管理员上传文件,并将任务指派给特定的外包商(通过 vendor_id 标识,例如 "vendor_001")。

```python 接口:上传档案 @app.post("/api/admin/upload") async def upload_document( file: UploadFile = File(...), vendor_id: str = Form(...), admin_token: str = Header(...) ): 简单的安全校验:实际生产中应使用更复杂的 JWT 或 OAuth if admin_token != "ADMIN_SECRET_KEY": raise HTTPException(status_code=401, detail="无权操作") 生成唯一文件名 file_id = str(uuid.uuid4()) file_ext = os.path.splitext(file.filename)[1] save_path = os.path.join(UPLOAD_DIR, f"{file_id}{file_ext}") 保存文件 try: with open(save_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) except Exception as e: raise HTTPException(status_code=500, detail=f"文件保存失败: {str(e)}") 写入数据库 conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() upload_time = datetime.now().isoformat() cursor.execute( "INSERT INTO documents (id, original_filename, file_path, status, assigned_vendor, upload_time) VALUES (?, ?, ?, ?, ?, ?)", (file_id, file.filename, save_path, "assigned", vendor_id, upload_time) ) conn.commit() conn.close() 记录日志 log_action("admin", "upload_and_assign", file_id) return {"message": "上传成功", "doc_id": file_id, "assigned_to": vendor_id} 接口:查看所有档案状态 @app.get("/api/admin/documents", response_model=List[DocumentInfo]) def get_all_documents(admin_token: str = Header(...)): if admin_token != "ADMIN_SECRET_KEY": raise HTTPException(status_code=401, detail="无权操作") conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() cursor.execute("SELECT id, original_filename, status, assigned_vendor, upload_time FROM documents") rows = cursor.fetchall() conn.close() return [ DocumentInfo( id=row[0], original_filename=row[1], status=row[2], assigned_vendor=row[3], upload_time=row[4] ) for row in rows ] ```

2. 外包端:任务获取与结果回传

外包商通过此接口查看分配给自己的任务,并下载原始文件。处理完成后,上传结果文件。

```python 接口:外包商获取任务列表 @app.get("/api/vendor/tasks", response_model=List[DocumentInfo]) def get_vendor_tasks(vendor_id: str = Header(...)): conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() 只查询分配给当前 vendor_id 且未完成的任务 cursor.execute( "SELECT id, original_filename, status, assigned_vendor, upload_time FROM documents WHERE assigned_vendor = ? AND status != 'completed'", (vendor_id,) ) rows = cursor.fetchall() conn.close() return [ DocumentInfo( id=row[0], original_filename=row[1], status=row[2], assigned_vendor=row[3], upload_time=row[4] ) for row in rows ] 接口:下载原始档案 @app.get("/api/vendor/download/{doc_id}") def download_file(doc_id: str, vendor_id: str = Header(...)): conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() cursor.execute("SELECT file_path, assigned_vendor FROM documents WHERE id = ?", (doc_id,)) row = cursor.fetchone() conn.close() if not row: raise HTTPException(status_code=404, detail="文档不存在") file_path, assigned_vendor = row 权限校验:只能下载分配给自己的文件 if assigned_vendor != vendor_id: log_action(vendor_id, "unauthorized_access_attempt", doc_id) raise HTTPException(status_code=403, detail="无权访问此文档") if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="物理文件丢失") log_action(vendor_id, "download", doc_id) return FileResponse(path=file_path, filename=os.path.basename(file_path)) 接口:上传处理结果 @app.post("/api/vendor/submit_result") async def submit_result( doc_id: str = Form(...), result_file: UploadFile = File(...), vendor_id: str = Header(...) ): 1. 验证任务归属 conn = sqlite3.connect(DATABASE_FILE) cursor = conn.cursor() cursor.execute("SELECT id, assigned_vendor FROM documents WHERE id = ?", (doc_id,)) row = cursor.fetchone() if not row or row[1] != vendor_id: conn.close() raise HTTPException(status_code=403, detail="任务不存在或无权操作") 2. 保存结果文件 result_ext = os.path.splitext(result_file.filename)[1] result_save_path = os.path.join(PROCESSED_DIR, f"{doc_id}_result{result_ext}") try: with open(result_save_path, "wb") as buffer: shutil.copyfileobj(result_file.file, buffer) except Exception as e: conn.close() raise HTTPException(status_code=500, detail=f"结果保存失败: {str(e)}") 3. 更新数据库状态 cursor.execute( "UPDATE documents SET status = 'completed', processed_path = ? WHERE id = ?", (result_save_path, doc_id) ) conn.commit() conn.close() log_action(vendor_id, "submit_result", doc_id) return {"message": "结果提交成功", "status": "completed"} ```

系统启动与全链路测试

构建安全文书档案外包管理系统的技术实操全案

代码编写完成后,我们无需任何额外的 Web 服务器配置,直接利用 Uvicorn 启动服务。

1. 启动服务

在终端执行:

```bash uvicorn main:app --reload --host 0.0.0.0 --port 8000 ```

看到 Uvicorn running on http://0.0.0.0:8000 提示即表示启动成功。此时访问 http://localhost:8000/docs 可以看到自动生成的 Swagger API 文档界面。

2. 模拟内部管理员上传档案

我们使用 curl 命令模拟上传一个名为 contract.pdf 的文件,并将其分配给外包商 vendor_001。注意 Header 中的 ADMIN_SECRET_KEY 是我们在代码中硬编码的简单凭证。

```bash curl -X POST "http://localhost:8000/api/admin/upload" \ -H "accept: application/json" \ -H "admin-token: ADMIN_SECRET_KEY" \ -H "Content-Type: multipart/form-data" \ -F "file=@/path/to/your/contract.pdf" \ -F "vendor_id=vendor_001" ```

执行成功后,请记下返回的 JSON 中的 "doc_id",后续测试需要用到。

3. 模拟外包商查看任务

外包商 vendor_001 请求任务列表:

```bash curl -X GET "http://localhost:8000/api/vendor/tasks" \ -H "vendor-id: vendor_001" ```

你将看到刚才上传的文档信息。如果换一个 Header 中的 vendor-id,将无法看到该文档,从而验证了数据隔离的有效性。

4. 模拟外包商提交处理结果

假设外包商处理完毕,需要上传结果文件(例如 contract_processed.pdf):

```bash curl -X POST "http://localhost:8000/api/vendor/submit_result" \ -H "vendor-id: vendor_001" \ -F "doc_id=刚才获取的doc_id" \ -F "result_file=@/path/to/your/contract_processed.pdf" ```

5. 验证系统完整性

再次调用管理员查看接口:

```bash curl -X GET "http://localhost:8000/api/admin/documents" \ -H "admin-token: ADMIN_SECRET_KEY" ```

你会发现对应文档的 status 字段已变为 completed。此时,你可以检查项目目录下的 processed_files 文件夹,里面应该已经存在了外包商上传的结果文件。

实操注意事项总结

以上代码构建了一个最小可行性产品(MVP)级别的文书档案外包管理系统。在实际落地部署时,请务必关注以下技术细节:

  • 安全性增强:示例中的 Header Token 校验仅用于演示。在生产环境中,必须替换为 OAuth2.0 或 JWT 认证机制,并开启 HTTPS (SSL) 加密传输,防止档案内容在网络传输中被窃听。
  • 文件存储优化:当前使用本地文件系统。当档案量达到百万级时,建议接入 MinIO 或阿里云 OSS 等对象存储服务,以解决磁盘扩容和高并发读写问题。
  • 并发控制:FastAPI 自带异步支持,但 SQLite 在极高并发写入下可能存在锁竞争。如果外包团队规模超过百人,建议将数据库替换为 PostgreSQL 或 MySQL。
AI咨询
热线电话

028-85154420

15388110056

全国售前咨询电话

扫码咨询
安答联动微信公众号二维码

微信扫码关注安答联动

申请试用
热线电话
申请试用

安答联动档案管理系统