档案数据加密管理软件实操指南:从零到一构建安全防护系统

一、核心需求与方案选择

档案数据加密管理需要解决三个核心问题:存储加密、传输加密、权限控制。我们将采用开源方案构建完整系统。

1.1 技术栈选型

基于成熟度与安全性考虑,选择以下技术栈:

  • 存储加密:使用AES-256-GCM算法,支持硬件加速
  • 密钥管理:使用HashiCorp Vault或AWS KMS
  • 传输加密:TLS 1.3协议
  • 权限控制:基于角色的访问控制(RBAC)
  • 数据库:PostgreSQL 13+,启用透明数据加密

二、环境准备与基础配置

2.1 系统环境要求

确保服务器满足以下最低配置:

  • 操作系统:Ubuntu 20.04 LTS或CentOS 8
  • 内存:8GB以上
  • 存储:SSD硬盘,预留50GB以上空间
  • 网络:支持HTTPS端口(默认443)

2.2 安装依赖包

执行以下命令安装基础依赖:

``` sudo apt-get update sudo apt-get install -y \ build-essential \ libssl-dev \ libffi-dev \ python3-dev \ python3-pip \ postgresql-13 \ postgresql-contrib-13 ```

2.3 配置PostgreSQL加密

编辑PostgreSQL配置文件:

``` sudo nano /etc/postgresql/13/main/postgresql.conf ```

添加以下配置:

``` 启用数据加密 ssl = on ssl_cert_file = '/etc/ssl/certs/ssl-cert-snakeoil.pem' ssl_key_file = '/etc/ssl/private/ssl-cert-snakeoil.key' 设置加密算法 ssl_ciphers = 'TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256' 重启PostgreSQL服务 sudo systemctl restart postgresql ```

三、核心加密模块实现

3.1 安装加密库

使用Python的cryptography库:

``` pip3 install cryptography==3.4.7 pip3 install pycryptodome==3.10.1 ```

3.2 实现AES-256-GCM加密类

档案数据加密管理软件实操指南:从零到一构建安全防护系统

创建encryption.py文件:

``` from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 import os class FileEncryptor: def __init__(self, master_key: bytes): self.master_key = master_key def generate_key(self, salt: bytes = None) -> bytes: """生成加密密钥""" if salt is None: salt = os.urandom(16) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000 ) return kdf.derive(self.master_key) def encrypt_file(self, file_path: str, output_path: str): """加密文件""" 读取文件内容 with open(file_path, 'rb') as f: data = f.read() 生成随机nonce nonce = os.urandom(12) 生成加密密钥 key = self.generate_key() 使用AES-GCM加密 aesgcm = AESGCM(key) encrypted_data = aesgcm.encrypt(nonce, data, None) 保存加密文件(包含nonce) with open(output_path, 'wb') as f: f.write(nonce + encrypted_data) return key def decrypt_file(self, file_path: str, key: bytes, output_path: str): """解密文件""" with open(file_path, 'rb') as f: encrypted_data = f.read() 分离nonce和加密数据 nonce = encrypted_data[:12] ciphertext = encrypted_data[12:] 使用AES-GCM解密 aesgcm = AESGCM(key) decrypted_data = aesgcm.decrypt(nonce, ciphertext, None) 保存解密文件 with open(output_path, 'wb') as f: f.write(decrypted_data) ```

3.3 实现密钥管理

创建key_manager.py文件:

``` import json import hashlib from cryptography.fernet import Fernet import os class KeyManager: def __init__(self, keystore_path: str = '/etc/encryption/keystore.json'): self.keystore_path = keystore_path self._ensure_keystore() def _ensure_keystore(self): """确保密钥存储文件存在""" if not os.path.exists(self.keystore_path): os.makedirs(os.path.dirname(self.keystore_path), exist_ok=True) self._create_new_keystore() def _create_new_keystore(self): """创建新的密钥存储""" master_key = Fernet.generate_key() keystore = { 'master_key': master_key.decode(), 'file_keys': {} } self._save_keystore(keystore) def _load_keystore(self) -> dict: """加载密钥存储""" with open(self.keystore_path, 'r') as f: return json.load(f) def _save_keystore(self, keystore: dict): """保存密钥存储""" with open(self.keystore_path, 'w') as f: json.dump(keystore, f, indent=2) def get_master_key(self) -> bytes: """获取主密钥""" keystore = self._load_keystore() return keystore['master_key'].encode() def store_file_key(self, file_id: str, key: bytes, metadata: dict = None): """存储文件密钥""" keystore = self._load_keystore() keystore['file_keys'][file_id] = { 'key': key.hex(), 'metadata': metadata or {} } self._save_keystore(keystore) def get_file_key(self, file_id: str) -> bytes: """获取文件密钥""" keystore = self._load_keystore() if file_id not in keystore['file_keys']: raise KeyError(f"Key for file {file_id} not found") return bytes.fromhex(keystore['file_keys'][file_id]['key']) ```

四、权限控制系统实现

4.1 数据库表设计

创建SQL文件schema.sql:

``` -- 用户表 CREATE TABLE users ( id SERIAL PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, email VARCHAR(100) UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT TRUE ); -- 角色表 CREATE TABLE roles ( id SERIAL PRIMARY KEY, role_name VARCHAR(50) UNIQUE NOT NULL, description TEXT ); -- 权限表 CREATE TABLE permissions ( id SERIAL PRIMARY KEY, permission_name VARCHAR(100) UNIQUE NOT NULL, description TEXT ); -- 用户角色关联表 CREATE TABLE user_roles ( user_id INTEGER REFERENCES users(id) ON DELETE CASCADE, role_id INTEGER REFERENCES roles(id) ON DELETE CASCADE, PRIMARY KEY (user_id, role_id) ); -- 角色权限关联表 CREATE TABLE role_permissions ( role_id INTEGER REFERENCES roles(id) ON DELETE CASCADE, permission_id INTEGER REFERENCES permissions(id) ON DELETE CASCADE, PRIMARY KEY (role_id, permission_id) ); -- 文件元数据表 CREATE TABLE file_metadata ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), original_name VARCHAR(255) NOT NULL, encrypted_name VARCHAR(255) NOT NULL, file_size BIGINT NOT NULL, mime_type VARCHAR(100), uploader_id INTEGER REFERENCES users(id), uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, encryption_key_id VARCHAR(100) NOT NULL, access_policy JSONB ); -- 文件访问日志表 CREATE TABLE file_access_logs ( id SERIAL PRIMARY KEY, file_id UUID REFERENCES file_metadata(id) ON DELETE CASCADE, user_id INTEGER REFERENCES users(id), access_type VARCHAR(20) NOT NULL, access_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ip_address INET, user_agent TEXT ); ```

4.2 权限验证中间件

创建auth_middleware.py:

``` from functools import wraps from flask import request, jsonify import jwt import datetime class AuthMiddleware: def __init__(self, secret_key: str): self.secret_key = secret_key def generate_token(self, user_id: int, username: str, roles: list) -> str: """生成JWT令牌""" payload = { 'user_id': user_id, 'username': username, 'roles': roles, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24), 'iat': datetime.datetime.utcnow() } return jwt.encode(payload, self.secret_key, algorithm='HS256') def verify_token(self, token: str) -> dict: """验证JWT令牌""" try: payload = jwt.decode(token, self.secret_key, algorithms=['HS256']) return payload except jwt.ExpiredSignatureError: raise Exception("Token has expired") except jwt.InvalidTokenError: raise Exception("Invalid token") def require_permission(self, permission_name: str): """权限验证装饰器""" def decorator(f): @wraps(f) def decorated_function(args, kwargs): token = request.headers.get('Authorization') if not token or not token.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token'}), 401 try: token = token.split(' ')[1] payload = self.verify_token(token) 检查权限 if not self._check_permission(payload['roles'], permission_name): return jsonify({'error': 'Insufficient permissions'}), 403 将用户信息添加到请求上下文 request.user_id = payload['user_id'] request.username = payload['username'] request.roles = payload['roles'] return f(args, kwargs) except Exception as e: return jsonify({'error': str(e)}), 401 return decorated_function return decorator def _check_permission(self, user_roles: list, required_permission: str) -> bool: """检查用户是否拥有指定权限""" 这里需要实现从数据库查询角色权限的逻辑 简化示例:假设拥有admin角色就有所有权限 return 'admin' in user_roles ```

五、完整系统集成

5.1 主应用程序配置

创建app.py文件:

``` from flask import Flask, request, jsonify, send_file from encryption import FileEncryptor from key_manager import KeyManager from auth_middleware import AuthMiddleware import uuid import os app = Flask(__name__) app.config['UPLOAD_FOLDER'] = '/var/www/uploads' app.config['ENCRYPTED_FOLDER'] = '/var/www/encrypted' app.config['MAX_CONTENT_LENGTH'] = 100 1024 1024 100MB限制 初始化组件 key_manager = KeyManager() master_key = key_manager.get_master_key() encryptor = FileEncryptor(master_key) auth = AuthMiddleware('your-secret-key-here') 确保目录存在 os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['ENCRYPTED_FOLDER'], exist_ok=True) @app.route('/api/upload', methods=['POST']) @auth.require_permission('upload_file') def upload_file(): """文件上传接口""" if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'No file selected'}), 400 生成文件ID file_id = str(uuid.uuid4()) 保存原始文件 original_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{file_id}.orig") file.save(original_path) 加密文件 encrypted_path = os.path.join(app.config['ENCRYPTED_FOLDER'], f"{file_id}.enc") encryption_key = encryptor.encrypt_file(original_path, encrypted_path) 存储密钥 key_manager.store_file_key(file_id, encryption_key, { 'original_name': file.filename, 'file_size': os.path.getsize(original_path), 'uploader': request.username }) 删除原始文件(可选) os.remove(original_path) return jsonify({ 'file_id': file_id, 'status': 'encrypted', 'encrypted_path': encrypted_path }) @app.route('/api/download/', methods=['GET']) @auth.require_permission('download_file') def download_file(file_id): """文件下载接口""" try: 获取加密密钥 encryption_key = key_manager.get_file_key(file_id) 解密文件 encrypted_path = os.path.join(app.config['ENCRYPTED_FOLDER'], f"{file_id}.enc") decrypted_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{file_id}.dec") encryptor.decrypt_file(encrypted_path, encryption_key, decrypted_path) 发送文件 return send_file( decrypted_path, as_attachment=True, download_name=f"decrypted_{file_id}" ) except KeyError: return jsonify({'error': 'File not found'}), 404 except Exception as e: return jsonify({'error': str(e)}), 500 finally: 清理临时解密文件 if os.path.exists(decrypted_path): os.remove(decrypted_path) if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, ssl_context='adhoc') ```

5.2 系统部署脚本

创建deploy.sh部署脚本:

``` !/bin/bash 停止现有服务 sudo systemctl stop file-encryption-service 更新代码 cd /opt/file-encryption-system git pull origin main 安装依赖 pip3 install -r requirements.txt 设置数据库 sudo -u postgres psql -f schema.sql 设置文件权限 sudo chown -R www-data:www-data /var/www/uploads sudo chown -R www-data:www-data /var/www/encrypted sudo chmod 750 /var/www/uploads sudo chmod 750 /var/www/encrypted 设置密钥存储权限 sudo mkdir -p /etc/encryption sudo chown root:www-data /etc/encryption sudo chmod 750 /etc/encryption 创建系统服务 sudo tee /etc/systemd/system/file-encryption-service.service << EOF [Unit] Description=File Encryption Service After=network.target postgresql.service [Service] User=www-data Group=www-data WorkingDirectory=/opt/file-encryption-system ExecStart=/usr/bin/python3 app.py Restart=always RestartSec=10 [Install] WantedBy=multi-user.target EOF 启用并启动服务 sudo systemctl daemon-reload sudo systemctl enable file-encryption-service sudo systemctl start file-encryption-service 检查服务状态 sudo systemctl status file-encryption-service ```

5.3 Nginx反向代理配置

创建Nginx配置文件/etc/nginx/sites-available/file-encryption:

``` server { listen 443 ssl http2; server_name your-domain.com; ssl_certificate /etc/ssl/certs/your-cert.pem; ssl_certificate_key /etc/ssl/private/your-key.pem; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512; ssl_prefer_server_ciphers off; location / { proxy_pass http://127.0.0.1:5000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; 文件上传大小限制 client_max_body_size 100M; } 静态文件缓存 location /static/ { alias /opt/file-encryption-system/static/; expires 30d; add_header Cache-Control "public, immutable"; } } ```

六、安全审计与监控

AI咨询
热线电话

028-85154420

15388110056

全国售前咨询电话

扫码咨询
安答联动微信公众号二维码

微信扫码关注安答联动

申请试用
热线电话
申请试用

安答联动档案管理系统