1. Edit the xcall script

Edit the xcall script so that you can check the processes on all of the virtual machines at once; this matters for the later steps.

#! /bin/bash
for i in 虚拟机① 虚拟机② 虚拟机③
do
echo --------- $i ----------
ssh $i "$*"
done

After editing, run:

xcall jps

If everything is normal, the processes on each virtual machine are displayed.

2. Configure Hadoop

2.1 Preparation

My Hadoop version was 3.1.3. Since Flume 1.10.1, which is needed later, is not compatible with Hadoop 3.1.3, upgrade it to hadoop-3.3.4. Before upgrading, take a snapshot of each virtual machine in case anything goes wrong during the upgrade.

2.2 Install hadoop-3.3.4

First, install hadoop-3.3.4 into the same directory as hadoop-3.1.3. Then replace the corresponding files in hadoop-3.3.4 with the five files core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml, and workers from etc/hadoop/ in hadoop-3.1.3, for example:
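
A minimal sketch of that copy step, assuming both versions live under /opt/module (adjust the paths to your own layout):

cd /opt/module
for f in core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers
do
cp hadoop-3.1.3/etc/hadoop/$f hadoop-3.3.4/etc/hadoop/$f
done
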
After the files are replaced, run the following command to distribute Hadoop to the other virtual machines:

xsync hadoop-3.3.4

Then update the environment variables:

sudo vim /etc/profile.d/my_env.sh

Add the following environment variable entries; if entries for the old hadoop-3.1.3 are still present, delete them.

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.3.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

After saving, reload the file:

source /etc/profile.d/my_env.sh

Then distribute the file; do not forget to source it on every virtual machine as well:

xsync /etc/profile.d/my_env.sh

Once the environment variables are configured on all virtual machines, go to the hadoop-3.3.4 directory on 虚拟机① and run:

bin/hdfs namenode -format

This step is important; without it, the HDFS web UI will not open.

2.3 Configure the Hadoop cluster script

Open the myhadoop.sh script:

vim ~/bin/myhadoop.sh

Enter the following cluster-management content:

#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="

echo " --------------- 启动 hdfs ---------------"
ssh 虚拟机① "/opt/module/hadoop-3.3.4/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh 虚拟机② "/opt/module/hadoop-3.3.4/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh 虚拟机③ "/opt/module/hadoop-3.3.4/bin/mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="

echo " --------------- 关闭 historyserver ---------------"
ssh 虚拟机① "/opt/module/hadoop-3.3.4/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh 虚拟机② "/opt/module/hadoop-3.3.4/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh 虚拟机③ "/opt/module/hadoop-3.3.4/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac

Then change the permissions of myhadoop.sh so it can be executed:

chmod 777 ~/bin/myhadoop.sh

Then run:

myhadoop.sh start

This starts Hadoop. Then run the command below to check that the processes are up, and finally open http://主机名:9870/explorer.html#/ in a browser to check the HDFS web UI:

xcall jps

3. Data Mocking

Upload application.yml, gmall-remake-mock-2023-05-15-3.jar, path.json, and logback.xml to the /opt/module/applog directory on 主机1 and configure the relevant parameters.
Once the parameters are set, distribute the applog directory to the other virtual machines, then run the command below on 主机1 and check whether log/app.log is generated under the current directory:

java -jar gmall-remake-mock-2023-05-15-3.jar test 100 2022-06-08

Then write the lg.sh script:

vim ~/bin/lg.sh
#!/bin/bash
for i in 主机1 主机2; do
echo "========== $i =========="
ssh $i "cd /opt/module/applog/; nohup java -jar gmall-remake-mock-2023-05-15-3.jar $1 $2 $3 >/dev/null 2>&1 &"
done
chmod 777 ~/bin/lg.sh

Run the script and confirm that the corresponding files are generated under applog on 虚拟机① and 虚拟机②.

lg.sh test 1000 2002-12-24

4. Configure Zookeeper

Download Zookeeper, rename the directory to zookeeper, then create a zkData folder in its root directory:

mkdir zkData

In the zkData directory, create a myid file containing the number that corresponds to this server (for example 1):

vim myid

Rename zoo_sample.cfg in the /opt/module/zookeeper/conf directory to zoo.cfg:

mv /opt/module/zookeeper/conf/zoo_sample.cfg /opt/module/zookeeper/conf/zoo.cfg

Then edit the zoo.cfg file and set:

dataDir=/opt/module/zookeeper/zkData

and add the cluster configuration:

#######################cluster##########################
server.1=主机1:2888:3888
server.2=主机2:2888:3888
server.3=主机3:2888:3888

Once the configuration is complete, distribute the directory, then, as described above, change each machine's myid to its own server number (see the sketch after the command):

xsync /opt/module/zookeeper
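
For example, a sketch of setting each machine's myid after the distribution (assuming the zookeeper root is /opt/module/zookeeper and the numbers follow the server.N entries above):

echo 1 > /opt/module/zookeeper/zkData/myid
ssh 主机2 "echo 2 > /opt/module/zookeeper/zkData/myid"
ssh 主机3 "echo 3 > /opt/module/zookeeper/zkData/myid"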

Create the zk.sh script:

vim ~/bin/zk.sh

Enter the following:

#!/bin/bash

case $1 in
"start"){
for i in 主机1 主机2 主机3
do
echo ---------- zookeeper $i start ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
done
};;
"stop"){
for i in 主机1 主机2 主机3
do
echo ---------- zookeeper $i stop ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
done
};;
"status"){
for i in 主机1 主机2 主机3
do
echo ---------- zookeeper $i status ------------
ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
done
};;
esac
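
Like the other scripts, zk.sh needs execute permission before it can be used; following the same convention as above:

chmod 777 ~/bin/zk.sh
zk.sh start
zk.sh status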

5. Configure Kafka

Download the Kafka package, then edit the server.properties file under config and distribute the kafka directory to every virtual machine; broker.id is 1 on 虚拟机①, and 2 and 3 on 虚拟机② and 虚拟机③ respectively.

# Globally unique broker ID; it must be a number and must not repeat across brokers.
broker.id=1
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Path where Kafka stores its run-time logs (the data); it does not need to exist beforehand, Kafka creates it. Multiple disk paths may be configured, separated by ","
log.dirs=/opt/module/kafka/datas
# Number of partitions per topic on this broker
num.partitions=1
# Number of threads used to recover and clean up data under the data dirs
num.recovery.threads.per.data.dir=1
# Replication factor used when each topic is created; the default is 1
offsets.topic.replication.factor=1
# Maximum time a segment file is kept; it is deleted after this
log.retention.hours=168
# Maximum size of each segment file; the default is 1 GB
log.segment.bytes=1073741824
# How often expired data is checked for; by default every 5 minutes
log.retention.check.interval.ms=300000
# Zookeeper cluster connection string (a /kafka chroot is created under the ZK root for easier management)
zookeeper.connect=主机1:2181,主机2:2181,主机3:2181/kafka

Write the start/stop script for the Kafka cluster:

#! /bin/bash
case $1 in
"start"){
for i in 主机1 主机2 主机3
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in 主机1 主机2 主机3
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
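
The script is not given a name above; assuming it is saved as ~/bin/kf.sh, make it executable and use it like the other scripts:

chmod 777 ~/bin/kf.sh
kf.sh start
kf.sh stop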

If you hit the bug where Kafka will not start properly after starting Zookeeper and then Kafka, deleting the contents of the datas directory under the kafka installation fixes it; a sketch follows.
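
A sketch of that cleanup, using the log.dirs path configured above (run it on every broker, then start Zookeeper and Kafka again):

# wipes the broker's local data, so only do this in a test environment
rm -rf /opt/module/kafka/datas/*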

6. Configure Flume

This agent collects the logs and validates them. A Source and a KafkaChannel are configured: the Source receives the data and normalizes its format, then hands it to the Channel. Because Kafka is already the destination, the sink is omitted. An interceptor is configured between the Source and the Channel to filter the events.

After downloading Flume, first edit the log4j2.xml configuration file under flume's conf directory to set the log file path:

vim flume/conf/log4j2.xml

Enter:

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<Configuration status="ERROR">
<Properties>
<Property name="LOG_DIR">/opt/module/flume/log</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
</Console>
<RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i">
<PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
<Policies>
<!-- Roll every night at midnight or when the file reaches 100MB -->
<SizeBasedTriggeringPolicy size="100 MB"/>
<CronTriggeringPolicy schedule="0 0 0 * * ?"/>
</Policies>
<DefaultRolloverStrategy min="1" max="20">
<Delete basePath="${LOG_DIR}/archive">
<!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
<IfFileName glob="flume.log.*">
<!-- Only allow 1 GB of files to accumulate -->
<IfAccumulatedFileSize exceeds="1 GB"/>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
</Appenders>

<Loggers>
<Logger name="org.apache.flume.lifecycle" level="info"/>
<Logger name="org.jboss" level="WARN"/>
<Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
<Logger name="org.apache.hadoop" level="INFO"/>
<Logger name="org.apache.hadoop.hive" level="ERROR"/>
<!-- Add console output to make the logs easier to inspect while learning -->
<Root level="INFO">
<AppenderRef ref="LogFile" />
<AppenderRef ref="Console" />
</Root>
</Loggers>

</Configuration>

Then, in flume's root directory, create the job/file_to_kafka.conf file:

mkdir job
vim job/file_to_kafka.conf

Enter the following configuration in the file:

#define the components
a1.sources = r1
a1.channels = c1

#configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#configure the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 主机1:9092,主机2:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#wire them together
a1.sources.r1.channels = c1

Next, write the fl1.sh script to make starting Flume more convenient:

#!/bin/bash
case $1 in
"start"){
for i in 主机1
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
done
};;
"stop"){
for i in 主机1
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done

};;
esac
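
fl1.sh also needs execute permission before it can be run; typical usage:

chmod 777 ~/bin/fl1.sh
fl1.sh start
fl1.sh stop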

7. Install MySQL

Create the software/mysql folder and put the installation files into it.
Switch to root (administrator mode), then run the following and wait for it to finish:

sh install_mysql.sh

When it finishes, run:

mysql -uroot -p

Enter the password, then execute:

alter user 'root'@'%' identified with mysql_native_password by '密码';
flush privileges;

With that, the local MySQL installation is done. Then open Navicat, connect to the database, and import the gmall.sql file.
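
If you prefer the command line over Navicat, a sketch of the same import (assuming gmall.sql is in the current directory and does not create the database itself):

mysql -uroot -p -e "CREATE DATABASE IF NOT EXISTS gmall CHARACTER SET utf8mb4"
mysql -uroot -p gmall < gmall.sql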

8. Business Data Collection Module

Install EZDML; it can be used to build data models as needed.

9. Configure Maxwell

Download and install Maxwell. The MySQL server's binlog is disabled by default; to synchronize data it must be enabled first:

sudo vim /etc/my.cnf

Add the following:

#server ID of the database
server-id = 1
#Enable binlog; the value of this parameter is used as the binlog file name
log-bin=mysql-bin
#Binlog format; Maxwell requires row
binlog_format=row
#Database(s) for which binlog is enabled; adjust to your situation
binlog-do-db=gmall

Then restart the MySQL service:

sudo systemctl restart mysqld

Create the database, create the Maxwell user, and grant it the necessary privileges:

mysql> CREATE DATABASE maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Rename the Maxwell configuration file:

cd /opt/module/maxwell
cp config.properties.example config.properties

Edit the Maxwell configuration file:

#Destination that Maxwell sends data to; options are stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# Target Kafka cluster address
kafka.bootstrap.servers=主机1:9092,主机2:9092,主机3:9092
#Target Kafka topic; it can be static, e.g. maxwell, or dynamic, e.g. %{database}_%{table}
kafka_topic=topic_db

# MySQL-related settings
host=centos01
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

# Filter out the z_log table in gmall; it is a backup of the log data and does not need to be captured
filter=exclude:gmall.z_log
# Partition data into Kafka partitions by primary key, to avoid data skew
producer_partition_by=primary_key

10. Start Maxwell

Start the Kafka cluster, then create the Maxwell start/stop script:

#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
return $result
}
start_maxwell(){
status_maxwell
if [[ $? -lt 1 ]]; then
echo "启动Maxwell"
$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
else
echo "Maxwell正在运行"
fi
}
stop_maxwell(){
status_maxwell
if [[ $? -gt 0 ]]; then
echo "停止Maxwell"
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
else
echo "Maxwell未在运行"
fi
}
case $1 in
start )
start_maxwell
;;
stop )
stop_maxwell
;;
restart )
stop_maxwell
start_maxwell
;;
esac

After creating it, run the script; if everything succeeded, you should see output like the following:

[ldy@主机1 applog]$ xcall jps
--------- 主机1 ----------
67175 jar
23161 QuorumPeerMain
23817 Kafka
80840 Jps
47246 Maxwell
65534 jar
--------- 主机2 ----------
36306 QuorumPeerMain
14966 Jps
124602 jar
68989 Kafka
--------- 主机3 ----------
36105 QuorumPeerMain
68779 Kafka
14396 Jps

11. Incremental Data Synchronization

First, confirm that the contents of the application.yml file under applog are correct:

spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        druid:
          url: jdbc:mysql://主机1:3306/gmall?characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=GMT%2B8
          username: root
          password: "你的数据库密码"
          driver-class-name:  com.mysql.cj.jdbc.Driver
          max-active: 20
          test-on-borrow: true

Then run:

lg.sh

Check whether data is generated in the database; if it is, the synchronization works.

12. Full Synchronization of Historical Data

The command for a full synchronization of historical data is as follows:

/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table 表名 --config /opt/module/maxwell/config.properties

Its principle is simple: it fetches the entire contents of the table in a single pass.
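
For example, a one-off full load of the base_province table (base_province is just an illustration; any table in gmall can be substituted):

/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table base_province --config /opt/module/maxwell/config.properties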

13. Log-Consumption Flume Configuration in Practice

13.1 Create the Flume configuration file

cd /opt/module/flume/
mkdir job
vim job/kafka_to_hdfs_log.conf

Enter the following in the file:

#define the components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#configure source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 主机1:9092,主机2:9092,主机3:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.用户名.gmall.flume.interceptor.TimestampInterceptor$Builder

#configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://主机1:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#wire them together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

13.2 Create the interceptor

Create a Maven project named gmall in IDEA.
In pom.xml enter:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>gmall</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Create the com.用户名.gmall.flume.interceptor package, create a TimestampInterceptor class inside it, and enter the following:

package com.用户名.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
//1. Get the header and body data
Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);

try {
//2. Convert the body to a JSONObject (makes the fields easier to access)
JSONObject jsonObject = JSONObject.parseObject(log);

//3. Replace the timestamp header with the timestamp from the log itself (fixes the data-drift problem)
String ts = jsonObject.getString("ts");
headers.put("timestamp", ts);

return event;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

@Override
public List<Event> intercept(List<Event> list) {
Iterator<Event> iterator = list.iterator();
while (iterator.hasNext()) {
Event event = iterator.next();
if (intercept(event) == null) {
iterator.remove();
}
}
return list;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TimestampInterceptor();
}

@Override
public void configure(Context context) {
}
}
}

Finally, package the project and put the generated gmall-1.0-SNAPSHOT-jar-with-dependencies.jar under flume/lib/ on 主机1.
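
A sketch of the package-and-copy step (run from the gmall project root; the jar path is where the assembly plugin configured above writes the artifact):

mvn clean package -DskipTests
scp target/gmall-1.0-SNAPSHOT-jar-with-dependencies.jar 主机1:/opt/module/flume/lib/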

13.3 Log-consumption Flume start/stop script

Create the fl2.sh script:

vim ~/bin/fl2.sh

Enter the following, then save it and grant it execute permission:

#!/bin/bash

case $1 in
"start")
echo " --------启动 主机1 日志数据flume-------"
ssh 主机1 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

echo " --------停止 主机3 日志数据flume-------"
ssh 主机1 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

14. Configure DataX

14.1 Install DataX

Extract DataX into the /opt/module/ directory, then run this command as a self-check:

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

If a summary table is printed, the installation succeeded (if it fails, try giving the virtual machine a bit more memory and run it again).

2024-06-25 01:33:30.580 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2024-06-25 16:33:20
任务结束时刻                    : 2024-06-25 16:33:30
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

The following command shows a DataX configuration template. The outermost level of the JSON is a job, which contains two parts, setting and content: setting configures the job as a whole, and content configures the data source and destination.

python bin/datax.py -r mysqlreader -w hdfswriter

The detailed Reader and Writer parameters can be found in the official documentation at:
https://github.com/alibaba/DataX/blob/master/README.md

14.2 Example: Synchronizing MySQL Data to HDFS

Requirement: synchronize the base_province table of the gmall database to the /base_province directory on HDFS.
Analysis: this needs MySQLReader and HDFSWriter. MySQLReader has two modes, TableMode and QuerySQLMode: the former declares the data to be synchronized with the table, column, where, and similar properties; the latter declares it with a single SQL query.
Both modes are demonstrated below.

14.2.1 MySQLReader: TableMode

Create the configuration file base_province.json:

vim /opt/module/datax/job/base_province.json

Enter the following in the file:

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2",
"create_time",
"operate_time"
],
"where": "id>=3",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://主机1:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
],
"table": [
"base_province"
]
}
],
"password": "数据库连接密码",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "operate_time",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://主机1:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

Then create the target directory in HDFS:

hadoop fs -mkdir /base_province

Run the command from the DataX root directory:

python bin/datax.py job/base_province.json

You should then see the file appear in the HDFS web UI. The file appearing does not guarantee the data was written correctly, though, so also run:

hadoop fs -cat /base_province/* | zcat

to check the integrity of the data; if it worked, the table rows are printed.

14.2.2 MySQLReader: QuerySQLMode

Create the configuration file base_province_sql.json:

vim /opt/module/datax/job/base_province_sql.json

Then set its contents to:

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://虚拟机①:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2,create_time,operate_time from base_province where id>=3"
]
}
],
"password": "密码",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "operate_time",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://虚拟机①:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

Clear the target files on HDFS:

hadoop fs -rm -r -f /base_province/*

After the deletion, run:

python bin/datax.py job/base_province_sql.json

Then run the check again and inspect the data:

hadoop fs -cat /base_province/* | zcat

14.3 Passing Parameters to DataX

In most cases an offline synchronization job runs on a daily schedule, so the HDFS target path usually contains a date level to separate each day's data. In other words, the target path is not fixed, so the value of the HDFS Writer path parameter in the DataX configuration has to be dynamic. That is what DataX's parameter passing is for.
It works as follows: reference a parameter in the JSON configuration as ${param}, and pass its value at submission time with -p"-Dparam=value". A concrete example follows.

Modify the configuration file base_province.json:

vim /opt/module/datax/job/base_province.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://虚拟机①:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
],
"querySql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2,create_time,operate_time from base_province where id>=3"
]
}
],
"password": "密码",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "operate_time",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "hdfs://虚拟机①:8020",
"fieldDelimiter": "\t",
"fileName": "base_province",
"fileType": "text",
"path": "/base_province/${dt}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

Create the target path:

hadoop fs -mkdir /base_province/2022-06-08

Run the job:

python bin/datax.py -p"-Ddt=2022-06-08" job/base_province.json

Then check the result:

hadoop fs -ls /base_province/2022-06-08

14.4 Synchronizing HDFS Data to MySQL

Create the configuration file test_province.json:

vim /opt/module/datax/job/test_province.json
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultFS": "hdfs://centos01:8020",
"path": "/base_province",
"column": [
"*"
],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "1336214wdsj",
"connection": [
{
"table": [
"test_province"
],
"jdbcUrl": "jdbc:mysql://centos01:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
}
],
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2",
"create_time",
"operate_time"
],
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

Then create the gmall.test_province table in MySQL:

DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province` (
`id` bigint(20) NOT NULL,
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`create_time` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`operate_time` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

Then run the job:

python bin/datax.py job/test_province.json

After that, the generated data is visible in MySQL.

15. Full-Table Data Synchronization

15.1 Prepare the generator

Create the generator directory:

mkdir /opt/module/gen_datax_config

Then upload the generator and its configuration file to this directory, and edit the configuration file:

mysql.username=root
mysql.password=密码
mysql.host=虚拟机①
mysql.port=3306
mysql.database.import=gmall
# mysql.database.export=gmall
mysql.tables.import=activity_info,activity_rule,base_trademark,cart_info,base_category1,base_category2,base_category3,coupon_info,sku_attr_value,sku_sale_attr_value,base_dic,sku_info,base_province,spu_info,base_region,promotion_pos,promotion_refer
# mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://虚拟机①:8020
import_out_dir=/opt/module/datax/job/import
# export_out_dir=

Run the generator in this directory:

java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

Check the output:

ll /opt/module/datax/job/import

15.2 Write the full synchronization script

Create the script:

vim ~/bin/mysql_to_hdfs_full.sh

Enter the following:

#!/bin/bash

DATAX_HOME=/opt/module/datax

# If a date is passed in, do_date is that date; otherwise it is yesterday's date
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi

#Handle the target path: if it does not exist, create it; if it already exists, clear it so the synchronization job can be rerun safely
handle_targetdir() {
hadoop fs -test -e $1
if [[ $? -eq 1 ]]; then
echo "Path $1 does not exist, creating it......"
hadoop fs -mkdir -p $1
else
echo "Path $1 already exists, clearing it......"
hadoop fs -rm -r -f $1/*

fi
}

#data synchronization
import_data() {
datax_config=$1
target_dir=$2

handle_targetdir $target_dir
python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"activity_info")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
;;
"activity_rule")
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
;;
"base_category1")
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
;;
"base_category2")
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
;;
"base_category3")
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
;;
"base_dic")
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
;;
"base_province")
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
;;
"base_region")
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
;;
"base_trademark")
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
;;
"cart_info")
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
;;
"coupon_info")
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
;;
"sku_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
;;
"sku_info")
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
;;
"sku_sale_attr_value")
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
;;
"spu_info")
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
;;
"promotion_pos")
import_data /opt/module/datax/job/import/gmall.promotion_pos.json /origin_data/gmall/db/promotion_pos_full/$do_date
;;
"promotion_refer")
import_data /opt/module/datax/job/import/gmall.promotion_refer.json /origin_data/gmall/db/promotion_refer_full/$do_date
;;
"all")
import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
import_data /opt/module/datax/job/import/gmall.promotion_pos.json /origin_data/gmall/db/promotion_pos_full/$do_date
import_data /opt/module/datax/job/import/gmall.promotion_refer.json /origin_data/gmall/db/promotion_refer_full/$do_date
;;
esac

Grant the script execute permission:

chmod 777 ~/bin/mysql_to_hdfs_full.sh

Then run:

mysql_to_hdfs_full.sh all 2022-06-08

Once the command has finished, check the target directories on HDFS to confirm that the files exist.
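
For example, a spot check on one of the target paths created above:

hadoop fs -ls /origin_data/gmall/db/activity_info_full/2022-06-08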

16. Incremental Data Synchronization

16.1 Flume configuration

On node 虚拟机①, create kafka_to_hdfs_db.conf in Flume's job directory:

vim job/kafka_to_hdfs_db.conf

Configure the following:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 虚拟机①:9092,虚拟机②:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.用户名.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://虚拟机①:8020/origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## assemble
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

Create a TimestampAndTableNameInterceptor class under the com.用户名.gmall.flume.interceptor package, with the following contents:

package com.用户名.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {

Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);

JSONObject jsonObject = JSONObject.parseObject(log);

Long ts = jsonObject.getLong("ts");
//The ts field in Maxwell's output is a timestamp in seconds, while the Flume HDFS sink expects milliseconds
String timeMills = String.valueOf(ts * 1000);

String tableName = jsonObject.getString("table");

headers.put("timestamp", timeMills);
headers.put("tableName", tableName);
return event;

}

@Override
public List<Event> intercept(List<Event> events) {

for (Event event : events) {
intercept(event);
}

return events;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {


@Override
public Interceptor build() {
return new TimestampAndTableNameInterceptor ();
}

@Override
public void configure(Context context) {

}
}
}

After packaging, delete the old gmall-1.0-SNAPSHOT-jar-with-dependencies.jar from the /opt/module/flume/lib directory on 虚拟机①:

rm -rf gmall-1.0-SNAPSHOT-jar-with-dependencies.jar

Finally, place the newly packaged jar in the /opt/module/flume/lib directory.

Create the fl3.sh script in the ~/bin directory, enter the following, and grant it execute permission.

#!/bin/bash

case $1 in
"start")
echo " --------启动 虚拟机① 业务数据flume-------"
ssh 虚拟机① "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

echo " --------停止 虚拟机① 业务数据flume-------"
ssh 虚拟机① "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

16.2 Maxwell configuration

Add the business date to the /opt/module/maxwell/config.properties file:

# Override the date part of the data timestamps
mock_date=2022-06-08

16.3 Incremental test

Start myhadoop.sh, zk.sh, and then the rest of the pipeline components in order; a sketch follows.
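
A sketch of that startup sequence, using the scripts from the earlier sections (kf.sh is the assumed name for the Kafka script from section 5; the Maxwell start/stop script from section 10 also needs to be started):

myhadoop.sh start   # Hadoop: HDFS, YARN, history server
zk.sh start         # Zookeeper
kf.sh start         # Kafka (assumed script name)
fl3.sh start        # business-data Flume agent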

16.4 First-day full synchronization of the incremental tables

Create mysql_to_kafka_inc_init.sh in the ~/bin directory, enter the following, and grant it execute permission (see the commands after the script):

#!/bin/bash

# This script initializes all of the incremental tables and only needs to be run once

MAXWELL_HOME=/opt/module/maxwell

import_data() {
$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
import_data cart_info
;;
"comment_info")
import_data comment_info
;;
"coupon_use")
import_data coupon_use
;;
"favor_info")
import_data favor_info
;;
"order_detail")
import_data order_detail
;;
"order_detail_activity")
import_data order_detail_activity
;;
"order_detail_coupon")
import_data order_detail_coupon
;;
"order_info")
import_data order_info
;;
"order_refund_info")
import_data order_refund_info
;;
"order_status_log")
import_data order_status_log
;;
"payment_info")
import_data payment_info
;;
"refund_payment")
import_data refund_payment
;;
"user_info")
import_data user_info
;;
"all")
import_data cart_info
import_data comment_info
import_data coupon_use
import_data favor_info
import_data order_detail
import_data order_detail_activity
import_data order_detail_coupon
import_data order_info
import_data order_refund_info
import_data order_status_log
import_data payment_info
import_data refund_payment
import_data user_info
;;
esac
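
As stated above, grant the script execute permission; the first-day full load of every incremental table is then:

chmod 777 ~/bin/mysql_to_kafka_inc_init.sh
mysql_to_kafka_inc_init.sh all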

17. Install Hive

Extract hive-3.1.3.tar.gz into the /opt/module/ directory, then rename it:

mv /opt/module/apache-hive-3.1.3-bin/ /opt/module/hive

Edit /etc/profile.d/my_env.sh and add the environment variable:

sudo vim /etc/profile.d/my_env.sh

Add the following:

#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin

Source /etc/profile.d/my_env.sh so the environment variable takes effect:

source /etc/profile.d/my_env.sh

Resolve the logging jar conflict: go to /opt/module/hive/lib and run:

mv log4j-slf4j-impl-2.17.1.jar log4j-slf4j-impl-2.17.1.jar.bak

Copy the MySQL JDBC driver into Hive's lib directory:

cp /opt/software/mysql/mysql-connector-j-8.0.31.jar /opt/module/hive/lib/

Create a new hive-site.xml file in the $HIVE_HOME/conf directory:

vim $HIVE_HOME/conf/hive-site.xml

Add the following content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- MySQL URL where Hive stores its metadata -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://虚拟机①:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;allowPublicKeyRetrieval=true</value>
</property>

<!-- Fully qualified class name of the JDBC driver Hive uses to connect to MySQL -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>

<!-- Username for Hive's MySQL connection -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<!-- Password for Hive's MySQL connection -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>密码</value>
</property>

<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>

<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>

<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>

<property>
<name>hive.server2.thrift.bind.host</name>
<value>虚拟机①</value>
</property>

<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>

<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>

<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>

Log in to MySQL:

mysql -uroot -p密码

Create the Hive metastore database:

mysql> create database metastore;

Initialize the Hive metastore:

schematool -initSchema -dbType mysql -verbose

Change the metastore character set. The Hive metastore defaults to Latin1, which does not support Chinese characters, so Chinese comments in CREATE TABLE statements would come out garbled. To fix this, change the character set of the metastore fields that store comments to utf8.
Column comments:

mysql> use metastore;
mysql> alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;

Table comments:

mysql> alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;

Quit MySQL:

mysql> quit;

Start the Hive client:

hive

Check the databases:

hive (default)> show databases;