[hadoop@hadoop003 software]$ tar -zxf wormhole-0.6.2.tar.gz -C ../app/
[hadoop@hadoop003 app]$ ln -s wormhole-0.6.2 wormhole
[hadoop@hadoop003 app]$ sudo vi /etc/profile
export WORMHOLE_HOME=/home/hadoop/app/wormhole
export PATH=$WORMHOLE_HOME/bin:$PATH
[hadoop@hadoop003 app]$ source /etc/profile
[hadoop@hadoop003 app]$ vi $WORMHOLE_HOME/conf/application.conf
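Which settings to touch depends on the version; below is a minimal sketch of the MySQL section as it appears in Wormhole 0.6.x, pointed at the metadata database and user created in the next step (the hadoop001 host and port 3306 are assumptions for this cluster):
# Wormhole metadata store; adjust host, port, and credentials to your environment
mysql = {
  driver = "slick.driver.MySQLDriver$"
  db = {
    driver = "com.mysql.jdbc.Driver"
    user = "wormhole"
    password = "123456"
    url = "jdbc:mysql://hadoop001:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    numThreads = 4
    minConnections = 4
    maxConnections = 10
    connectionTimeout = 3000
  }
}
Next, relax MySQL's password validation so the short demo password is accepted, then create the metadata database and user: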
mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_mixed_case_count=0;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_number_count=3;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_special_char_count=0;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_length=3;
Query OK, 0 rows affected (0.00 sec)
mysql> CREATE DATABASE IF NOT EXISTS wormhole DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
Query OK, 1 row affected (0.01 sec)
mysql> CREATE USER 'wormhole'@'%' IDENTIFIED BY '123456';
Query OK, 0 rows affected (0.01 sec)
mysql> GRANT ALL ON wormhole.* TO 'wormhole'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
mysql> exit
Bye
Create a test topic
[hadoop@hadoop002 ~]$ cd /usr/hdp/current/kafka-broker/bin
[hadoop@hadoop002 bin]$ ./kafka-topics.sh --zookeeper hadoop001:2181 --create --topic test_source --replication-factor 1 --partitions 1
[hadoop@hadoop002 bin]$ ./kafka-topics.sh --zookeeper hadoop001:2181 --list
Create an Instance
Create a Database
Create a Namespace
Write data to Kafka
[hadoop@hadoop002 bin]$ ./kafka-console-producer.sh --broker-list hadoop002:6667 --topic test_source --property "parse.key=true" --property "key.separator=@@@"
[hadoop@hadoop003 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop001:6667 --from-beginning --topic test_source
{"id": 1, "name": "test", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:00:00"}
Create the Source Namespace
Create the Sink Namespace
useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
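These parameters are appended to the JDBC URL in the namespace's connection config; for the sink, the full URL would look roughly like this (it assumes MySQL runs on hadoop001:3306 and uses the testdb database created below):
jdbc:mysql://hadoop001:3306/testdb?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true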
Create the Lookup Namespace
useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&noAccessToProcedureBodies=true&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false
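Likewise for the lookup namespace, against the lookup database created below (same hadoop001:3306 assumption):
jdbc:mysql://hadoop001:3306/lookup?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&noAccessToProcedureBodies=true&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false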
Create a Project and grant it permissions
Create a Stream
Add the Stream
Create a Flow
Select the data source (source)
Select the target (sink)
Configure the transformation logic. The lookup SQL below enriches each incoming record with cardBank from the userCard table, matching on id; the kafka.hdp-kafka.test_source.test_table.id reference binds the stream's id field:
select id as id1,cardBank from userCard where (id) in (kafka.hdp-kafka.test_source.test_table.id);
Log in to MySQL on the hadoop001 node:
CREATE DATABASE IF NOT EXISTS lookup DEFAULT CHARSET utf8 COLLATE utf8_unicode_ci;
use lookup;
CREATE TABLE IF NOT EXISTS `userCard` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`cardBank` VARCHAR(200) NOT NULL,
PRIMARY KEY (`id`)
)ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO userCard (id, cardBank) VALUES (1, "CMB");
INSERT INTO userCard (id, cardBank) VALUES (2, "CITIC");
INSERT INTO userCard (id, cardBank) VALUES (3, "ABC");
INSERT INTO userCard (id, cardBank) VALUES (4, "BOC");
INSERT INTO userCard (id, cardBank) VALUES (5, "CEB");
INSERT INTO userCard (id, cardBank) VALUES (6, "CCB");
INSERT INTO userCard (id, cardBank) VALUES (7, "ICBC");
INSERT INTO userCard (id, cardBank) VALUES (8, "CMBC");
INSERT INTO userCard (id, cardBank) VALUES (9, "SPDB");
INSERT INTO userCard (id, cardBank) VALUES (10, "GDB");
Create the sink table:
CREATE DATABASE IF NOT EXISTS testdb DEFAULT CHARSET utf8 COLLATE utf8_unicode_ci;
use testdb;
CREATE TABLE IF NOT EXISTS `user` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`name` VARCHAR(200) NOT NULL,
`cardBank` VARCHAR(200) NOT NULL,
`phone` VARCHAR(200) NOT NULL,
`city` VARCHAR(200) NOT NULL,
PRIMARY KEY (`id`)
)ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci;
Start the Flow
Note: the suspending state means the Flow is suspended: its registration has been written to the Stream, but the Stream is not currently running. Once the Stream is healthy, the Flow automatically switches to running or failed.
Start the Stream
Problem: the Flow kept ending up in failed, and at first the cause was baffling.
Every machine must have mysql-connector-java-8.0.18.jar under /usr/share/java/.
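A quick way to push the Connector/J jar to every node (a sketch; it assumes passwordless SSH between the hosts and write access to /usr/share/java/):
# copy the MySQL Connector/J jar to /usr/share/java/ on each node
for h in hadoop001 hadoop002 hadoop003; do
  scp mysql-connector-java-8.0.18.jar ${h}:/usr/share/java/
done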
com.mysql.cj.exceptions.CJException: Access denied for user 'root'@'node01' (using password: YES)
This is because the MySQL namespaces were configured with the root user, and root is not allowed to connect remotely.
Fix 1: switch to a different user that has the necessary privileges (remote access, plus read/write on the databases created earlier).
Fix 2: enable remote access for the root user. A sketch of fix 2 follows; the transcript after it takes fix 1.
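On MySQL 5.7, fix 2 would look roughly like this ('your_root_password' is a placeholder; exposing root remotely is risky, which is why fix 1 is preferable):
-- MySQL 5.7 syntax; 'your_root_password' is a placeholder
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'your_root_password' WITH GRANT OPTION;
FLUSH PRIVILEGES;
The transcript below takes fix 1 and creates a dedicated user instead: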
mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_mixed_case_count=0;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_number_count=3;
Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_special_char_count=0;
Query OK, 0 rows affected (0.01 sec)
mysql> set global validate_password_length=3;
Query OK, 0 rows affected (0.01 sec)
mysql> CREATE USER 'aaron'@'%' IDENTIFIED BY '123456';
Query OK, 0 rows affected (0.01 sec)
mysql> GRANT ALL ON testdb.* TO 'aaron'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL ON lookup.* TO 'aaron'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)
With that, the problem was completely solved.
Check the result in the MySQL database:
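For example (the Flow writes to testdb.user as configured above; if the lookup join worked, the row for id 1 should carry cardBank CMB from userCard):
mysql> SELECT * FROM testdb.user;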
Insert another record and observe the result
[root@hadoop002 java]# cd /usr/hdp/current/kafka-broker/bin
[root@hadoop002 bin]# ./kafka-console-producer.sh --broker-list hadoop002:6667 --topic test_source --property "parse.key=true" --property "key.separator=@@@"
>data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*@@@{"id": 2, "name": "test1", "phone":"18074546423", "city": "Shanghai", "time": "2017-12-22 10:00:00"}
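Querying the sink table again should now show the second row as well; per the userCard data, id 2 maps to cardBank CITIC:
mysql> SELECT * FROM testdb.user WHERE id = 2;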
Beautiful!