Wormhole部署&使用

Wormhole部署&使用[hadoop@hadoop003software]$tar-zxfwormhole-0.6.2.tar.gz-C../app/[hadoop@hadoop003app]$ln-swormhole-0.6.2wormhole[hadoop@hadoop003app]$sudovi/etc/profileexportWORMHOLE_HOME=/h…_wormhole自己部署

大家好,欢迎来到IT知识分享网。

 

[hadoop@hadoop003 software]$ tar -zxf wormhole-0.6.2.tar.gz -C ../app/

[hadoop@hadoop003 app]$ ln -s wormhole-0.6.2 wormhole

[hadoop@hadoop003 app]$ sudo vi /etc/profile

export WORMHOLE_HOME=/home/hadoop/app/wormhole
export PATH=$WORMHOLE_HOME/bin:$PATH

[hadoop@hadoop003 app]$ source /etc/profile

[hadoop@hadoop003 app]$ vi $WORMHOLE_HOME/conf/application.conf

 


mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_mixed_case_count=0;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_number_count=3;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_special_char_count=0;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_length=3;
Query OK, 0 rows affected (0.00 sec)

mysql> CREATE DATABASE IF NOT EXISTS wormhole DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
Query OK, 1 row affected (0.01 sec)

mysql> CREATE USER 'wormhole'@'%' IDENTIFIED BY '123456';
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT ALL ON wormhole.* TO 'wormhole'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)

mysql> exit
Bye

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

创建测试topic

[hadoop@hadoop002 ~]$ cd /usr/hdp/current/kafka-broker/bin

[hadoop@hadoop002 bin]$ ./kafka-topics.sh --zookeeper hadoop001:2181 --create --topic test_source --replication-factor 1 --partitions 1

[hadoop@hadoop002 bin]$ ./kafka-topics.sh --zookeeper hadoop001:2181 --list

创建Instance

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

创建Database

Wormhole部署&使用

Wormhole部署&使用

创建Namespace

Wormhole部署&使用

往Kafka写数据 

[hadoop@hadoop002 bin]$ ./kafka-console-producer.sh --broker-list hadoop002:6667 --topic test_source --property "parse.key=true" --property "key.separator=@@@"

[hadoop@hadoop003 bin]$ ./kafka-console-consumer.sh  --bootstrap-server hadoop001:6667 --from-beginning --topic test_source

{"id": 1, "name": "test", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:00:00"}

创建Source Namaspace 

Wormhole部署&使用

Wormhole部署&使用

创建Sink Namaspace

Wormhole部署&使用

Wormhole部署&使用

useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true

Wormhole部署&使用

创建Lookup Namespace

Wormhole部署&使用

useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&noAccessToProcedureBodies=true&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false

Wormhole部署&使用

创建Project并授权

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

创建Stream

Wormhole部署&使用

添加Stream

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

创建Flow

Wormhole部署&使用

选择数据源source

Wormhole部署&使用

选择目标(sink)

Wormhole部署&使用

Wormhole部署&使用

配置转换逻辑

Wormhole部署&使用

Wormhole部署&使用

select id as id1,cardBank from userCard where (id) in (kafka.hdp-kafka.test_source.test_table.id);

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

在hadoop01节点上登录MySQL:

CREATE DATABASE IF NOT EXISTS lookup DEFAULT CHARSET utf8 COLLATE utf8_unicode_ci;

use lookup;

CREATE TABLE IF NOT EXISTS `userCard` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `cardBank` VARCHAR(200) NOT NULL,
  PRIMARY KEY (`id`)
)ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci;

INSERT INTO userCard (id, cardBank) VALUES (1, "CMB");
INSERT INTO userCard (id, cardBank) VALUES (2, "CITIC");
INSERT INTO userCard (id, cardBank) VALUES (3, "ABC");
INSERT INTO userCard (id, cardBank) VALUES (4, "BOC");
INSERT INTO userCard (id, cardBank) VALUES (5, "CEB");
INSERT INTO userCard (id, cardBank) VALUES (6, "CCB");
INSERT INTO userCard (id, cardBank) VALUES (7, "ICBC");
INSERT INTO userCard (id, cardBank) VALUES (8, "CMBC");
INSERT INTO userCard (id, cardBank) VALUES (9, "SPDB");
INSERT INTO userCard (id, cardBank) VALUES (10, "GDB");

sink table

CREATE DATABASE IF NOT EXISTS testdb DEFAULT CHARSET utf8 COLLATE utf8_unicode_ci;

use testdb;

CREATE TABLE IF NOT EXISTS `user` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(200) NOT NULL,
  `cardBank` VARCHAR(200) NOT NULL,
  `phone` VARCHAR(200) NOT NULL,
  `city` VARCHAR(200) NOT NULL,
  PRIMARY KEY (`id`)
)ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci;

启动Flow

Wormhole部署&使用

注意:Flow suspending状态代表挂起状态,标识Flow信息已注册到Stream中,Stream目前处于非 running状态。Stream状态正常后Flow状态会自动切换到running或failed状态

启动Stream

Wormhole部署&使用

Wormhole部署&使用

Wormhole部署&使用

问题:flow一直failed,百思不得其解

Wormhole部署&使用

Wormhole部署&使用

每台机器的/usr/share/java/文件夹下必须要有mysql-connector-java-8.0.18.jar

com.mysql.cj.exceptions.CJException: Access denied for user 'root'@'node01'
  (using password: YES)

这是因为我们配置的MySQL的namespace使用的root用户,而root用户不允许远程访问。

解决办法1:改用其他用户,且其他用户具有相关权限(远程访问,已经对前面创建的那几个

database的读写权限)

解决办法2:开通root用户的远程访问权限。

mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)

mysql>   set global validate_password_mixed_case_count=0;
Query OK, 0 rows affected (0.00 sec)

mysql>   set global validate_password_number_count=3;
Query OK, 0 rows affected (0.00 sec)

mysql>   set global validate_password_special_char_count=0;
Query OK, 0 rows affected (0.01 sec)

mysql>   set global validate_password_length=3;
Query OK, 0 rows affected (0.01 sec)

mysql> CREATE USER 'aaron'@'%' IDENTIFIED BY '123456';
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT ALL ON testdb.* TO 'aaron'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql>   GRANT ALL ON lookup.* TO 'aaron'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)

至此,完美解决

Wormhole部署&使用

去MySQL数据库查看结果

Wormhole部署&使用

再插入一条数据观察情况

[root@hadoop002 java]# cd /usr/hdp/current/kafka-broker/bin

[root@hadoop002 bin]# ./kafka-console-producer.sh --broker-list hadoop002:6667 --topic test_source --property "parse.key=true" --property "key.separator=@@@"

>data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*@@@{"id": 2, "name": "test1", "phone":"18074546423", "city": "Shanghai", "time": "2017-12-22 10:00:00"}

Wormhole部署&使用

漂亮!

 

 

 

 

 

 

 

 

 

 

 

 

 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/27529.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信