大家好,欢迎来到IT知识分享网。
0x00、总体概览
1.历史发展
- 2012年10月由Cloudera公司发起创建,2015年10月对外公布,2015年12月进入apache基金会进行孵化,后续进入快速发展轨道。
2.产品定位
- Fast Analytics on Fast Data。
- 在快速修改的数据上进行快速分析
3.产生之前
在有kudu之前,数据分类为:
静态数据
- 以HDFS引擎作为存储引擎,适⽤用于⾼吞吐量的离线大数据分析场景
- 这类存储的局限性是数据无法进行行随机的读写
动态数据
- 以HBase、Cassandra 作为存储引擎,适⽤用于⼤大数据随机读写场景
- 这类存储的局限性是批量量读取吞吐量量远不不如 HDFS,不不适⽤用于批量量数据分 析的场景
而既要满足随机读写和大数据分析的解决方案如下:
而以上的方案问题是:
架构复杂
- 流转涉及环节太多,运维成本很高
- 每个环节需要保证高可用,需要维护多个副本,存储空间也有一定的浪费
- 数据在多个系统上,对数据安全策略、监控等都提出了挑战
时效性低
- 从 HBase 导出成静态文件是周期性
难以应对后续的更新
- 已经从 HBase 导出到 HDFS,新到的变更数据就难以处理
所以产生了新的解决方案,那就是kudu,其定位如下:
而其gap了hdfs和hbase之间的
4.架构图
kudu的整体架构图如下
5.基本概念
下面了解一些kudu里的一些基本概念。
Table
- 数据存储,没有库的概念
- 有对应的schema结构
- 字段是强类型
- 需要设置主键
- 分隔为N个tablets
Tablet
- Table按照行切分后存储的位置
- 一个tablet会多副本到其他server上,并且有一个会是leader
- 如果leader失效,会用raft一致性协议重新选举
- 任何一组副本内的tablet都可以读,但是只能从leader写入
Mater
- 持续追踪所有的tablet、tablet server、catalog table、其他的集群meta信息
- 可以部署多个master,但只有一个是active
- 如果active时效了,用raft协议在选举出一个
- Master信息也会存储在一个tablet上,并且被复制同步到其他master机器上
- Tablet servers会发送心跳包到master机器(默认是一秒一次)
Tablet Server
- 用来存储tablet数据并且向client提供服务
- 一个tablet会在多个tablet server上
- 一个tablet server会有多个tablet
Catelog table
- 是kudu元数据的中心,存储了关于table和tablet的信息
- 不允许被直接读写
Raft一致性协议算法
- 分布式服务高可用性进行多副本
- 通过raft来选举出leader
- 多副本数据都可以读,但只有leader可以写
- 写入时只有当大多数follower确认成功,才算写入成功
- 在一组副本中(通常是3或5个),允许有(N-1)/2的失败
6.使用场景
流式实时计算
- 流式计算场景通常有持续不断地大量写入
- 与此同时这些数据还要支持近乎实时的读、写以及更新操作
时间序列存储
- hash分片设计能够很好地避免TSDB类请求的局部热点问题
- 高效的Scan性能让Kudu能够比Hbase更好的支持查询操作
历史数据共存
- Impala可以同时支持HDFS、Kudu等多个底层存储引擎
- 不必把所有的数据都迁移到Kudu
0x01、设计模式
1.数据类型
- boolean
- 8-bit signed integer
- 16-bit signed integer
- 32-bit signed integer
- 64-bit signed integer
- unixtime_micros (64-bit microseconds since the Unix epoch)
- single-precision (32-bit) IEEE-754 floating-point number
- double-precision (64-bit) IEEE-754 floating-point number
- decimal (see Decimal Type for details)
- UTF-8 encoded string (up to 64KB uncompressed)
- binary (up to 64KB uncompressed)
2.主键设计
- 单列或多列
- 可以设置一列为主键,也同时设置多列为主键
- 唯一性
- 数据是全表唯一的
- 不可被更改性
- 在schema建成后,主键不能够修改
- 其值不能被更改,不同即新增
- 主键索引
- 为主键建立B+索引
3.分区设计(Partition)
表分区类型
hash
- 按照某一个或几个值的hash进行分区
- 可指定分区的个数
range
- 按照某一个字段值的范围分区
- 可以动态地增删
multilevel
- 可以同时按照hash、range结合来分区
- 既分散热点数据又可动态增删
比如建立如下一个表
CREATE TABLE metrics ( host STRING NOT NULL, metric STRING NOT NULL, time INT64 NOT NULL, value DOUBLE NOT NULL, PRIMARY KEY (host, metric, time) );
按照range来分区如下
按照hash分区如下
同时按照range和hash来分区
同时按照hash和hash来分区
那么hash与range的优缺点是
4.字段编码
编码的类型如下
每中字段适用及默认的编码类型如下
5.数据压缩
编码类型有一下几种
- LZ4 可以设置为默认
- zlib
- snappy
0x02、存储结构
逻辑结构图如下
文件内容
- BloomFile:根据DiskRowSet中key生成一个bloom filter,用于快速模糊的定位某一个key是否在DiskRowSet中
- Ad_hoc Index:是主键的索引,用于定位key在DiskRowSet中具体哪个偏移位置
- BaseData:是MemRowSet flush下来的数据,按照列存储,按照主键有序
- UndoFile:是BaseData之前的数据历史数据
- RedoFile:是BaseData之后的mutation记录,可以获得较新的数据
- DeltaMem:用于在内存中存储mutation记录,先写到内存中,然后写满后flush到磁盘,形成 DeltaFile
0x03、读写流程
1.锁类型
共享锁(读锁)
- 获得后只能够进行读操作
- 其他读可以,任何写等待
互斥锁(写锁)
- 只能获得锁的对象进行写
- 任何读等待,其他写等待
MVCC(Multi-Version Concurrency Control)
- 每一个写操作都会创建一个新版本的数据
- 读操作会从有限多个版本的数据中挑选一个最合适的结果直接返回
- 读写操作之间的冲突就不再需要被关注,而管理和快速挑选数据的版本就成了 MVCC 需要解决的主要问题。
2.存储数据
fluash示例如下
3.读写流程
写流程:
a)插入操作:
- 客户端连接到TMaster获取表的相关信息(分区和tablet信息);
- 找到负责写请求的tablet所在的TServer,kudu接受客户端的请求,检查本次写操作是否符合要求;
- kudu在tablet中所有RowSet中,查找是否存在与待插入数据相同主键的记录,如果存在,返回错误,否则继续;
- 写入操作先被提交到tablet 的预写日志(WAL)上,然后根据Raft 一致性算法取得追随节点的同意后,才会被添加到其中一个tablet 的内存中,插入到MenRowSet中。(因为在MemRowSet 中支持了多版本并发控制(mvcc) ,对最近插入的行(未刷新到磁盘上的新的行)的更新和删除操作将被追加到MemRowSet中的原始行之后以生成Redo 记录的列表)。
- kudu在MemRowSet 中写入新数据,在MemRowSet 达到一定大小或者时间限制(1G 或者 120s),MemRowSet 会将数据落盘,生成一个DiskRowSet 用于持久化数据 和 一个 MemRowSet 继续接受新数据的请求。
b)更新操作:
- 客户端连接到TMaster获取表的相关信息(分区和tablet信息);
- 找到负责写请求的tablet 所在的TServer,kudu接受客户端的请求,检查本次写操作是否符合要求;
- 因为待更新的数据可能位于MemRowSet ,也可能位于DiskRowSet 中,所以根据待更新的数据所处的位置,kudu有不同的做法:
- 当待更新的数据位于MemRowSet时,找到它所在的行,然后将跟新操作记录在所在行中的一个mutation的链表中,在MemRowSet 数据落地的时候,kudu会将更新合并到base data,并生成undo records 用于查看历史版本的数据和MVCC, undo records 实际上也是以 DeltaFile 的形式存放;
- 当待跟新的数据位于DiskRowSet时,找到待跟新数据所在的DiskRowSet ,每个DiskRowSet 都会在内存中设置一个DeltaMemStore,将更新操作记录在DeltaMemStore中,在DeltaMemStore达到一定大小时,flush 在磁盘,形成Delta并存放在DeltaFile中。
读流程:
- 客户端连接TMaster 获取表的相关信息,包括分区和表中的tablet 的信息;
- 客户端找到 tablet 所在的TServer 以后,kudu接受读请求,并记录timestamp(没有显示指定就使用当前时间)信息;
- 从内存中读取数据,即是从MemRowSet 和 DeltaRowSet中读取数据,根据timestamp来找到对应的mutation链表;
- 从磁盘中读取数据,从metadata文件中使用boomfilter 快速模糊的判断所有候选的RowSet中是否包含此key,然后从DiskRowSet 中读取数据,实际上是根据B+树,判断key 在这些DiskRowSet 中的range 范围内,然后从metadata文件中,获取index来判断rowID 在Data中的偏移,根据读操作中的timestamp 信息判断是否需要对basedata中的数据进行回滚,从而获取数据。
4.compaction
5.kudu api
from datetime import datetime import kudu from kudu.client import Partitioning # Connect to Kudu master server(s). client = kudu.connect(host='192.168.96.162', port=7051) # Define a schema for a new table. builder = kudu.schema_builder() builder.add_column('key').type(kudu.int64).nullable(False).primary_key() builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') schema = builder.build() # Define the partitioning schema. partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3).set_range_partition_columns(['year']).add_range_partition({'year':2018},{'year':2019}) # Delete table if it already exists. if client.table_exists('python-example'): client.delete_table('python-example') # Create a new table. client.create_table('python-example', schema, partitioning) # Open a table. table = client.table('python-example') # Create a new session so that we can apply write operations. session = client.new_session() try: # Insert a row. op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) session.apply(op) # Upsert a row. op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) session.apply(op) # Update a row. op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) session.apply(op) # Delete a row. op = table.new_delete({'key': 2}) session.apply(op) # Flush write operations, if failures occur, print them. try: session.flush() except kudu.KuduBadStatus: print(session.get_pending_errors()) # Create a scanner and add a predicate. scanner = table.scanner() # Open scanner and print all tuples. # Note: This doesn't scale for large scans scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1)) # The expected output: [(1, datetime.datetime(2017, 1, 1, 0, 0, tzinfo=<UTC>))] print(scanner.open().read_all_tuples())
0x04、Impala结合
1.功能简介
支持存储
- HDFS、Hbase、Kudu、Hive
- csv、file
结构概念
- 库、表、视图、角色
- 权限、函数
多种类型
- array、bigint、boolean、char
- map、struct
2.表的概念
内部表
- 通过impala创建的表都是内部表(internal)
- 修改表结构对应的底层表也会进行变动
- 删除表底层表也删除
外部表
- 不同过impala创建的表都是外部表(external)
- 要操作这些表,需要在impala建立表的映射关系
- 通过sql修改表结构或者删除表不会删除底层表,删除的只是表的映射关系
3.建表示例
/* 创建内部表 */ CREATE TABLE default.example ( user_id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, stat_date STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, year INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, nick_name STRING NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, user_type INT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, gender INT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, PRIMARY KEY (user_id, stat_date, year) ) PARTITION BY HASH (user_id, stat_date) PARTITIONS 10, RANGE (year) ( PARTITION VALUE = 2018, PARTITION VALUE = 2019, PARTITION VALUE = 2020, PARTITION VALUE = 2021, PARTITION VALUE = 2022 ) STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='192.168.1.10:7051,192.168.1.11:7051,192.168.1.12:7051', 'kudu.table_name'='example'); /*创建外部表*/ CREATE EXTERNAL TABLE `example2` STORED AS KUDU TBLPROPERTIES( 'kudu.example' = 'example', 'kudu.master_addresses' = '192.168.1.10:7051,192.168.1.11:7051,192.168.1.12:7051');
4.属性操作
/* 修改impala表的名字 */ ALTER TABLE my_table RENAME TO my_new_table; /* 修改kudu表的名字 */ // 现在好像不能这么用了 ALTER TABLE my_internal_table SET TBLPROPERTIES('kudu.table_name' = 'new_name') /* 修改kudu master地址 */ ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-original-master.example.com:7051, kudu-new-master.example.com:7051'); /* 修改内部表改为外部表 */ ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE'); /* 添加range */ ALTER TABLE my_table ADD RANGE PARTITION VALUE = 2017; /* 删除range */ ALTER TABLE my_table DROP RANGE PARTITION VALUE = 2012; /* 添加字段 */ alter table my_table add column field1 INT DEFAULT 0 ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION COMMENT 'comments'; /* 删除字段 */ alter table my_table drop field2;
5.常用操作
/* 查询 */ select user_id, crawler_time from weib_user_profile where user_id=; /* join查询 */ SELECT t1.c1, t2.c2 FROM t1 JOIN t2 ON t1.id = t2.id and t1.type_flag = t2.type_flag WHERE t1.c1 > 100; SELECT t1.c1, t2.c2 FROM t1 JOIN t2 USING (id, type_flag) WHERE t1.c1 > 100; /* 半链接 */ SELECT t1.c1, t1.c2, t1.c2 FROM t1 LEFT SEMI JOIN t2 ON t1.id = t2.id; /* 反链接 */ select x from t1 left anti join t2 on (t1.x = t2.y); /* 插入数据 */ INSERT INTO my_first_table VALUES (99, "sarah"); INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim"); insert overwrite table new_table select * from default.test_table limit 3; /* upsert操作 */ UPSERT INTO kudu_table (pk, c1, c2, c3) VALUES (0, 'hello', 50, true), (1, 'world', -1, false); UPSERT INTO production_table SELECT * FROM staging_table; /* 修改 */ UPDATE kudu_table SET c3 = NULL WHERE c1 > 100 AND c3 IS NULL; /* 删除数据 */ DELETE FROM my_first_table WHERE id < 3; DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;
6.建表注意事项
主键的hash+range
要看数据量,如果太大,hash的partition要足够大,range可以按年月来处理
0x05、特性
1.表限制
- 表的备份数必须为奇数,最⼤大为7
- 备份数在设置后不不可修改
- 删除表后空间会⽴立⻢马释放
- 建表时partition总数限制为60个,后续可以添加
2.列限制
- 部分数据类型不支持,如CHAR, VARCHAR, DATE, ARRAY等
- 数据类型以及是否可为空等列列属性不支持修改
- 一张表最多有300列列
- 删除列列的时候不会立马清除空间,需要执⾏Compaction操作,但是 Compaction操作不⽀持手动执行
3.主键限制
- 表创建后主键不不可更更改
- 主键字段必须在其他字段之前定义
- ⼀行对应的主键内容不可以被Update操作修改。要修改⼀行的主键值,需 要删除并新增⼀行新数据,并且该操作无法保持原子性
- 主键的类型不支持DOUBLE、FLOAT、BOOL,且必须是非空的(NOT NULL)
- 不支持⾃自动⽣生成主键
- 每行对应的主键存储单元(CELL)最⼤为16KB
4.单元限制
- 单元对应的数据最⼤大为64KB,并且是在压缩前
5.分片限制
- 分⽚只支持手动指定,⾃自动分⽚不支持
- 分片设定不支持修改,修改分⽚设定需要”建新表-导数据-删老表”操作;
- 丢掉多数备份的Tablets需要⼿手动修复。
- Range⽀持删除或新增
6.容量建议
- 建议tablet servers的最⼤数量为100;
- 建议masters的最⼤数量为3;
- 建议每个tablet server存储的数据最大为8T
- 每个tablet server存储的tablets数量建议在1000以内;
- 每个表分片后的tablets存储在单个tablet server的最大数量为60。
7.Impala集成限制
- 不支持varchar、array、map、struct等类型
- Impala⼤小写不敏敏感,kudu是⼤小写敏敏感,所以表名及字段最好都是小写
- !=和like查询是kudu把数据全部给impala,然后在过滤的,相⽐其他操作效率要低
- Updates、inserts、deletes都是⾮非事务的
- 一条sql的并发执⾏行数在于tablets的个数,所以推荐至少有10个tablets
8.其他限制
- 主键有索引,不支持⼆级索引(Secondary indexes)
- 不支持多⾏的事务操作
- 关系型数据的一些功能,如外键,不支持;
- 列和表的名字强制为UTF-8编码,并且最大256字节
0x06、文档资源
- https://kudu.apache.org/kudu.pdf
- https://www.cloudera.com/documentation/enterprise/latest/PDF/cloudera-kudu.pdf
- https://kudu.apache.org/docs/
- https://github.com/apache/kudu/blob/master/docs/design-docs/tablet.md
- https://github.com/apache/kudu/blob/master/docs/design-docs/cfile.md
- https://raft.github.io/raft.pdf
- https://impala.apache.org/docs/build/impala-2.8.pdf
- https://www.cloudera.com/documentation/enterprise/5-10-x/topics/impala_tutorial.html
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/72238.html