大数据档案馆技术架构实操:从零搭建可扩展数据存储系统

一、系统架构设计

大数据档案馆采用分层架构设计,确保数据存储、处理、检索的完整链路稳定可靠。核心分为数据接入层、存储层、计算层和服务层。

1.1 数据接入层配置

使用Apache Kafka作为数据接入总线,配置3节点集群保证高可用。每个节点需要8核CPU、32GB内存、1TB SSD存储。

下载并安装Kafka:

``` wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz tar -xzf kafka_2.13-3.5.1.tgz cd kafka_2.13-3.5.1 ```

配置server.properties文件:

``` broker.id=1 listeners=PLAINTEXT://:9092 log.dirs=/data/kafka-logs num.partitions=3 default.replication.factor=2 min.insync.replicas=2 zookeeper.connect=node1:2181,node2:2181,node3:2181 ```

1.2 存储层选型

采用HDFS + HBase组合方案:HDFS存储原始文件,HBase存储结构化数据。HDFS配置5节点集群,每个节点12TB HDD存储;HBase配置3节点集群,每个节点256GB内存。

二、环境部署步骤

2.1 基础环境准备

所有节点统一使用Ubuntu 20.04 LTS系统,配置主机名解析:

``` sudo vim /etc/hosts 192.168.1.101 node1 192.168.1.102 node2 192.168.1.103 node3 192.168.1.104 node4 192.168.1.105 node5 ```

配置SSH免密登录:在node1生成密钥并分发到所有节点

``` ssh-keygen -t rsa ssh-copy-id node1 ssh-copy-id node2 ssh-copy-id node3 ssh-copy-id node4 ssh-copy-id node5 ```

2.2 Hadoop集群部署

下载Hadoop 3.3.6:

``` wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz tar -zxvf hadoop-3.3.6.tar.gz mv hadoop-3.3.6 /opt/hadoop ```

配置core-site.xml:

``` fs.defaultFS hdfs://node1:9000 hadoop.tmp.dir /data/hadoop/tmp ```

配置hdfs-site.xml:

``` dfs.replication 3 dfs.namenode.name.dir /data/hadoop/namenode dfs.datanode.data.dir /data/hadoop/datanode ```

初始化并启动HDFS

``` hdfs namenode -format start-dfs.sh ```

2.3 HBase集群部署

下载HBase 2.5.6:

``` wget https://archive.apache.org/dist/hbase/2.5.6/hbase-2.5.6-bin.tar.gz tar -zxvf hbase-2.5.6-bin.tar.gz mv hbase-2.5.6 /opt/hbase ```

配置hbase-site.xml:

``` hbase.rootdir hdfs://node1:9000/hbase hbase.cluster.distributed true hbase.zookeeper.quorum node1,node2,node3 ```

启动HBase集群:

``` start-hbase.sh ```

三、数据接入与处理

3.1 数据采集配置

使用Flume从业务系统采集数据到Kafka,配置flume-agent.conf:

``` agent.sources = syslog agent.channels = memoryChannel agent.sinks = kafkaSink agent.sources.syslog.type = syslogtcp agent.sources.syslog.port = 5140 agent.sources.syslog.host = 0.0.0.0 agent.sources.syslog.channels = memoryChannel agent.channels.memoryChannel.type = memory agent.channels.memoryChannel.capacity = 10000 agent.channels.memoryChannel.transactionCapacity = 1000 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkaSink.kafka.topic = archive_data agent.sinks.kafkaSink.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 agent.sinks.kafkaSink.kafka.flumeBatchSize = 100 agent.sinks.kafkaSink.channel = memoryChannel ```

3.2 数据清洗与转换

使用Spark Streaming处理Kafka数据,创建process.py:

``` from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col from pyspark.sql.types import StructType, StringType, TimestampType spark = SparkSession.builder \ .appName("ArchiveDataProcessor") \ .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \ .getOrCreate() schema = StructType() \ .add("id", StringType()) \ .add("content", StringType()) \ .add("timestamp", TimestampType()) \ .add("source", StringType()) df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \ .option("subscribe", "archive_data") \ .load() parsed_df = df.select( from_json(col("value").cast("string"), schema).alias("data") ).select("data.") query = parsed_df \ .writeStream \ .outputMode("append") \ .format("parquet") \ .option("path", "hdfs://node1:9000/archive/raw") \ .option("checkpointLocation", "hdfs://node1:9000/checkpoints") \ .start() query.awaitTermination() ```

四、元数据管理实现

4.1 HBase表设计

创建档案元数据表:

``` create 'archive_metadata', {NAME => 'basic', VERSIONS => 1}, {NAME => 'technical', VERSIONS => 1}, {NAME => 'access', VERSIONS => 3} ```

表结构说明:

  • basic列族:存储文件基础信息(文件名、大小、类型)
  • technical列族:存储技术属性(编码格式、分辨率、时长)
  • access列族:存储访问控制信息(权限、访问记录)

4.2 元数据写入示例

大数据档案馆技术架构实操:从零搭建可扩展数据存储系统

使用Python连接HBase写入元数据:

``` import happybase import json connection = happybase.Connection('node1') table = connection.table('archive_metadata') def save_metadata(file_id, metadata): row_key = f"file_{file_id}" data = { 'basic:filename': metadata['filename'], 'basic:size': str(metadata['size']), 'basic:type': metadata['filetype'], 'technical:format': metadata['format'], 'technical:resolution': metadata.get('resolution', ''), 'access:permission': metadata['permission'] } table.put(row_key, data) ```

五、检索系统搭建

5.1 Elasticsearch集成

安装Elasticsearch 8.11.0:

``` wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-linux-x86_64.tar.gz tar -zxvf elasticsearch-8.11.0-linux-x86_64.tar.gz cd elasticsearch-8.11.0 ```

配置elasticsearch.yml:

``` cluster.name: archive-cluster node.name: node1 path.data: /data/elasticsearch path.logs: /var/log/elasticsearch network.host: 0.0.0.0 http.port: 9200 cluster.initial_master_nodes: ["node1"] ```

启动Elasticsearch

``` ./bin/elasticsearch -d ```

5.2 索引创建与数据同步

创建档案索引:

``` curl -X PUT "http://node1:9200/archive_docs" -H 'Content-Type: application/json' -d' { "mappings": { "properties": { "filename": {"type": "text", "analyzer": "ik_max_word"}, "content": {"type": "text", "analyzer": "ik_max_word"}, "filetype": {"type": "keyword"}, "timestamp": {"type": "date"}, "source": {"type": "keyword"} } } }' ```

使用Logstash同步HDFS数据到Elasticsearch,配置logstash.conf:

``` input { hdfs { path => "hdfs://node1:9000/archive/raw/.parquet" type => "archive" } } filter { parquet { columns => ["id", "filename", "content", "timestamp", "source"] } } output { elasticsearch { hosts => ["http://node1:9200"] index => "archive_docs" document_id => "%{id}" } } ```

六、访问控制配置

6.1 Kerberos认证集成

安装Kerberos服务端:

``` sudo apt-get install krb5-kdc krb5-admin-server sudo krb5_newrealm ```

创建HDFS服务主体:

``` kadmin.local -q "addprinc -randkey hdfs/node1@EXAMPLE.COM" kadmin.local -q "ktadd -k /etc/security/keytab/hdfs.keytab hdfs/node1@EXAMPLE.COM" ```

配置core-site.xml启用Kerberos:

``` hadoop.security.authentication kerberos hadoop.security.authorization true ```

6.2 细粒度权限控制

配置HDFS目录权限:

``` hdfs dfs -mkdir /archive hdfs dfs -chmod 750 /archive hdfs dfs -chown archive_admin:archive_users /archive ```

设置HBase表ACL:

``` grant 'user1', 'R', 'archive_metadata' grant 'user2', 'RW', 'archive_metadata' ```

七、监控与维护

7.1 监控指标采集

使用Prometheus监控集群,配置prometheus.yml:

``` global: scrape_interval: 15s scrape_configs: - job_name: 'hadoop' static_configs: - targets: ['node1:50070', 'node2:50070', 'node3:50070'] - job_name: 'hbase' static_configs: - targets: ['node1:16010', 'node2:16010', 'node3:16010'] - job_name: 'kafka' static_configs: - targets: ['node1:9092', 'node2:9092', 'node3:9092'] ```

7.2 日常维护脚本

创建数据备份脚本backup.sh:

``` !/bin/bash BACKUP_DATE=$(date +%Y%m%d) hdfs dfs -mkdir /backup/$BACKUP_DATE hdfs dfs -cp /archive/ /backup/$BACKUP_DATE/ hbase backup create full /backup/hbase_$BACKUP_DATE ```

创建集群健康检查脚本health_check.sh:

``` !/bin/bash echo "检查HDFS状态..." hdfs dfsadmin -report | grep "Live datanodes" echo "检查HBase状态..." echo "status" | hbase shell | grep "active master" echo "检查Kafka状态..." kafka-topics.sh --list --bootstrap-server node1:9092 ```

八、故障处理指南

8.1 常见问题解决

HDFS数据节点丢失

``` hdfs dfsadmin -report hadoop-daemon.sh start datanode ```

HBase RegionServer宕机

``` hbase-daemon.sh start regionserver hbase hbck -details ```

Kafka分区不可用

``` kafka-topics.sh --describe --topic archive_data --bootstrap-server node1:9092 kafka-topics.sh --alter --topic archive_data --partitions 4 --bootstrap-server node1:9092 ```

8.2 数据恢复流程

从备份恢复HDFS数据:

``` hdfs dfs -rm -r /archive/ hdfs dfs -cp /backup/20240101/ /archive/ ```

恢复HBase表:

``` hbase restore /backup/hbase_20240101 archive_metadata ```
AI咨询
热线电话

028-85154420

15388110056

全国售前咨询电话

扫码咨询
安答联动微信公众号二维码

微信扫码关注安答联动

申请试用
热线电话
申请试用

安答联动档案管理系统