1. Editing the xcall script
Edit the xcall script so that the Java processes on every VM can be checked with a single command; this is used repeatedly in the later steps.
#! /bin/bash
for i in 虚拟机① 虚拟机② 虚拟机③
do
    echo --------- $i ----------
    ssh $i "$*"
done
Once the script is written, make it executable and run it; if everything is set up correctly, it lists the Java processes on every VM (see the sketch below).
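A minimal usage sketch, assuming the script is saved under ~/bin (a directory on PATH) and passwordless SSH is already configured between the hosts:
chmod +x ~/bin/xcall
# list the JVM processes on every VM
xcall jps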
2. Configuring Hadoop
2.1 Preparation
My original installation was Hadoop 3.1.3. Flume 1.10.1, which is needed later, is not compatible with Hadoop 3.1.3, so the cluster is upgraded to hadoop-3.3.4. Before upgrading, take a snapshot of every VM so you can roll back if anything goes wrong.
2.2 Installing hadoop-3.3.4
First, install hadoop-3.3.4 into the same parent directory as hadoop-3.1.3. Then copy the five files core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml and workers from hadoop-3.1.3's etc/hadoop/ directory over the corresponding files in hadoop-3.3.4.
After replacing them, distribute hadoop-3.3.4 to the other VMs (see the sketch below).
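A minimal distribution sketch, assuming the xsync script from the basic cluster setup is available and Hadoop lives under /opt/module:
xsync /opt/module/hadoop-3.3.4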
Then update the environment variables:
sudo vim /etc/profile.d/my_env.sh
Add the following; if entries for the old hadoop-3.1.3 are still present, delete them first.
export HADOOP_HOME=/opt/module/hadoop-3.3.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
After saving, reload the file:
source /etc/profile.d/my_env.sh
Then distribute it, and remember to source it again on every VM:
xsync /etc/profile.d/my_env.sh
Once the environment variables are in place on all VMs, go to the hadoop-3.3.4 directory on 虚拟机① and format the NameNode:
bin/hdfs namenode -format
This step is essential; otherwise the HDFS web UI cannot be opened.
2.3 Hadoop cluster management script
Open (create) the myhadoop.sh script
and add the following cluster start/stop logic:
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi
case $1 in
"start")
        echo " =================== 启动 hadoop集群 ==================="
        echo " --------------- 启动 hdfs ---------------"
        ssh 虚拟机① "/opt/module/hadoop-3.3.4/sbin/start-dfs.sh"
        echo " --------------- 启动 yarn ---------------"
        ssh 虚拟机② "/opt/module/hadoop-3.3.4/sbin/start-yarn.sh"
        echo " --------------- 启动 historyserver ---------------"
        ssh 虚拟机③ "/opt/module/hadoop-3.3.4/bin/mapred --daemon start historyserver"
;;
"stop")
        echo " =================== 关闭 hadoop集群 ==================="
        echo " --------------- 关闭 historyserver ---------------"
        ssh 虚拟机③ "/opt/module/hadoop-3.3.4/bin/mapred --daemon stop historyserver"
        echo " --------------- 关闭 yarn ---------------"
        ssh 虚拟机② "/opt/module/hadoop-3.3.4/sbin/stop-yarn.sh"
        echo " --------------- 关闭 hdfs ---------------"
        ssh 虚拟机① "/opt/module/hadoop-3.3.4/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac
Then make myhadoop.sh executable:
chmod 777 ~/bin/myhadoop.sh
Now start Hadoop with the script, check that the daemons are running on each node, and finally open the web UI at http://主机名:9870/explorer.html#/ (see the sketch below).
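A minimal verification sketch, assuming myhadoop.sh and xcall are both in ~/bin:
myhadoop.sh start
# every node should now list its HDFS/YARN daemons
xcall jps
If NameNode, DataNode, ResourceManager, NodeManager and JobHistoryServer all show up, the web UI should be reachable.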
3. Simulating data
Upload application.yml, gmall-remake-mock-2023-05-15-3.jar, path.json and logback.xml to /opt/module/applog on 主机1 and set the relevant parameters.
After setting the parameters, distribute the applog directory to the other VMs, then run the following on 主机1 and check whether log/app.log is generated under the current directory:
java -jar gmall-remake-mock-2023-05-15-3.jar test 100 2022-06-08
Then write the lg.sh script:
#!/bin/bash
for i in 主机1 主机2; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; nohup java -jar gmall-remake-mock-2023-05-15-3.jar $1 $2 $3 >/dev/null 2>&1 &"
done
Run the script and confirm that the applog directory on 主机1 and 主机2 now contains the generated files:
lg.sh test 1000 2002-12-24
4. Configuring Zookeeper
Download the Zookeeper archive, extract and rename it to zookeeper, then create a zkData directory in its root directory.
In zkData, create a myid file containing the number assigned to this server (for example 1), as sketched below.
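A minimal sketch, assuming the /opt/module/zookeeper install path used throughout this guide:
mkdir /opt/module/zookeeper/zkData
# the number must match this host's server.X entry in zoo.cfg
echo 1 > /opt/module/zookeeper/zkData/myid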
Rename zoo_sample.cfg in /opt/module/zookeeper/conf to zoo.cfg:
mv /opt/module/zookeeper/conf/zoo_sample.cfg /opt/module/zookeeper/conf/zoo.cfg
Then edit zoo.cfg, setting the data directory
dataDir=/opt/module/zookeeper/zkData
and adding the cluster configuration:
#######################cluster##########################
server.1=主机1:2888:3888
server.2=主机2:2888:3888
server.3=主机3:2888:3888
When the configuration is done, distribute the zookeeper directory, then adjust the myid file on each VM so that it matches that machine's server number.
xsync /opt/module/zookeeper
Create a zk.sh script
with the following content:
#!/bin/bash
case $1 in
"start"){
    for i in 主机1 主机2 主机3
    do
        echo ---------- zookeeper $i 启动 ------------
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in 主机1 主机2 主机3
    do
        echo ---------- zookeeper $i 停止 ------------
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in 主机1 主机2 主机3
    do
        echo ---------- zookeeper $i 状态 ------------
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
    done
};;
esac
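A usage sketch, assuming the script is saved as ~/bin/zk.sh:
chmod +x ~/bin/zk.sh
zk.sh start
# one node should report "leader" and the other two "follower"
zk.sh status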
5. Configuring Kafka
Download and extract the Kafka package, edit server.properties under config as shown below, then distribute the kafka directory to every VM; broker.id must be unique, so it is 1 on 虚拟机①, and 2 and 3 on 虚拟机② and 虚拟机③ respectively.
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=1
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=主机1:2181,主机2:2181,主机3:2181/kafka
Then write a start/stop script for the Kafka cluster:
#! /bin/bash
case $1 in
"start"){
    for i in 主机1 主机2 主机3
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in 主机1 主机2 主机3
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac
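A usage sketch; the original does not name this script, so kf.sh below is a hypothetical filename under ~/bin:
chmod +x ~/bin/kf.sh
# ZooKeeper must already be running (zk.sh start)
kf.sh start
# a Kafka process should now appear on every node
xcall jps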
A common pitfall: if ZooKeeper has been restarted and Kafka then refuses to start, deleting the contents of the datas directory under the Kafka installation (which wipes all Kafka data) usually resolves it.
6. Configuring Flume
This agent collects the log files and validates them. It uses a Source plus a Kafka Channel: the Source reads the data and normalizes its format, then hands it to the Channel; since the data goes straight into Kafka, no Sink is needed. An interceptor is placed between the Source and the Channel to filter the events.
After downloading Flume, first edit the log4j2.xml file in Flume's conf directory to set the log file path:
vim flume/conf/log4j2.xml
Set it to the following:
<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<Configuration status="ERROR">
  <Properties>
    <Property name="LOG_DIR">/opt/module/flume/log</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
    </Console>
    <RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i">
      <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
      <Policies>
        <!-- Roll every night at midnight or when the file reaches 100MB -->
        <SizeBasedTriggeringPolicy size="100 MB"/>
        <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
      </Policies>
      <DefaultRolloverStrategy min="1" max="20">
        <Delete basePath="${LOG_DIR}/archive">
          <!-- Nested conditions: the inner condition is only evaluated on files
               for which the outer conditions are true. -->
          <IfFileName glob="flume.log.*">
            <!-- Only allow 1 GB of files to accumulate -->
            <IfAccumulatedFileSize exceeds="1 GB"/>
          </IfFileName>
        </Delete>
      </DefaultRolloverStrategy>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Logger name="org.apache.flume.lifecycle" level="info"/>
    <Logger name="org.jboss" level="WARN"/>
    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
    <Logger name="org.apache.hadoop" level="INFO"/>
    <Logger name="org.apache.hadoop.hive" level="ERROR"/>
    <!-- 引入控制台输出,方便学习查看日志 -->
    <Root level="INFO">
      <AppenderRef ref="LogFile" />
      <AppenderRef ref="Console" />
    </Root>
  </Loggers>
</Configuration>
Then create job/file_to_kafka.conf in Flume's root directory
and put the following configuration into it:
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 主机1:9092,主机2:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装
a1.sources.r1.channels = c1
Next, write the fl1.sh script to make starting and stopping this Flume agent easier:
#!/bin/bash
case $1 in
"start"){
    for i in 主机1
    do
        echo " --------启动 $i 采集flume-------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
    done
};;
"stop"){
    for i in 主机1
    do
        echo " --------停止 $i 采集flume-------"
        ssh $i "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac
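A quick end-to-end check, sketched under the assumption that topic_log and the lg.sh generator from earlier are in place; the console consumer ships with Kafka:
fl1.sh start
# generate some mock log data
lg.sh test 100 2022-06-08
# the JSON log events should show up here if the Source -> KafkaChannel path works
/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server 主机1:9092 --topic topic_log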
7. Installing MySQL
Create a software/mysql directory and put the MySQL installation files into it.
Switch to root, run the installation, and wait for it to finish.
Once it completes, log in to MySQL,
enter the password, and then run:
alter user 'root'@'%' identified with mysql_native_password by '密码';
flush privileges;
That completes the local MySQL installation. Open Navicat, connect to the database, and import the gmall.sql file.
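A quick sanity check from another VM, a sketch that assumes the mysql client is installed there and that MySQL runs on 主机1:
mysql -h 主机1 -P 3306 -u root -p -e "show databases;"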
8. Business data collection module
Install EZDML; it lets you build data models for the business tables as needed.
9. Configuring Maxwell
Download and install Maxwell. The MySQL server's binlog is disabled by default and must be enabled before changes can be replicated; open the MySQL configuration file (typically /etc/my.cnf)
and add the following:
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
Then restart the MySQL service:
sudo systemctl restart mysqld
Create the Maxwell metadata database and the maxwell user, and grant it the required privileges:
mysql> CREATE DATABASE maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
Rename the Maxwell configuration file:
cd /opt/module/maxwell
cp config.properties.example config.properties
Then edit config.properties:
producer=kafka
kafka.bootstrap.servers=主机1:9092,主机2:9092,主机3:9092
kafka_topic=topic_db

host=主机1
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

filter=exclude:gmall.z_log
producer_partition_by=primary_key
10. Starting Maxwell
Start the Kafka cluster, then create a Maxwell start/stop script:
#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}

start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}

stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}

case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
        stop_maxwell
        start_maxwell
    ;;
esac
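A usage sketch; the original does not name the script, so mxw.sh below is a hypothetical filename under ~/bin:
chmod +x ~/bin/mxw.sh
mxw.sh start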
After creating it, run the script; if Maxwell starts successfully, you will see something like the following:
[ldy@主机1 applog]$ xcall jps
--------- 主机1 ----------
67175 jar
23161 QuorumPeerMain
23817 Kafka
80840 Jps
47246 Maxwell
65534 jar
--------- 主机2 ----------
36306 QuorumPeerMain
14966 Jps
124602 jar
68989 Kafka
--------- 主机3 ----------
36105 QuorumPeerMain
68779 Kafka
14396 Jps
11. Incremental data synchronization
First, make sure the contents of application.yml under applog are correct:
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
url: jdbc:mysql://主机1:3306/gmall?characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=GMT%2B8
username: root
password: "你的数据库密码"
driver-class-name: com.mysql.cj.jdbc.Driver
max-active: 20
test-on-borrow: true
Then run the data generator again (for example via lg.sh)
and check whether new rows appear in the gmall database; if they do, the incremental pipeline is working.
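To confirm that Maxwell is actually forwarding the changes to Kafka, a sketch that tails the topic_db topic configured earlier:
/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server 主机1:9092 --topic topic_db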
12. Full synchronization of historical data
Historical data is synchronized in full with the following command:
/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table 表名 --config /opt/module/maxwell/config.properties
The principle is simple: maxwell-bootstrap reads every row of the table once and publishes them all as bootstrap events.
13. Log-consuming Flume configuration in practice
13.1 Creating the Flume configuration file
cd /opt/module/flume/
mkdir job
vim job/kafka_to_hdfs_log.conf
Put the following into the file:
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 主机1:9092,主机2:9092,主机3:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.用户名.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://主机1:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
13.2 Creating the interceptor
Create a Maven project named gmall in IDEA
and add the following to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gmall</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Create the package com.用户名.gmall.flume.interceptor, add a TimestampInterceptor class to it, and fill it in as follows:
package com.用户名.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        try {
            JSONObject jsonObject = JSONObject.parseObject(log);
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
            return event;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Finally, package the project and put the generated gmall-1.0-SNAPSHOT-jar-with-dependencies.jar
into flume/lib/ on 主机1.
13.3 Start/stop script for the log-consuming Flume
Create the fl2.sh script
with the following content, then save it and make it executable.
#!/bin/bash
case $1 in
"start")
        echo " --------启动 主机1 日志数据flume-------"
        ssh 主机1 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")
        echo " --------停止 主机1 日志数据flume-------"
        ssh 主机1 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
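A quick check, sketched assuming the mock generator and the topic_log pipeline from the earlier sections:
fl2.sh start
lg.sh test 100 2022-06-08
# gzip-compressed log files should land under the dated directory
hadoop fs -ls /origin_data/gmall/log/topic_log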
14. Configuring DataX
14.1 Installing DataX
Extract DataX into /opt/module/, then run the self-check job:
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
If a summary table like the one below is printed, the installation succeeded (if it fails, try giving the VM more memory and run it again).
2024-06-25 01:33:30.580 [job-0] INFO JobContainer -
任务启动时刻 : 2024-06-25 16:33:20
任务结束时刻 : 2024-06-25 16:33:30
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
The following command prints a DataX configuration template. The outermost element of the JSON is a job, which contains setting and content sections: setting configures the job as a whole, while content configures the data source and the destination.
python bin/datax.py -r mysqlreader -w hdfswriter
The detailed Reader and Writer parameters are documented in the official docs:
https://github.com/alibaba/DataX/blob/master/README.md
14.2 Example: synchronizing MySQL data to HDFS
Goal: synchronize the base_province table of the gmall database to the /base_province directory on HDFS.
Analysis: this needs MySQLReader and HDFSWriter. MySQLReader has two modes, TableMode and QuerySQLMode: the former declares the data to synchronize with the table, column and where properties, while the latter declares it with a single SQL query.
Both modes are demonstrated below.
14.2.1 MySQLReader in TableMode
Create the configuration file base_province.json:
vim /opt/module/datax/job/base_province.json
and fill it with the following:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id", "name", "region_id", "area_code", "iso_code", "iso_3166_2", "create_time", "operate_time"],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://主机1:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"],
                                "table": ["base_province"]
                            }
                        ],
                        "password": "数据库连接密码",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {"name": "id", "type": "bigint"},
                            {"name": "name", "type": "string"},
                            {"name": "region_id", "type": "string"},
                            {"name": "area_code", "type": "string"},
                            {"name": "iso_code", "type": "string"},
                            {"name": "iso_3166_2", "type": "string"},
                            {"name": "create_time", "type": "string"},
                            {"name": "operate_time", "type": "string"}
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://主机1:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Then create the target directory on HDFS
hadoop fs -mkdir /base_province
and run the job from the DataX root directory:
python bin/datax.py job/base_province.json
Afterwards the output files appear in the HDFS web UI. A file showing up does not by itself prove the data arrived intact, so also run
hadoop fs -cat /base_province/* | zcat
to inspect the rows; if the sync succeeded, the table data is printed.
14.2.2 MySQLReader in QuerySQLMode
Create a second configuration file, base_province_sql.json:
vim /opt/module/datax/job/base_province_sql.json
with the following content:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://虚拟机①:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"],
                                "querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2,create_time,operate_time from base_province where id>=3"]
                            }
                        ],
                        "password": "密码",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {"name": "id", "type": "bigint"},
                            {"name": "name", "type": "string"},
                            {"name": "region_id", "type": "string"},
                            {"name": "area_code", "type": "string"},
                            {"name": "iso_code", "type": "string"},
                            {"name": "iso_3166_2", "type": "string"},
                            {"name": "create_time", "type": "string"},
                            {"name": "operate_time", "type": "string"}
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://虚拟机①:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Clear the target directory on HDFS:
hadoop fs -rm -r -f /base_province/*
then run the new job:
python bin/datax.py job/base_province_sql.json
and inspect the data again:
hadoop fs -cat /base_province/* | zcat
14.3 Passing parameters to DataX
An offline synchronization job normally runs on a daily schedule, so the target path on HDFS usually contains a date level to keep each day's data separate. The target path therefore changes every day, which means the value of HDFS Writer's path parameter in the DataX configuration must be dynamic; this is what DataX's parameter passing is for.
Usage: reference a parameter in the JSON configuration as ${param}, and supply its value at submit time with -p"-Dparam=value". A concrete example follows.
Modify base_province.json:
vim /opt/module/datax/job/base_province.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://虚拟机①:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"],
                                "querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2,create_time,operate_time from base_province where id>=3"]
                            }
                        ],
                        "password": "密码",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {"name": "id", "type": "bigint"},
                            {"name": "name", "type": "string"},
                            {"name": "region_id", "type": "string"},
                            {"name": "area_code", "type": "string"},
                            {"name": "iso_code", "type": "string"},
                            {"name": "iso_3166_2", "type": "string"},
                            {"name": "create_time", "type": "string"},
                            {"name": "operate_time", "type": "string"}
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://虚拟机①:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province/${dt}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Create the target path:
hadoop fs -mkdir /base_province/2022-06-08
Run the job with the parameter:
python bin/datax.py -p"-Ddt=2022-06-08" job/base_province.json
and check the result:
hadoop fs -ls /base_province/2022-06-08
14.4 Synchronizing HDFS data to MySQL
Create the configuration file test_province.json:
vim /opt/module/datax/job/test_province.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://centos01:8020",
                        "path": "/base_province",
                        "column": ["*"],
                        "fileType": "text",
                        "compress": "gzip",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "1336214wdsj",
                        "connection": [
                            {
                                "table": ["test_province"],
                                "jdbcUrl": "jdbc:mysql://centos01:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                            }
                        ],
                        "column": ["id", "name", "region_id", "area_code", "iso_code", "iso_3166_2", "create_time", "operate_time"],
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Then create the gmall.test_province table in MySQL:
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (
  `id` bigint(20) NOT NULL,
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `create_time` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `operate_time` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
Then run the job:
python bin/datax.py job/test_province.json
Afterwards the synchronized rows are visible in MySQL.
15. Full-table data synchronization
15.1 Preparing the config generator
Create a directory for the generator:
mkdir /opt/module/gen_datax_config
Then upload the generator and its configuration file into that directory and edit the configuration:
mysql.username=root
mysql.password=密码
mysql.host=虚拟机①
mysql.port=3306
mysql.database.import=gmall
mysql.tables.import=activity_info,activity_rule,base_trademark,cart_info,base_category1,base_category2,base_category3,coupon_info,sku_attr_value,sku_sale_attr_value,base_dic,sku_info,base_province,spu_info,base_region,promotion_pos,promotion_refer
is.seperated.tables=0
hdfs.uri=hdfs://虚拟机①:8020
import_out_dir=/opt/module/datax/job/import
Run the generator from that directory:
java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
and check its output:
ll /opt/module/datax/job/import
15.2 Writing the full synchronization script
Create the script:
vim ~/bin/mysql_to_hdfs_full.sh
with the following content:
#!/bin/bash

DATAX_HOME=/opt/module/datax

if [ -n "$2" ]; then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

handle_targetdir() {
    hadoop fs -test -e $1
    if [[ $? -eq 1 ]]; then
        echo "路径$1不存在,正在创建......"
        hadoop fs -mkdir -p $1
    else
        echo "路径$1已经存在"
    fi
}

import_data() {
    datax_config=$1
    target_dir=$2
    handle_targetdir $target_dir
    python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"activity_info") import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date ;;
"activity_rule") import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date ;;
"base_category1") import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date ;;
"base_category2") import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date ;;
"base_category3") import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date ;;
"base_dic") import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date ;;
"base_province") import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date ;;
"base_region") import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date ;;
"base_trademark") import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date ;;
"cart_info") import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date ;;
"coupon_info") import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date ;;
"sku_attr_value") import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date ;;
"sku_info") import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date ;;
"sku_sale_attr_value") import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date ;;
"spu_info") import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date ;;
"promotion_pos") import_data /opt/module/datax/job/import/gmall.promotion_pos.json /origin_data/gmall/db/promotion_pos_full/$do_date ;;
"promotion_refer") import_data /opt/module/datax/job/import/gmall.promotion_refer.json /origin_data/gmall/db/promotion_refer_full/$do_date ;;
"all")
    import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
    import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
    import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
    import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
    import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
    import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
    import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
    import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
    import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
    import_data /opt/module/datax/job/import/gmall.promotion_pos.json /origin_data/gmall/db/promotion_pos_full/$do_date
    import_data /opt/module/datax/job/import/gmall.promotion_refer.json /origin_data/gmall/db/promotion_refer_full/$do_date
    ;;
esac
Make the script executable:
chmod 777 ~/bin/mysql_to_hdfs_full.sh
Then run it:
mysql_to_hdfs_full.sh all 2022-06-08
When it finishes, check the target directories on HDFS to confirm that the files exist.
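A quick spot check, sketched assuming the default paths used by the script above:
hadoop fs -ls /origin_data/gmall/db
hadoop fs -ls /origin_data/gmall/db/activity_info_full/2022-06-08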
16. Incremental data synchronization (business data)
16.1 Flume configuration
On 虚拟机①, create kafka_to_hdfs_db.conf in Flume's job directory:
vim job/kafka_to_hdfs_db.conf
with the following configuration:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 虚拟机①:9092,虚拟机②:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.用户名.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://虚拟机①:8020/origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
In the com.用户名.gmall.flume.interceptor package, create a TimestampAndTableNameInterceptor class with the following content:
package com.用户名.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        String timeMills = String.valueOf(ts * 1000);

        String tableName = jsonObject.getString("table");

        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
After packaging, delete the old gmall-1.0-SNAPSHOT-jar-with-dependencies.jar from /opt/module/flume/lib on 虚拟机①:
rm -rf gmall-1.0-SNAPSHOT-jar-with-dependencies.jar
and put the newly built jar into /opt/module/flume/lib.
Create the fl3.sh script in /home/atguigu/bin, add the following content, and make it executable.
#!/bin/bash
case $1 in
"start")
        echo " --------启动 虚拟机① 业务数据flume-------"
        ssh 虚拟机① "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
        echo " --------停止 虚拟机① 业务数据flume-------"
        ssh 虚拟机① "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
16.2 Maxwell configuration
Add the business (mock) date to the /opt/module/maxwell/config.properties file.
16.3 Incremental synchronization test
Start the cluster components in order: myhadoop.sh,
then zk.sh, followed by the Kafka, Flume and Maxwell scripts as before.
16.4 First-day full synchronization of incremental tables
Create mysql_to_kafka_inc_init.sh in ~/bin, add the following content, and make it executable:
#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info") import_data cart_info ;;
"comment_info") import_data comment_info ;;
"coupon_use") import_data coupon_use ;;
"favor_info") import_data favor_info ;;
"order_detail") import_data order_detail ;;
"order_detail_activity") import_data order_detail_activity ;;
"order_detail_coupon") import_data order_detail_coupon ;;
"order_info") import_data order_info ;;
"order_refund_info") import_data order_refund_info ;;
"order_status_log") import_data order_status_log ;;
"payment_info") import_data payment_info ;;
"refund_payment") import_data refund_payment ;;
"user_info") import_data user_info ;;
"all")
    import_data cart_info
    import_data comment_info
    import_data coupon_use
    import_data favor_info
    import_data order_detail
    import_data order_detail_activity
    import_data order_detail_coupon
    import_data order_info
    import_data order_refund_info
    import_data order_status_log
    import_data payment_info
    import_data refund_payment
    import_data user_info
    ;;
esac
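A usage sketch, assuming Maxwell is already running:
chmod +x ~/bin/mysql_to_kafka_inc_init.sh
# bootstrap every incremental table once
mysql_to_kafka_inc_init.sh all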
17. Installing Hive
Extract hive-3.1.3.tar.gz into /opt/module/ and rename the directory:
mv /opt/module/apache-hive-3.1.3-bin/ /opt/module/hive
Edit /etc/profile.d/my_env.sh to add the environment variables:
sudo vim /etc/profile.d/my_env.sh
adding:
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin
Source /etc/profile.d/my_env.sh so the variables take effect:
source /etc/profile.d/my_env.sh
To resolve the logging jar conflict, go to /opt/module/hive/lib and rename the conflicting jar:
mv log4j-slf4j-impl-2.17.1.jar log4j-slf4j-impl-2.17.1.jar.bak
Copy the MySQL JDBC driver into Hive's lib directory:
cp /opt/software/mysql/mysql-connector-j-8.0.31.jar /opt/module/hive/lib/
Create a new hive-site.xml file in $HIVE_HOME/conf
and add the following:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://虚拟机①:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;allowPublicKeyRetrieval=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>密码</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>虚拟机①</value>
    </property>
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
</configuration>
Log in to MySQL
and create the Hive metastore database:
mysql> create database metastore;
Initialize the Hive metastore schema:
schematool -initSchema -dbType mysql -verbose
Change the metastore character set: the Hive metastore defaults to Latin1, which cannot store Chinese characters, so Chinese comments in table definitions would turn into mojibake. To avoid that, make the following changes.
Change the character set of the column that stores column comments to utf8:
mysql> use metastore;
mysql> alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
And for table comments:
mysql> alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;
Exit MySQL.
Start the Hive client
and list the databases to confirm everything works:
hive (default)> show databases;
18. Installing Spark
Deploy the Spark "without hadoop" build on the node where Hive runs: upload and extract it
tar -zxvf spark-3.3.1-bin-without-hadoop.tgz -C /opt/module/
and rename it:
mv /opt/module/spark-3.3.1-bin-without-hadoop /opt/module/spark
Create spark-env.sh from the template:
mv /opt/module/spark/conf/spark-env.sh.template /opt/module/spark/conf/spark-env.sh
Edit spark-env.sh
vim /opt/module/spark/conf/spark-env.sh
and add:
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
Update the environment variables:
sudo vim /etc/profile.d/my_env.sh
adding:
export SPARK_HOME=/opt/module/spark
export PATH=$PATH:$SPARK_HOME/bin
and make them take effect:
source /etc/profile.d/my_env.sh
Create a Spark configuration file for Hive:
vim /opt/module/hive/conf/spark-defaults.conf
with the following content:
spark.master                   yarn
spark.eventLog.enabled         true
spark.eventLog.dir             hdfs://虚拟机①:8020/spark-history
spark.executor.memory          1g
spark.driver.memory            1g
Create the following HDFS path to store the Spark history logs:
hadoop fs -mkdir /spark-history
Create the spark-jars directory:
hadoop fs -mkdir /spark-jars
and upload the Spark jars to it:
hadoop fs -put /opt/module/spark/jars/* /spark-jars
Edit hive-site.xml
vim /opt/module/hive/conf/hive-site.xml
and add the following:
<property>
    <name>spark.yarn.jars</name>
    <value>hdfs://虚拟机①:8020/spark-jars/*</value>
</property>

<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
</property>
Finally, test it: start the Hive client,
create a table:
create table student(id int, name string);
and run an insert to check that Hive on Spark works:
insert into table student values(1,'abc');
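If the engine is wired up correctly, the insert runs as a Spark job on YARN and the row can then be read back; a quick follow-up check, sketched with the hive CLI:
# the inserted row should come back
hive -e "select * from student;"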
19. YARN environment configuration
Open the capacity-scheduler.xml file on 虚拟机①:
/opt/module/hadoop/etc/hadoop/capacity-scheduler.xml
and edit it as follows:
<property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.8</value>
</property>
Then distribute capacity-scheduler.xml:
xsync capacity-scheduler.xml
20. Configuring the DataGrip connection
Start Hive, then connect to it from DataGrip.