kudu系统性总结,这一篇就足够

kudu系统性总结,这一篇就足够0x00、总体概览1.历史发展2012年10月由Cloudera公司发起创建,2015年10月对外公布,2015年12月进入apache基金会进

大家好,欢迎来到IT知识分享网。

0x00、总体概览

1.历史发展

  • 2012年10月由Cloudera公司发起创建,2015年10月对外公布,2015年12月进入apache基金会进行孵化,后续进入快速发展轨道。

2.产品定位

  • Fast Analytics on Fast Data。
  • 在快速修改的数据上进行快速分析

3.产生之前

在有kudu之前,数据分类为:

静态数据

  • 以HDFS引擎作为存储引擎,适⽤用于⾼吞吐量的离线大数据分析场景
  • 这类存储的局限性是数据无法进行行随机的读写

动态数据

  • 以HBase、Cassandra 作为存储引擎,适⽤用于⼤大数据随机读写场景
  • 这类存储的局限性是批量量读取吞吐量量远不不如 HDFS,不不适⽤用于批量量数据分 析的场景

而既要满足随机读写和大数据分析的解决方案如下:

kudu系统性总结,这一篇就足够

而以上的方案问题是:

架构复杂

  • 流转涉及环节太多,运维成本很高
  • 每个环节需要保证高可用,需要维护多个副本,存储空间也有一定的浪费
  • 数据在多个系统上,对数据安全策略、监控等都提出了挑战

时效性低

  • 从 HBase 导出成静态文件是周期性

难以应对后续的更新

  • 已经从 HBase 导出到 HDFS,新到的变更数据就难以处理

所以产生了新的解决方案,那就是kudu,其定位如下:

kudu系统性总结,这一篇就足够

而其gap了hdfs和hbase之间的

kudu系统性总结,这一篇就足够

4.架构图

kudu的整体架构图如下

kudu系统性总结,这一篇就足够

5.基本概念

下面了解一些kudu里的一些基本概念。

Table

  • 数据存储,没有库的概念
  • 有对应的schema结构
  • 字段是强类型
  • 需要设置主键
  • 分隔为N个tablets

Tablet

  • Table按照行切分后存储的位置
  • 一个tablet会多副本到其他server上,并且有一个会是leader
  • 如果leader失效,会用raft一致性协议重新选举
  • 任何一组副本内的tablet都可以读,但是只能从leader写入

Mater

  • 持续追踪所有的tablet、tablet server、catalog table、其他的集群meta信息
  • 可以部署多个master,但只有一个是active
  • 如果active时效了,用raft协议在选举出一个
  • Master信息也会存储在一个tablet上,并且被复制同步到其他master机器上
  • Tablet servers会发送心跳包到master机器(默认是一秒一次)

Tablet Server

  • 用来存储tablet数据并且向client提供服务
  • 一个tablet会在多个tablet server上
  • 一个tablet server会有多个tablet

Catelog table

  • 是kudu元数据的中心,存储了关于table和tablet的信息
  • 不允许被直接读写

Raft一致性协议算法

  • 分布式服务高可用性进行多副本
  • 通过raft来选举出leader
  • 多副本数据都可以读,但只有leader可以写
  • 写入时只有当大多数follower确认成功,才算写入成功
  • 在一组副本中(通常是3或5个),允许有(N-1)/2的失败

6.使用场景

流式实时计算

  • 流式计算场景通常有持续不断地大量写入
  • 与此同时这些数据还要支持近乎实时的读、写以及更新操作

时间序列存储

  • hash分片设计能够很好地避免TSDB类请求的局部热点问题
  • 高效的Scan性能让Kudu能够比Hbase更好的支持查询操作

历史数据共存

  • Impala可以同时支持HDFS、Kudu等多个底层存储引擎
  • 不必把所有的数据都迁移到Kudu

0x01、设计模式

1.数据类型

  • boolean
  • 8-bit signed integer
  • 16-bit signed integer
  • 32-bit signed integer
  • 64-bit signed integer
  • unixtime_micros (64-bit microseconds since the Unix epoch)
  • single-precision (32-bit) IEEE-754 floating-point number
  • double-precision (64-bit) IEEE-754 floating-point number
  • decimal (see Decimal Type for details)
  • UTF-8 encoded string (up to 64KB uncompressed)
  • binary (up to 64KB uncompressed)

2.主键设计

  • 单列或多列
  • 可以设置一列为主键,也同时设置多列为主键
  • 唯一性
  • 数据是全表唯一的
  • 不可被更改性
  • 在schema建成后,主键不能够修改
  • 其值不能被更改,不同即新增
  • 主键索引
  • 为主键建立B+索引

3.分区设计(Partition)

表分区类型

hash

  • 按照某一个或几个值的hash进行分区
  • 可指定分区的个数

range

  • 按照某一个字段值的范围分区
  • 可以动态地增删

multilevel

  • 可以同时按照hash、range结合来分区
  • 既分散热点数据又可动态增删

比如建立如下一个表

CREATE TABLE metrics ( host STRING NOT NULL, metric STRING NOT NULL, time INT64 NOT NULL,       value DOUBLE NOT NULL,       PRIMARY KEY (host, metric, time)   );

按照range来分区如下

kudu系统性总结,这一篇就足够

按照hash分区如下

kudu系统性总结,这一篇就足够

同时按照range和hash来分区

kudu系统性总结,这一篇就足够

同时按照hash和hash来分区

kudu系统性总结,这一篇就足够

那么hash与range的优缺点是

kudu系统性总结,这一篇就足够

4.字段编码

编码的类型如下

kudu系统性总结,这一篇就足够

每中字段适用及默认的编码类型如下

kudu系统性总结,这一篇就足够

5.数据压缩

编码类型有一下几种

  • LZ4 可以设置为默认
  • zlib
  • snappy

0x02、存储结构

逻辑结构图如下

kudu系统性总结,这一篇就足够

文件内容

  • BloomFile:根据DiskRowSet中key生成一个bloom filter,用于快速模糊的定位某一个key是否在DiskRowSet中
  • Ad_hoc Index:是主键的索引,用于定位key在DiskRowSet中具体哪个偏移位置
  • BaseData:是MemRowSet flush下来的数据,按照列存储,按照主键有序
  • UndoFile:是BaseData之前的数据历史数据
  • RedoFile:是BaseData之后的mutation记录,可以获得较新的数据
  • DeltaMem:用于在内存中存储mutation记录,先写到内存中,然后写满后flush到磁盘,形成 DeltaFile

0x03、读写流程

1.锁类型

共享锁(读锁)

  • 获得后只能够进行读操作
  • 其他读可以,任何写等待

互斥锁(写锁)

  • 只能获得锁的对象进行写
  • 任何读等待,其他写等待

MVCC(Multi-Version Concurrency Control)

  • 每一个写操作都会创建一个新版本的数据
  • 读操作会从有限多个版本的数据中挑选一个最合适的结果直接返回
  • 读写操作之间的冲突就不再需要被关注,而管理和快速挑选数据的版本就成了 MVCC 需要解决的主要问题。

2.存储数据

kudu系统性总结,这一篇就足够

kudu系统性总结,这一篇就足够

fluash示例如下

kudu系统性总结,这一篇就足够

3.读写流程

写流程:

a)插入操作:

  • 客户端连接到TMaster获取表的相关信息(分区和tablet信息);
  • 找到负责写请求的tablet所在的TServer,kudu接受客户端的请求,检查本次写操作是否符合要求;
  • kudu在tablet中所有RowSet中,查找是否存在与待插入数据相同主键的记录,如果存在,返回错误,否则继续;
  • 写入操作先被提交到tablet 的预写日志(WAL)上,然后根据Raft 一致性算法取得追随节点的同意后,才会被添加到其中一个tablet 的内存中,插入到MenRowSet中。(因为在MemRowSet 中支持了多版本并发控制(mvcc) ,对最近插入的行(未刷新到磁盘上的新的行)的更新和删除操作将被追加到MemRowSet中的原始行之后以生成Redo 记录的列表)。
  • kudu在MemRowSet 中写入新数据,在MemRowSet 达到一定大小或者时间限制(1G 或者 120s),MemRowSet 会将数据落盘,生成一个DiskRowSet 用于持久化数据 和 一个 MemRowSet 继续接受新数据的请求。

b)更新操作:

  • 客户端连接到TMaster获取表的相关信息(分区和tablet信息);
  • 找到负责写请求的tablet 所在的TServer,kudu接受客户端的请求,检查本次写操作是否符合要求;
  • 因为待更新的数据可能位于MemRowSet ,也可能位于DiskRowSet 中,所以根据待更新的数据所处的位置,kudu有不同的做法:
  • 当待更新的数据位于MemRowSet时,找到它所在的行,然后将跟新操作记录在所在行中的一个mutation的链表中,在MemRowSet 数据落地的时候,kudu会将更新合并到base data,并生成undo records 用于查看历史版本的数据和MVCC, undo records 实际上也是以 DeltaFile 的形式存放;
  • 当待跟新的数据位于DiskRowSet时,找到待跟新数据所在的DiskRowSet ,每个DiskRowSet 都会在内存中设置一个DeltaMemStore,将更新操作记录在DeltaMemStore中,在DeltaMemStore达到一定大小时,flush 在磁盘,形成Delta并存放在DeltaFile中。

读流程:

  • 客户端连接TMaster 获取表的相关信息,包括分区和表中的tablet 的信息;
  • 客户端找到 tablet 所在的TServer 以后,kudu接受读请求,并记录timestamp(没有显示指定就使用当前时间)信息;
  • 从内存中读取数据,即是从MemRowSet 和 DeltaRowSet中读取数据,根据timestamp来找到对应的mutation链表;
  • 从磁盘中读取数据,从metadata文件中使用boomfilter 快速模糊的判断所有候选的RowSet中是否包含此key,然后从DiskRowSet 中读取数据,实际上是根据B+树,判断key 在这些DiskRowSet 中的range 范围内,然后从metadata文件中,获取index来判断rowID 在Data中的偏移,根据读操作中的timestamp 信息判断是否需要对basedata中的数据进行回滚,从而获取数据。

4.compaction

kudu系统性总结,这一篇就足够

5.kudu api

from datetime import datetime import kudu from kudu.client import Partitioning   # Connect to Kudu master server(s).  client = kudu.connect(host='192.168.96.162', port=7051)   # Define a schema for a new table.  builder = kudu.schema_builder() builder.add_column('key').type(kudu.int64).nullable(False).primary_key() builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') schema = builder.build()   # Define the partitioning schema.  partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3).set_range_partition_columns(['year']).add_range_partition({'year':2018},{'year':2019})   # Delete table if it already exists.  if client.table_exists('python-example'):   client.delete_table('python-example')   # Create a new table.  client.create_table('python-example', schema, partitioning)   # Open a table.  table = client.table('python-example')   # Create a new session so that we can apply write operations.  session = client.new_session()   try: # Insert a row.  op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) session.apply(op)   # Upsert a row.  op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) session.apply(op)   # Update a row. op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) session.apply(op)   # Delete a row.  op = table.new_delete({'key': 2}) session.apply(op)   # Flush write operations, if failures occur, print them. try:   session.flush() except kudu.KuduBadStatus:   print(session.get_pending_errors())   # Create a scanner and add a predicate. scanner = table.scanner() # Open scanner and print all tuples. # Note: This doesn't scale for large scans  scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))   # The expected output: [(1, datetime.datetime(2017, 1, 1, 0, 0, tzinfo=<UTC>))]  print(scanner.open().read_all_tuples())

0x04、Impala结合

1.功能简介

支持存储

  • HDFS、Hbase、Kudu、Hive
  • csv、file

结构概念

  • 库、表、视图、角色
  • 权限、函数

多种类型

  • array、bigint、boolean、char
  • map、struct

2.表的概念

内部表

  • 通过impala创建的表都是内部表(internal)
  • 修改表结构对应的底层表也会进行变动
  • 删除表底层表也删除

外部表

  • 不同过impala创建的表都是外部表(external)
  • 要操作这些表,需要在impala建立表的映射关系
  • 通过sql修改表结构或者删除表不会删除底层表,删除的只是表的映射关系

3.建表示例

/*  创建内部表 */  CREATE TABLE default.example ( user_id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   stat_date STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   year INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   nick_name STRING NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   user_type INT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   gender INT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,   PRIMARY KEY (user_id, stat_date, year) ) PARTITION BY  HASH (user_id, stat_date) PARTITIONS 10, RANGE (year) ( PARTITION VALUE = 2018, PARTITION VALUE = 2019, PARTITION VALUE = 2020, PARTITION VALUE = 2021, PARTITION VALUE = 2022 ) STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='192.168.1.10:7051,192.168.1.11:7051,192.168.1.12:7051', 'kudu.table_name'='example'); /*创建外部表*/  CREATE EXTERNAL TABLE `example2` STORED AS KUDU TBLPROPERTIES( 'kudu.example' = 'example', 'kudu.master_addresses' = '192.168.1.10:7051,192.168.1.11:7051,192.168.1.12:7051');

4.属性操作

/* 修改impala表的名字 */  ALTER TABLE my_table RENAME TO my_new_table;    /* 修改kudu表的名字 */  // 现在好像不能这么用了 ALTER TABLE my_internal_table SET TBLPROPERTIES('kudu.table_name' = 'new_name')   /* 修改kudu master地址 */  ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-original-master.example.com:7051, kudu-new-master.example.com:7051');   /* 修改内部表改为外部表 */  ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');   /* 添加range */  ALTER TABLE my_table ADD RANGE PARTITION VALUE = 2017;   /* 删除range */  ALTER TABLE my_table DROP RANGE PARTITION VALUE = 2012;   /* 添加字段 */  alter table my_table add column field1 INT DEFAULT 0 ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION COMMENT 'comments'; /* 删除字段 */ alter table my_table drop field2;

5.常用操作

/* 查询 */  select user_id, crawler_time from weib_user_profile where user_id=;   /* join查询 */ SELECT t1.c1, t2.c2 FROM t1 JOIN t2   ON t1.id = t2.id and t1.type_flag = t2.type_flag   WHERE t1.c1 > 100; SELECT t1.c1, t2.c2 FROM t1 JOIN t2   USING (id, type_flag)   WHERE t1.c1 > 100;   /* 半链接 */  SELECT t1.c1, t1.c2, t1.c2 FROM t1 LEFT SEMI JOIN t2 ON t1.id = t2.id;   /* 反链接 */  select x from t1 left anti join t2 on (t1.x = t2.y);   /* 插入数据 */  INSERT INTO my_first_table VALUES (99, "sarah"); INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim"); insert overwrite table new_table select * from default.test_table limit 3;   /* upsert操作 */  UPSERT INTO kudu_table (pk, c1, c2, c3) VALUES (0, 'hello', 50, true), (1, 'world', -1, false); UPSERT INTO production_table SELECT * FROM staging_table;   /* 修改 */  UPDATE kudu_table SET c3 = NULL WHERE c1 > 100 AND c3 IS NULL;   /* 删除数据 */  DELETE FROM my_first_table WHERE id < 3; DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;

6.建表注意事项

主键的hash+range

要看数据量,如果太大,hash的partition要足够大,range可以按年月来处理

0x05、特性

1.表限制

  • 表的备份数必须为奇数,最⼤大为7
  • 备份数在设置后不不可修改
  • 删除表后空间会⽴立⻢马释放
  • 建表时partition总数限制为60个,后续可以添加

2.列限制

  • 部分数据类型不支持,如CHAR, VARCHAR, DATE, ARRAY等
  • 数据类型以及是否可为空等列列属性不支持修改
  • 一张表最多有300列列
  • 删除列列的时候不会立马清除空间,需要执⾏Compaction操作,但是 Compaction操作不⽀持手动执行

3.主键限制

  • 表创建后主键不不可更更改
  • 主键字段必须在其他字段之前定义
  • ⼀行对应的主键内容不可以被Update操作修改。要修改⼀行的主键值,需 要删除并新增⼀行新数据,并且该操作无法保持原子性
  • 主键的类型不支持DOUBLE、FLOAT、BOOL,且必须是非空的(NOT NULL)
  • 不支持⾃自动⽣生成主键
  • 每行对应的主键存储单元(CELL)最⼤为16KB

4.单元限制

  • 单元对应的数据最⼤大为64KB,并且是在压缩前

5.分片限制

  • 分⽚只支持手动指定,⾃自动分⽚不支持
  • 分片设定不支持修改,修改分⽚设定需要”建新表-导数据-删老表”操作;
  • 丢掉多数备份的Tablets需要⼿手动修复。
  • Range⽀持删除或新增

6.容量建议

  • 建议tablet servers的最⼤数量为100;
  • 建议masters的最⼤数量为3;
  • 建议每个tablet server存储的数据最大为8T
  • 每个tablet server存储的tablets数量建议在1000以内;
  • 每个表分片后的tablets存储在单个tablet server的最大数量为60。

7.Impala集成限制

  • 不支持varchar、array、map、struct等类型
  • Impala⼤小写不敏敏感,kudu是⼤小写敏敏感,所以表名及字段最好都是小写
  • !=和like查询是kudu把数据全部给impala,然后在过滤的,相⽐其他操作效率要低
  • Updates、inserts、deletes都是⾮非事务的
  • 一条sql的并发执⾏行数在于tablets的个数,所以推荐至少有10个tablets

8.其他限制

  • 主键有索引,不支持⼆级索引(Secondary indexes)
  • 不支持多⾏的事务操作
  • 关系型数据的一些功能,如外键,不支持;
  • 列和表的名字强制为UTF-8编码,并且最大256字节

0x06、文档资源

  • https://kudu.apache.org/kudu.pdf
  • https://www.cloudera.com/documentation/enterprise/latest/PDF/cloudera-kudu.pdf
  • https://kudu.apache.org/docs/
  • https://github.com/apache/kudu/blob/master/docs/design-docs/tablet.md
  • https://github.com/apache/kudu/blob/master/docs/design-docs/cfile.md
  • https://raft.github.io/raft.pdf
  • https://impala.apache.org/docs/build/impala-2.8.pdf
  • https://www.cloudera.com/documentation/enterprise/5-10-x/topics/impala_tutorial.html

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/72238.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信