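Big Data Archive Technical Architecture in Practice: Building a Scalable Data Storage System from Scratch
1. System Architecture Design
The big data archive uses a layered architecture so that the full pipeline of data storage, processing, and retrieval stays stable and reliable. It has four core layers: data ingestion, storage, compute, and service.
1.1 Data Ingestion Layer Configuration
Apache Kafka serves as the data ingestion bus, deployed as a 3-node cluster for high availability. Each node needs an 8-core CPU, 32 GB of memory, and 1 TB of SSD storage.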
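The availability of a three-broker cluster comes down to simple arithmetic on two Kafka settings. The sketch below is illustrative only: `tolerated_broker_failures` is a hypothetical helper, not a Kafka API, and it assumes producers use `acks=all`, in which case a partition accepts writes only while at least `min.insync.replicas` replicas are in sync.

```python
def tolerated_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Return how many replicas a partition can lose before acks=all writes are rejected."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    return replication_factor - min_insync_replicas


if __name__ == "__main__":
    # Compare a few combinations of the two settings
    for rf, isr in [(3, 2), (2, 2), (3, 1)]:
        failures = tolerated_broker_failures(rf, isr)
        print(f"replication.factor={rf}, min.insync.replicas={isr}: tolerates {failures} failure(s)")
```

On three brokers, a replication factor of 3 with `min.insync.replicas` of 2 survives the loss of one broker, whereas setting both values to 2 leaves no headroom at all.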
Download and install Kafka:
```
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
```
Configure the server.properties file:
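```
# Must be unique on each broker (1, 2, 3)
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
num.partitions=3
# Replication factor 3 with min.insync.replicas=2 keeps acks=all writes available when one broker is down
default.replication.factor=3
min.insync.replicas=2
zookeeper.connect=node1:2181,node2:2181,node3:2181
```
1.2 Storage Layer Selection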
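The storage layer combines HDFS and HBase: HDFS stores the raw files, and HBase stores the structured data. HDFS runs on a 5-node cluster with 12 TB of HDD storage per node; HBase runs on a 3-node cluster with 256 GB of memory per node.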
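As a rough capacity check for the HDFS cluster described above, the sketch below estimates usable space. It assumes the HDFS default replication factor of 3 and a hypothetical 25% reserve for non-DFS use and temporary data; neither figure comes from this guide, so adjust both to your deployment.

```python
def usable_hdfs_capacity_tb(nodes: int, disk_tb_per_node: float,
                            replication: int = 3, reserve_fraction: float = 0.25) -> float:
    """Estimate usable HDFS capacity in TB after replication and a safety reserve."""
    if replication < 1:
        raise ValueError("replication must be at least 1")
    if not 0 <= reserve_fraction < 1:
        raise ValueError("reserve_fraction must be in [0, 1)")
    raw_tb = nodes * disk_tb_per_node
    # Every block is stored `replication` times, so divide the remaining raw space
    return raw_tb * (1 - reserve_fraction) / replication


if __name__ == "__main__":
    # 5 DataNodes with 12 TB each, as specified for the storage layer
    print(f"Usable capacity: {usable_hdfs_capacity_tb(5, 12):.1f} TB")
```

Under these assumptions, the 60 TB of raw disk yields about 15 TB of usable archive space.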
2. Environment Deployment Steps
2.1 Base Environment Preparation
All nodes run Ubuntu 20.04 LTS. Configure hostname resolution:
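```
sudo vim /etc/hosts
# Add the following entries:
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3
192.168.1.104 node4
192.168.1.105 node5
```
Configure passwordless SSH login by generating a key pair on node1 and distributing the public key to all nodes: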
```
ssh-keygen -t rsa
ssh-copy-id node1
ssh-copy-id node2
ssh-copy-id node3
ssh-copy-id node4
ssh-copy-id node5
```
2.2 Hadoop Cluster Deployment
Download Hadoop 3.3.6:
```
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
tar -zxvf hadoop-3.3.6.tar.gz
mv hadoop-3.3.6 /opt/hadoop
```
Configure core-site.xml:
Configure hdfs-site.xml:
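Hadoop reads both core-site.xml and hdfs-site.xml as a flat list of `<property>` name/value pairs. As a minimal sketch of what the two files might contain, the Python below renders them from dictionaries. The values are assumptions: `fs.defaultFS` is inferred from the `hdfs://node1:9000` URIs used later in this guide, `dfs.replication` is left at the HDFS default of 3, and the two directory paths are hypothetical placeholders.

```python
import xml.etree.ElementTree as ET

# Assumed values; adjust to the actual cluster layout
CORE_SITE = {"fs.defaultFS": "hdfs://node1:9000"}
HDFS_SITE = {
    "dfs.replication": 3,
    "dfs.namenode.name.dir": "/data/hadoop/namenode",  # hypothetical path
    "dfs.datanode.data.dir": "/data/hadoop/datanode",  # hypothetical path
}


def render_hadoop_config(properties: dict) -> str:
    """Render a dict as a Hadoop-style <configuration> XML document."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    ET.indent(root, space="  ")  # requires Python 3.9+
    return '<?xml version="1.0"?>\n' + ET.tostring(root, encoding="unicode") + "\n"


if __name__ == "__main__":
    print(render_hadoop_config(CORE_SITE))
    print(render_hadoop_config(HDFS_SITE))
```

The generated files would go into `/opt/hadoop/etc/hadoop/` on every node, given the install location used above.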
Format the NameNode and start HDFS:
```
hdfs namenode -format
start-dfs.sh
```
2.3 HBase Cluster Deployment
Download HBase 2.5.6:
```
wget https://archive.apache.org/dist/hbase/2.5.6/hbase-2.5.6-bin.tar.gz
tar -zxvf hbase-2.5.6-bin.tar.gz
mv hbase-2.5.6 /opt/hbase
```
Configure hbase-site.xml:
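A fully distributed HBase deployment typically needs at least three properties in this file: `hbase.rootdir`, `hbase.cluster.distributed`, and `hbase.zookeeper.quorum`. The sketch below parses an hbase-site.xml string and reports which of them are missing. The sample values are assumptions: the root directory reuses the `hdfs://node1:9000` address that appears elsewhere in this guide, and the quorum mirrors the ZooKeeper hosts from the Kafka configuration.

```python
import xml.etree.ElementTree as ET

REQUIRED = ("hbase.rootdir", "hbase.cluster.distributed", "hbase.zookeeper.quorum")

# Assumed sample content, not taken from the original guide
SAMPLE_HBASE_SITE = """<configuration>
  <property><name>hbase.rootdir</name><value>hdfs://node1:9000/hbase</value></property>
  <property><name>hbase.cluster.distributed</name><value>true</value></property>
  <property><name>hbase.zookeeper.quorum</name><value>node1,node2,node3</value></property>
</configuration>"""


def parse_properties(xml_text: str) -> dict:
    """Return the name/value pairs of a Hadoop-style configuration document."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


def missing_properties(xml_text: str, required=REQUIRED) -> list:
    """List required property names that are absent or have an empty value."""
    props = parse_properties(xml_text)
    return [name for name in required if not props.get(name)]


if __name__ == "__main__":
    print("Missing:", missing_properties(SAMPLE_HBASE_SITE))
```

Running a check like this before `start-hbase.sh` catches a forgotten property earlier than a failed master startup would.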
Start the HBase cluster:
```
start-hbase.sh
```
3. Data Ingestion and Processing
3.1 Data Collection Configuration
Use Flume to collect data from business systems into Kafka. Configure flume-agent.conf:
```
# Components of this agent
agent.sources = syslog
agent.channels = memoryChannel
agent.sinks = kafkaSink

# Source: receive syslog messages over TCP
agent.sources.syslog.type = syslogtcp
agent.sources.syslog.port = 5140
agent.sources.syslog.host = 0.0.0.0
agent.sources.syslog.channels = memoryChannel

# Channel: in-memory buffer between source and sink
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

# Sink: publish events to the archive_data Kafka topic
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = archive_data
agent.sinks.kafkaSink.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 100
agent.sinks.kafkaSink.channel = memoryChannel
```
3.2 Data Cleansing and Transformation
Use Spark Structured Streaming to process the Kafka data. Create process.py:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, TimestampType

# Submit with the Kafka connector (the spark-sql-kafka-0-10 package) on the classpath
spark = SparkSession.builder \
    .appName("ArchiveDataProcessor") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .getOrCreate()

# Expected layout of each JSON message
schema = StructType() \
    .add("id", StringType()) \
    .add("content", StringType()) \
    .add("timestamp", TimestampType()) \
    .add("source", StringType())

# Subscribe to the archive_data topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
    .option("subscribe", "archive_data") \
    .load()

# Decode the message value as JSON and flatten it into top-level columns
parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Append each micro-batch to HDFS as Parquet
query = parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://node1:9000/archive/raw") \
    .option("checkpointLocation", "hdfs://node1:9000/checkpoints") \
    .start()

query.awaitTermination()
```
4. Metadata Management Implementation
4.1 HBase Table Design
Create the archive metadata table:
```
create 'archive_metadata',
  {NAME => 'basic', VERSIONS => 1},
  {NAME => 'technical', VERSIONS => 1},
  {NAME => 'access', VERSIONS => 3}
```
Table structure:
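- basic column family: stores basic file information (file name, size, type)
- technical column family: stores technical attributes (encoding format, resolution, duration)
- access column family: stores access control information (permissions, access records)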
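To make the column-family layout concrete, the sketch below maps a flat metadata record onto `family:qualifier` cells, which is how HBase addresses columns. The `FIELD_TO_FAMILY` mapping and its field names are hypothetical, chosen only to mirror the three families described above.

```python
# Hypothetical mapping from metadata field to column family
FIELD_TO_FAMILY = {
    "filename": "basic",
    "size": "basic",
    "type": "basic",
    "encoding": "technical",
    "resolution": "technical",
    "duration": "technical",
    "permission": "access",
}


def to_hbase_cells(record: dict) -> dict:
    """Convert a flat record into {b'family:qualifier': b'value'} cells."""
    cells = {}
    for field, value in record.items():
        family = FIELD_TO_FAMILY.get(field)
        if family is None:
            raise KeyError(f"no column family defined for field {field!r}")
        # HBase stores raw bytes, so both column name and value are encoded
        cells[f"{family}:{field}".encode("utf-8")] = str(value).encode("utf-8")
    return cells


if __name__ == "__main__":
    record = {"filename": "report.pdf", "size": 1024, "permission": "read"}
    for column, value in to_hbase_cells(record).items():
        print(column, value)
```

Rejecting unknown fields up front keeps stray columns out of the table, which matters because HBase itself does not enforce a fixed set of qualifiers within a family.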
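4.2 Metadata Write Example

Use Python to connect to HBase and write metadata:
```
import happybase

# happybase talks to the HBase Thrift server on node1 (default port 9090)
connection = happybase.Connection('node1')
table = connection.table('archive_metadata')


def save_metadata(file_id, metadata):
    """Write one file's metadata into the archive_metadata table."""
    row_key = f"file_{file_id}"
    data = {
        'basic:filename': metadata['filename'],
        'basic:size': str(metadata['size']),
        'basic:type': metadata['filetype'],
        'technical:format': metadata['format'],
        'technical:resolution': metadata.get('resolution', ''),
        'access:permission': metadata['permission']
    }
    # happybase works with bytes for row keys, column names, and values
    table.put(
        row_key.encode('utf-8'),
        {k.encode('utf-8'): str(v).encode('utf-8') for k, v in data.items()},
    )
```
5. Retrieval System Setup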
5.1 Elasticsearch Integration
Install Elasticsearch 8.11.0:
```
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-linux-x86_64.tar.gz
tar -zxvf elasticsearch-8.11.0-linux-x86_64.tar.gz
cd elasticsearch-8.11.0
```
Configure elasticsearch.yml:
```
cluster.name: archive-cluster
node.name: node1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["node1"]
```
Start Elasticsearch:
```
./bin/elasticsearch -d
```
5.2 Index Creation and Data Synchronization
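Create the archive index (the ik_max_word analyzer requires the IK analysis plugin to be installed on the node):
```
curl -X PUT "http://node1:9200/archive_docs" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "filename": {"type": "text", "analyzer": "ik_max_word"},
      "content": {"type": "text", "analyzer": "ik_max_word"},
      "filetype": {"type": "keyword"},
      "timestamp": {"type": "date"},
      "source": {"type": "keyword"}
    }
  }
}'
```
Use Logstash to synchronize HDFS data into Elasticsearch. The `hdfs` input and `parquet` filter used here are not part of the standard Logstash distribution, so this configuration presumes plugins that provide them. Configure logstash.conf: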
```
input {
  hdfs {
    path => "hdfs://node1:9000/archive/raw/*.parquet"
    type => "archive"
  }
}
filter {
  parquet {
    columns => ["id", "filename", "content", "timestamp", "source"]
  }
}
output {
  elasticsearch {
    hosts => ["http://node1:9200"]
    index => "archive_docs"
    document_id => "%{id}"
  }
}
```
6. Access Control Configuration
6.1 Kerberos Authentication Integration
Install the Kerberos server:
```
sudo apt-get install krb5-kdc krb5-admin-server
sudo krb5_newrealm
```
Create the HDFS service principal:
```
kadmin.local -q "addprinc -randkey hdfs/node1@EXAMPLE.COM"
kadmin.local -q "ktadd -k /etc/security/keytab/hdfs.keytab hdfs/node1@EXAMPLE.COM"
```
Configure core-site.xml to enable Kerberos:
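Switching Hadoop from its default `simple` authentication to Kerberos centers on two core-site.xml properties, `hadoop.security.authentication` and `hadoop.security.authorization`. As a hedged sketch, the Python below updates an existing core-site.xml string in place. `set_properties` is a hypothetical helper, and a real secure cluster also needs the principal and keytab settings for each daemon, which are not covered here.

```python
import xml.etree.ElementTree as ET

KERBEROS_PROPERTIES = {
    "hadoop.security.authentication": "kerberos",
    "hadoop.security.authorization": "true",
}


def set_properties(xml_text: str, updates: dict) -> str:
    """Set or overwrite properties in a Hadoop-style configuration document."""
    root = ET.fromstring(xml_text)
    existing = {p.findtext("name"): p for p in root.findall("property")}
    for name, value in updates.items():
        prop = existing.get(name)
        if prop is None:
            # Property not present yet: append a new <property> element
            prop = ET.SubElement(root, "property")
            ET.SubElement(prop, "name").text = name
        value_el = prop.find("value")
        if value_el is None:
            value_el = ET.SubElement(prop, "value")
        value_el.text = value
    ET.indent(root, space="  ")  # requires Python 3.9+
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    current = ("<configuration><property><name>fs.defaultFS</name>"
               "<value>hdfs://node1:9000</value></property></configuration>")
    print(set_properties(current, KERBEROS_PROPERTIES))
```

Updating the parsed document rather than appending text avoids duplicate entries when a property is already present with a different value.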
6.2 Fine-Grained Permission Control
Configure HDFS directory permissions:
```
hdfs dfs -mkdir /archive
hdfs dfs -chmod 750 /archive
hdfs dfs -chown archive_admin:archive_users /archive
```
Set ACLs on the HBase table:
```
grant 'user1', 'R', 'archive_metadata'
grant 'user2', 'RW', 'archive_metadata'
```
7. Monitoring and Maintenance
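7.1 Metrics Collection
Use Prometheus to monitor the cluster. Configure prometheus.yml:
```
global:
  scrape_interval: 15s
scrape_configs:
  # Each target must expose metrics in Prometheus format, typically through an exporter such as the JMX exporter
  - job_name: 'hadoop'
    static_configs:
      # Hadoop 3.x serves the NameNode web UI on 9870 (50070 was the Hadoop 2.x default)
      - targets: ['node1:9870', 'node2:9870', 'node3:9870']
  - job_name: 'hbase'
    static_configs:
      - targets: ['node1:16010', 'node2:16010', 'node3:16010']
  - job_name: 'kafka'
    static_configs:
      - targets: ['node1:9092', 'node2:9092', 'node3:9092']
```
7.2 Routine Maintenance Scripts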
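Create the data backup script backup.sh:
```
#!/bin/bash
BACKUP_DATE=$(date +%Y%m%d)
# Create the dated backup directory, including /backup if it does not exist yet
hdfs dfs -mkdir -p "/backup/$BACKUP_DATE"
# Copy the contents of /archive; the quotes leave the glob for HDFS to expand
hdfs dfs -cp '/archive/*' "/backup/$BACKUP_DATE/"
# Full backup of the HBase tables
hbase backup create full "/backup/hbase_$BACKUP_DATE"
```
Create the cluster health check script health_check.sh: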
```
#!/bin/bash
echo "Checking HDFS status..."
hdfs dfsadmin -report | grep "Live datanodes"
echo "Checking HBase status..."
echo "status" | hbase shell | grep "active master"
echo "Checking Kafka status..."
kafka-topics.sh --list --bootstrap-server node1:9092
```
8. Troubleshooting Guide
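8.1 Common Problems
HDFS DataNode lost:
```
# Check which DataNodes are live or dead
hdfs dfsadmin -report
# On the affected node, restart the DataNode (Hadoop 3.x replacement for hadoop-daemon.sh)
hdfs --daemon start datanode
```
HBase RegionServer down: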
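```
# On the affected node, restart the RegionServer
hbase-daemon.sh start regionserver
# Report on cluster and region consistency
hbase hbck -details
```
Kafka partition unavailable: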
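```
# Inspect the leader and in-sync replicas of each partition
kafka-topics.sh --describe --topic archive_data --bootstrap-server node1:9092
# Increase the partition count to 4 (partitions can be added but never removed)
kafka-topics.sh --alter --topic archive_data --partitions 4 --bootstrap-server node1:9092
```
8.2 Data Recovery Procedure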
Restore HDFS data from backup:
```
# Remove the damaged directory, then copy the backup into its place
hdfs dfs -rm -r /archive
hdfs dfs -cp /backup/20240101 /archive
```
Restore the HBase table:
```
hbase restore /backup/hbase_20240101 archive_metadata
```
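Because the HDFS restore above deletes live data before copying, it can help to confirm that the backup exists first. The sketch below only builds the command sequence and executes nothing. `build_restore_plan` is a hypothetical helper, and the `hdfs dfs -test -d` pre-check is an addition that is not part of the procedure in this guide.

```python
import shlex


def build_restore_plan(backup_date: str, target: str = "/archive",
                       backup_root: str = "/backup") -> list[list[str]]:
    """Return the HDFS commands for a restore, with an existence check first."""
    if not (len(backup_date) == 8 and backup_date.isdigit()):
        raise ValueError("backup_date must look like YYYYMMDD")
    source = f"{backup_root}/{backup_date}"
    return [
        ["hdfs", "dfs", "-test", "-d", source],   # exits non-zero if the backup directory is missing
        ["hdfs", "dfs", "-rm", "-r", target],     # remove the damaged data
        ["hdfs", "dfs", "-cp", source, target],   # copy the backup into place
    ]


if __name__ == "__main__":
    for command in build_restore_plan("20240101"):
        print(shlex.join(command))
```

Running each command with `subprocess.run(command, check=True)` would stop the sequence at the first failure, so a missing backup aborts the restore before anything is deleted.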