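Big Data Analysis in Practice for an Integrated Archive Management System: From Data Cleaning to Visualization
I. Environment and Data Preparation
This section sets up the base environment and prepares the raw data used in the rest of the tutorial.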
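1.1 Installing the Base Software
On Ubuntu 20.04, run the following commands to install the required components:
# Install the Python environment and the data analysis libraries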
sudo apt update
sudo apt install python3-pip
pip3 install pandas==1.4.2 numpy==1.22.3 matplotlib==3.5.1 seaborn==0.11.2
pip3 install jupyterlab==3.3.2 scikit-learn==1.0.2
# Install the database and the connection tools
sudo apt install postgresql-12
sudo apt install postgresql-client-12
pip3 install psycopg2-binary==2.9.3 sqlalchemy==1.4.32
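After installation it can help to confirm that the pinned versions are the ones actually in use. The following is a minimal sketch using only the standard library; the helper name check_versions and the PINNED dictionary are illustrative assumptions, not part of the original setup.

```python
from importlib import metadata

# Versions pinned by the pip3 commands above.
PINNED = {
    "pandas": "1.4.2",
    "numpy": "1.22.3",
    "matplotlib": "3.5.1",
    "seaborn": "0.11.2",
    "jupyterlab": "3.3.2",
    "scikit-learn": "1.0.2",
    "psycopg2-binary": "2.9.3",
    "sqlalchemy": "1.4.32",
}


def check_versions(pinned, lookup=metadata.version):
    """Return {package: (expected, found)} for packages that differ or are missing.

    check_versions is a hypothetical helper; lookup is injectable so the
    function can be exercised without the packages being installed.
    """
    problems = {}
    for name, expected in pinned.items():
        try:
            found = lookup(name)
        except metadata.PackageNotFoundError:
            found = None  # package is not installed at all
        if found != expected:
            problems[name] = (expected, found)
    return problems


if __name__ == "__main__":
    for name, (expected, found) in check_versions(PINNED).items():
        print(f"{name}: expected {expected}, found {found or 'not installed'}")
```

An empty result means every package matches its pin.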
1.2 Archive Table Structure
Run the following SQL statements to create the core archive tables:

CREATE TABLE archive_records (
    record_id VARCHAR(50) PRIMARY KEY,
    archive_type VARCHAR(20) NOT NULL,
    create_date DATE NOT NULL,
    department VARCHAR(100),
    file_size_mb DECIMAL(10,2),
    access_level INT CHECK (access_level BETWEEN 1 AND 5),
    digitization_status BOOLEAN DEFAULT FALSE,
    keyword_tags TEXT[],
    storage_location VARCHAR(200),
    last_access_date DATE,
    access_count INT DEFAULT 0
);

CREATE TABLE archive_operations (
    operation_id SERIAL PRIMARY KEY,
    record_id VARCHAR(50) REFERENCES archive_records(record_id),
    -- Allowed values, in order: borrow, return, query, download, upload
    operation_type VARCHAR(20) CHECK (operation_type IN ('借阅','归还','查询','下载','上传')),
    operation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    operator_id VARCHAR(30),
    operation_duration_seconds INT
);
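The NOT NULL and CHECK constraints above can also be mirrored in Python to catch bad rows before they reach the database. This is a minimal sketch under that assumption; validate_record and validate_operation are hypothetical helpers that cover only part of the schema.

```python
# Operation types allowed by the CHECK constraint on archive_operations
# (borrow, return, query, download, upload).
ALLOWED_OPERATION_TYPES = {'借阅', '归还', '查询', '下载', '上传'}


def validate_record(row):
    """Return a list of constraint violations for one archive_records row (a dict)."""
    errors = []
    record_id = row.get('record_id')
    if not record_id:
        errors.append('record_id is required')
    elif len(record_id) > 50:
        errors.append('record_id exceeds 50 characters')
    if not row.get('archive_type'):
        errors.append('archive_type is required')
    if row.get('create_date') is None:
        errors.append('create_date is required')
    level = row.get('access_level')
    # As in SQL, a missing access_level passes the CHECK constraint.
    if level is not None and not 1 <= level <= 5:
        errors.append('access_level must be between 1 and 5')
    return errors


def validate_operation(row):
    """Return a list of constraint violations for one archive_operations row."""
    errors = []
    op_type = row.get('operation_type')
    if op_type is not None and op_type not in ALLOWED_OPERATION_TYPES:
        errors.append(f'operation_type {op_type!r} is not allowed')
    return errors


if __name__ == "__main__":
    bad = {'record_id': 'ARC_000001', 'archive_type': '文书档案',
           'create_date': '2020-01-01', 'access_level': 9}
    print(validate_record(bad))
```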
1.3 Generating Simulated Data
Create the data generation script generate_archive_data.py:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


def generate_sample_data():
    np.random.seed(42)

    # Generate the archive records. Department and archive type names stay in
    # Chinese because they are data values used throughout the tutorial.
    # Departments: HR, Finance, Technology, Administration, Marketing
    departments = ['人事部', '财务部', '技术部', '行政部', '市场部']
    # Archive types: documentary, scientific, accounting, audio-visual, physical
    archive_types = ['文书档案', '科技档案', '会计档案', '声像档案', '实物档案']
    records = []
    for i in range(1, 1001):
        # Cast to int: timedelta does not accept NumPy integer types
        create_date = datetime.now() - timedelta(days=int(np.random.randint(1, 3650)))
        records.append({
            'record_id': f'ARC_{str(i).zfill(6)}',
            'archive_type': np.random.choice(archive_types),
            'create_date': create_date.date(),
            'department': np.random.choice(departments),
            'file_size_mb': round(np.random.uniform(0.1, 500), 2),
            'access_level': np.random.randint(1, 6),
            'digitization_status': np.random.random() > 0.3,
            'keyword_tags': [f'tag{np.random.randint(1,20)}' for _ in range(np.random.randint(1,5))],
            # Cabinet zone and shelf level
            'storage_location': f'柜区{np.random.randint(1,10)}-层{np.random.randint(1,8)}',
            'last_access_date': (create_date + timedelta(days=int(np.random.randint(0, 1000)))).date() if np.random.random() > 0.4 else None,
            'access_count': np.random.randint(0, 50)
        })

    # Generate the operation records
    operations = []
    # Operation types: borrow, return, query, download, upload
    operation_types = ['借阅', '归还', '查询', '下载', '上传']
    for i in range(1, 5001):
        record_idx = np.random.randint(0, 1000)
        op_time = datetime.now() - timedelta(days=int(np.random.randint(0, 30)),
                                             hours=int(np.random.randint(0, 24)),
                                             minutes=int(np.random.randint(0, 60)))
        operations.append({
            'record_id': records[record_idx]['record_id'],
            'operation_type': np.random.choice(operation_types),
            'operation_time': op_time,
            'operator_id': f'USER{np.random.randint(1001, 1100)}',
            'operation_duration_seconds': np.random.randint(10, 3600)
        })
    return pd.DataFrame(records), pd.DataFrame(operations)


if __name__ == "__main__":
    records_df, operations_df = generate_sample_data()
    records_df.to_csv('archive_records.csv', index=False, encoding='utf-8-sig')
    operations_df.to_csv('archive_operations.csv', index=False, encoding='utf-8-sig')
Run the script to generate the CSV files:
python3 generate_archive_data.py
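Because every operation row is meant to reference an existing archive record, a quick referential check on the generated files can catch problems early. The sketch below uses only the standard library and small in-memory samples; find_orphan_operations is a hypothetical helper, and the sample rows are made up for illustration.

```python
import csv
import io


def find_orphan_operations(records_file, operations_file):
    """Return operation rows whose record_id does not appear in the records file."""
    known_ids = {row['record_id'] for row in csv.DictReader(records_file)}
    return [row for row in csv.DictReader(operations_file)
            if row['record_id'] not in known_ids]


# In-memory stand-ins for archive_records.csv and archive_operations.csv.
sample_records = io.StringIO(
    "record_id,archive_type\nARC_000001,文书档案\nARC_000002,会计档案\n"
)
sample_operations = io.StringIO(
    "record_id,operation_type\nARC_000001,查询\nARC_000999,下载\n"
)

orphans = find_orphan_operations(sample_records, sample_operations)
print([row['record_id'] for row in orphans])  # ['ARC_000999']
```

To run it against the real files, open them with encoding='utf-8-sig' so the byte order mark written by to_csv is stripped from the first column name.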
II. Data Cleaning and Preprocessing
2.1 Data Quality Checks
Create the script data_cleaning.py to clean the data:
import pandas as pd
import numpy as np


def load_and_clean_data():
    # Load the data (utf-8-sig strips the byte order mark written by to_csv)
    records_df = pd.read_csv('archive_records.csv', encoding='utf-8-sig')
    operations_df = pd.read_csv('archive_operations.csv', encoding='utf-8-sig')
    print("Raw data summary:")
    print(f"Archive records: {len(records_df)}")
    print(f"Operation records: {len(operations_df)}")

    # Check for missing values
    print("\nMissing values per column:")
    print(records_df.isnull().sum())
    print(operations_df.isnull().sum())

    # Convert data types first so that both date columns share one dtype
    records_df['create_date'] = pd.to_datetime(records_df['create_date'])
    records_df['last_access_date'] = pd.to_datetime(records_df['last_access_date'])
    operations_df['operation_time'] = pd.to_datetime(operations_df['operation_time'])

    # Handle missing values: fall back to the creation date
    records_df['last_access_date'] = records_df['last_access_date'].fillna(records_df['create_date'])

    # Handle outliers; .copy() avoids SettingWithCopyWarning on later assignments
    records_df = records_df[records_df['file_size_mb'] <= 1000]  # drop abnormally large files
    records_df = records_df[records_df['access_count'] >= 0].copy()  # drop negative counts

    # Remove duplicates
    records_df = records_df.drop_duplicates(subset=['record_id'], keep='first')
    operations_df = operations_df.drop_duplicates()

    # Compute derived fields
    records_df['archive_age_days'] = (pd.Timestamp.now() - records_df['create_date']).dt.days
    records_df['access_frequency'] = records_df['access_count'] / records_df['archive_age_days'].clip(lower=1)
    operations_df['operation_hour'] = operations_df['operation_time'].dt.hour
    operations_df['operation_weekday'] = operations_df['operation_time'].dt.weekday
    operations_df['operation_month'] = operations_df['operation_time'].dt.month
    return records_df, operations_df


if __name__ == "__main__":
    clean_records, clean_operations = load_and_clean_data()
    print("\nRow counts after cleaning:")
    print(f"Archive records: {len(clean_records)}")
    print(f"Operation records: {len(clean_operations)}")
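The access_frequency field divides the access count by the archive's age in days, with the age clipped to at least 1 so that a record created today does not cause a division by zero. A minimal worked example of that arithmetic in plain Python follows; the standalone function and its today parameter are illustrative assumptions.

```python
from datetime import date


def access_frequency(access_count, create_date, today):
    """Accesses per day since creation; the age is clipped to a minimum of 1 day."""
    age_days = max((today - create_date).days, 1)
    return access_count / age_days


# 30 accesses over a 30-day-old record: one access per day.
print(access_frequency(30, date(2024, 1, 1), date(2024, 1, 31)))  # 1.0
# A record created today has age 0, which is clipped to 1.
print(access_frequency(5, date(2024, 1, 31), date(2024, 1, 31)))  # 5.0
```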
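2.2 Optimizing Data Storage
Create a database connection and store the cleaned data in PostgreSQL:
from sqlalchemy import create_engine, text


def store_to_database(records_df, operations_df):
    # Create the database connection (substitute real credentials)
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Store the data. if_exists='replace' drops and recreates each table, so
    # the constraints defined in section 1.2 are not preserved.
    # archive_operations is written first: PostgreSQL refuses to drop
    # archive_records while the foreign key in archive_operations references it.
    operations_df.to_sql('archive_operations', engine, if_exists='replace', index=False)
    records_df.to_sql('archive_records', engine, if_exists='replace', index=False)

    # Create indexes to speed up queries
    index_statements = [
        "CREATE INDEX IF NOT EXISTS idx_archive_type ON archive_records(archive_type)",
        "CREATE INDEX IF NOT EXISTS idx_department ON archive_records(department)",
        "CREATE INDEX IF NOT EXISTS idx_create_date ON archive_records(create_date)",
        "CREATE INDEX IF NOT EXISTS idx_operation_time ON archive_operations(operation_time)",
        "CREATE INDEX IF NOT EXISTS idx_operation_type ON archive_operations(operation_type)",
    ]
    with engine.begin() as conn:  # commits on success, rolls back on error
        for statement in index_statements:
            conn.execute(text(statement))
    print("Data stored and indexes created")


# Run the storage step
store_to_database(clean_records, clean_operations)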
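Index creation is easier to rerun safely when each statement is idempotent. The sketch below illustrates the idea with the standard library's sqlite3 module as a stand-in for PostgreSQL, which is an assumption made only so the example runs without a database server; create_indexes and list_indexes are hypothetical helpers.

```python
import sqlite3

INDEX_STATEMENTS = [
    "CREATE INDEX IF NOT EXISTS idx_archive_type ON archive_records(archive_type)",
    "CREATE INDEX IF NOT EXISTS idx_department ON archive_records(department)",
]


def create_indexes(conn):
    """Run each index statement; IF NOT EXISTS makes repeated calls harmless."""
    for statement in INDEX_STATEMENTS:
        conn.execute(statement)
    conn.commit()


def list_indexes(conn, table):
    """Return the names of the indexes defined on a table, sorted by name."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'index' AND tbl_name = ? ORDER BY name",
        (table,),
    ).fetchall()
    return [name for (name,) in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE archive_records (record_id TEXT, archive_type TEXT, department TEXT)")
create_indexes(conn)
create_indexes(conn)  # the second call changes nothing
print(list_indexes(conn, "archive_records"))  # ['idx_archive_type', 'idx_department']
```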
III. Computing the Core Analysis Metrics
3.1 Archive Utilization Analysis
Create the script analysis_utilization.py:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine


def calculate_utilization_metrics():
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Query aggregates per department and archive type
    query = """
        SELECT
            r.department,
            r.archive_type,
            COUNT(r.record_id) as total_records,
            SUM(CASE WHEN r.access_count > 0 THEN 1 ELSE 0 END) as accessed_records,
            AVG(r.access_count) as avg_access_count,
            SUM(r.file_size_mb) as total_size_mb,
            SUM(CASE WHEN r.digitization_status THEN 1 ELSE 0 END) as digitized_count
        FROM archive_records r
        GROUP BY r.department, r.archive_type
        ORDER BY r.department, total_records DESC
    """
    metrics_df = pd.read_sql_query(query, engine)

    # Compute the utilization metrics (rates are percentages)
    metrics_df['access_rate'] = (metrics_df['accessed_records'] / metrics_df['total_records'] * 100).round(2)
    metrics_df['digitization_rate'] = (metrics_df['digitized_count'] / metrics_df['total_records'] * 100).round(2)
    metrics_df['size_per_record'] = (metrics_df['total_size_mb'] / metrics_df['total_records']).round(2)
    return metrics_df


def calculate_operation_patterns():
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Analyze operation patterns by hour of day
    time_query = """
        SELECT
            EXTRACT(HOUR FROM operation_time) as hour_of_day,
            operation_type,
            COUNT(*) as operation_count,
            AVG(operation_duration_seconds) as avg_duration_seconds
        FROM archive_operations
        GROUP BY EXTRACT(HOUR FROM operation_time), operation_type
        ORDER BY hour_of_day, operation_type
    """
    time_patterns = pd.read_sql_query(time_query, engine)

    # Find the most frequently used archives
    hot_query = """
        SELECT
            r.record_id,
            r.archive_type,
            r.department,
            COUNT(o.operation_id) as operation_count,
            MAX(o.operation_time) as last_operation_time
        FROM archive_records r
        JOIN archive_operations o ON r.record_id = o.record_id
        GROUP BY r.record_id, r.archive_type, r.department
        HAVING COUNT(o.operation_id) > 10
        ORDER BY operation_count DESC
        LIMIT 20
    """
    hot_records = pd.read_sql_query(hot_query, engine)
    return time_patterns, hot_records


if __name__ == "__main__":
    metrics = calculate_utilization_metrics()
    time_patterns, hot_records = calculate_operation_patterns()
    print("Archive utilization metrics:")
    print(metrics.head(10))
    print("\nTop 10 most used archives:")
    print(hot_records.head(10))
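The access rate is the share of records in a group that have been accessed at least once, expressed as a percentage. The following sketch reproduces the same CASE WHEN aggregation on a few made-up rows, using the standard library's sqlite3 module instead of PostgreSQL so that it runs without a server; the sample data is purely illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE archive_records (record_id TEXT, department TEXT, access_count INTEGER)"
)
conn.executemany(
    "INSERT INTO archive_records VALUES (?, ?, ?)",
    [
        ("ARC_000001", "财务部", 3),
        ("ARC_000002", "财务部", 0),
        ("ARC_000003", "财务部", 7),
        ("ARC_000004", "技术部", 0),
    ],
)

# Count all records and the records with at least one access, per department.
rows = conn.execute("""
    SELECT department,
           COUNT(*) AS total_records,
           SUM(CASE WHEN access_count > 0 THEN 1 ELSE 0 END) AS accessed_records
    FROM archive_records
    GROUP BY department
    ORDER BY total_records DESC
""").fetchall()

# access_rate = accessed_records / total_records * 100, rounded to 2 decimals.
access_rates = {dept: round(accessed / total * 100, 2) for dept, total, accessed in rows}
print(access_rates)  # {'财务部': 66.67, '技术部': 0.0}
```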
3.2 Storage Optimization Analysis
# Assumes the imports from analysis_utilization.py (pandas as pd, create_engine)
def storage_optimization_analysis():
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Analyze storage usage per location
    storage_query = """
        SELECT
            storage_location,
            COUNT(*) as record_count,
            SUM(file_size_mb) as total_size_mb,
            AVG(access_count) as avg_access_count,
            MAX(last_access_date) as last_access_date
        FROM archive_records
        WHERE storage_location IS NOT NULL
        GROUP BY storage_location
        ORDER BY total_size_mb DESC
    """
    storage_df = pd.read_sql_query(storage_query, engine)

    # Identify low-usage archives: never accessed, or not accessed for more
    # than a year with fewer than 5 accesses in total
    low_usage_query = """
        SELECT
            record_id,
            archive_type,
            department,
            create_date,
            last_access_date,
            access_count,
            file_size_mb,
            (EXTRACT(DAY FROM NOW() - last_access_date)) as days_since_last_access
        FROM archive_records
        WHERE access_count = 0
            OR (EXTRACT(DAY FROM NOW() - last_access_date) > 365 AND access_count < 5)
        ORDER BY file_size_mb DESC
        LIMIT 50
    """
    low_usage_df = pd.read_sql_query(low_usage_query, engine)
    return storage_df, low_usage_df
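The low-usage rule in the WHERE clause can be read as a simple predicate: a record qualifies if it has never been accessed, or if its last access is more than 365 days old and it has fewer than 5 accesses in total. A minimal Python sketch of that rule follows; is_low_usage and its keyword parameters are hypothetical names introduced for illustration.

```python
from datetime import date


def is_low_usage(access_count, last_access_date, today, stale_days=365, min_accesses=5):
    """Mirror the low-usage WHERE clause for a single record."""
    if access_count == 0:
        return True
    if last_access_date is None:
        # In SQL a NULL date makes the second condition unknown, so the row is not selected.
        return False
    days_since = (today - last_access_date).days
    return days_since > stale_days and access_count < min_accesses


today = date(2024, 6, 1)
print(is_low_usage(0, None, today))               # never accessed
print(is_low_usage(3, date(2022, 1, 1), today))   # stale and rarely accessed
print(is_low_usage(3, date(2024, 5, 1), today))   # accessed recently
print(is_low_usage(10, date(2020, 1, 1), today))  # stale but accessed often
```

The first two calls return True and the last two return False.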
IV. Implementing the Visualizations
4.1 Basic Chart Configuration
Create the script visualization.py:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sqlalchemy import create_engine
import numpy as np

# Configure a font that can render Chinese data labels, and the plot style
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")


def create_dashboard():
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Fetch the data
    metrics_query = "SELECT * FROM archive_records LIMIT 1000"
    operations_query = "SELECT * FROM archive_operations LIMIT 5000"
    records_df = pd.read_sql_query(metrics_query, engine)
    operations_df = pd.read_sql_query(operations_query, engine)

    # Create the canvas
    fig = plt.figure(figsize=(20, 12))

    # 1. Archive type distribution
    ax1 = plt.subplot(2, 3, 1)
    type_counts = records_df['archive_type'].value_counts()
    ax1.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%')
    ax1.set_title('Archive Type Distribution')

    # 2. Number of archives per department
    ax2 = plt.subplot(2, 3, 2)
    dept_counts = records_df['department'].value_counts()
    sns.barplot(x=dept_counts.values, y=dept_counts.index, ax=ax2)
    ax2.set_title('Archives per Department')
    ax2.set_xlabel('Number of archives')

    # 3. Operation volume over time
    ax3 = plt.subplot(2, 3, 3)
    operations_df['operation_date'] = operations_df['operation_time'].dt.date
    daily_operations = operations_df.groupby('operation_date').size()
    ax3.plot(daily_operations.index, daily_operations.values)
    ax3.set_title('Daily Operation Trend')
    ax3.set_xlabel('Date')
    ax3.set_ylabel('Number of operations')
    ax3.tick_params(axis='x', rotation=45)

    # 4. Operation type distribution
    ax4 = plt.subplot(2, 3, 4)
    operation_counts = operations_df['operation_type'].value_counts()
    sns.barplot(x=operation_counts.index, y=operation_counts.values, ax=ax4)
    ax4.set_title('Operation Type Distribution')
    ax4.set_ylabel('Number of operations')
    ax4.tick_params(axis='x', rotation=45)

    # 5. File size distribution
    ax5 = plt.subplot(2, 3, 5)
    sns.histplot(data=records_df, x='file_size_mb', bins=30, ax=ax5)
    ax5.set_title('File Size Distribution')
    ax5.set_xlabel('File size (MB)')
    ax5.set_ylabel('Count')

    # 6. Most accessed archives
    ax6 = plt.subplot(2, 3, 6)
    hot_records = records_df.nlargest(10, 'access_count')[['record_id', 'access_count']]
    sns.barplot(data=hot_records, x='access_count', y='record_id', ax=ax6)
    ax6.set_title('Top 10 Archives by Access Count')
    ax6.set_xlabel('Access count')

    plt.tight_layout()
    plt.savefig('archive_dashboard.png', dpi=300, bbox_inches='tight')
    plt.show()


def create_operation_heatmap():
    engine = create_engine('postgresql://username:password@localhost:5432/archive_db')

    # Fetch operation counts by hour and day of week (DOW: 0 = Sunday)
    query = """
        SELECT
            EXTRACT(HOUR FROM operation_time) as hour,
            EXTRACT(DOW FROM operation_time) as weekday,
            COUNT(*) as operation_count
        FROM archive_operations
        GROUP BY EXTRACT(HOUR FROM operation_time), EXTRACT(DOW FROM operation_time)
    """
    heatmap_data = pd.read_sql_query(query, engine)

    # Pivot into a weekday-by-hour table for the heatmap
    pivot_data = heatmap_data.pivot(index='weekday', columns='hour', values='operation_count')

    # Draw the heatmap
    plt.figure(figsize=(12, 6))
    sns.heatmap(pivot_data.fillna(0), cmap='YlOrRd', annot=True, fmt='.0f')

    # Set the axis labels