Implementing a Doris source
The Doris source is implemented by referencing the MySQL source, because using the MySQL source directly has a couple of problems:
- Some Doris column types are not supported and show up in the UI as NullType
- Doris views cannot be collected properly, so a view's page has no column information
Supporting Doris-specific column types
Register them as follows:
LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")
register_custom_type(LARGEINT, NumberTypeClass)
register_custom_type(BITMAP)
register_custom_type(HLL)
register_custom_type(HYPERLOGLOG)
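Registration alone only covers DataHub's type mapping; the type names also have to be added to the MySQL dialect's ischema_names so that SQLAlchemy can resolve them during reflection. The complete implementation below does this for every custom type, for example:

base.ischema_names["largeint"] = LARGEINT
base.ischema_names["bitmap"] = BITMAP
base.ischema_names["hll"] = HLL
base.ischema_names["hyperloglog"] = HYPERLOGLOG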
Supporting view collection
The root cause: in SQLAlchemy, MySQLDialect's _setup_parser method decides whether a CREATE statement defines a view using the regex "^CREATE (?:ALGORITHM)?.* VIEW", but Doris renders view DDL differently from MySQL; for Doris, matching against "^CREATE VIEW" is sufficient.
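A quick way to see the mismatch (the Doris DDL string here is illustrative, not captured from a real cluster):

import re

doris_ddl = "CREATE VIEW `v1` AS SELECT id, name FROM t1"
# MySQL's own SHOW CREATE output looks like "CREATE ALGORITHM=... VIEW ...",
# which the stock regex matches; Doris's plain "CREATE VIEW ..." does not.
print(bool(re.match(r"^CREATE (?:ALGORITHM)?.* VIEW", doris_ddl)))  # False: mistaken for a table
print(bool(re.match(r"^CREATE VIEW", doris_ddl)))                   # True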
For reference, the _setup_parser method in MySQLDialect:
@reflection.cache
def _setup_parser(self, connection, table_name, schema=None, **kw):
    charset = self._connection_charset
    parser = self._tabledef_parser
    full_name = ".".join(
        self.identifier_preparer._quote_free_identifiers(schema, table_name)
    )
    sql = self._show_create_table(
        connection, None, charset, full_name=full_name
    )
    if re.match(r"^CREATE (?:ALGORITHM)?.* VIEW", sql):
        # Adapt views to something table-like.
        columns = self._describe_table(
            connection, None, charset, full_name=full_name
        )
        sql = parser._describe_to_create(table_name, columns)
    return parser.parse(sql, charset)
The DorisSource class inherits from SQLAlchemySource and overrides the _process_view method; inside _process_view, it swaps MySQLDialect's _setup_parser for our own doris_setup_parser:
setup_parser = python_types.MethodType(doris_setup_parser, inspector.bind.dialect)
inspector.bind.dialect._setup_parser = setup_parser
The implementation of doris_setup_parser is in the complete code below. python_types.MethodType binds the plain function to the dialect instance, so self inside doris_setup_parser refers to that dialect.
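For readers unfamiliar with the pattern, a minimal sketch of instance-level monkey-patching with types.MethodType (the class and function names here are hypothetical):

import types as python_types

class Dialect:
    def _setup_parser(self):
        return "original parser"

def patched_setup_parser(self):
    return "doris-aware parser"

d = Dialect()
# Bind the replacement to this one instance; other Dialect instances keep the original.
d._setup_parser = python_types.MethodType(patched_setup_parser, d)
print(d._setup_parser())  # "doris-aware parser"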
The complete implementation of the Doris source
metadata-ingestion/src/datahub/ingestion/source/sql/doris.py
# This import verifies that the dependencies are available.
import pymysql  # noqa: F401

import logging
import re
import types as python_types
from typing import Dict, Iterable, Optional, Union, cast

from pydantic.fields import Field
from sqlalchemy import create_engine, dialects, inspect
from sqlalchemy.dialects.mysql import base
from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import sqltypes as types

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
    BasicSQLAlchemyConfig,
    SQLAlchemySource,
    SqlWorkUnit,
    get_schema_metadata,
    make_dataset_urn_with_platform_instance,
    make_sqlalchemy_type,
    register_custom_type,
)
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import NumberTypeClass
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DatasetPropertiesClass,
    SubTypesClass,
    ViewPropertiesClass,
)
GEOMETRY = make_sqlalchemy_type("GEOMETRY")
POINT = make_sqlalchemy_type("POINT")
LINESTRING = make_sqlalchemy_type("LINESTRING")
POLYGON = make_sqlalchemy_type("POLYGON")
DECIMAL128 = make_sqlalchemy_type("DECIMAL128")
LARGEINT = make_sqlalchemy_type("LARGEINT")
BITMAP = make_sqlalchemy_type("BITMAP")
HLL = make_sqlalchemy_type("HLL")
HYPERLOGLOG = make_sqlalchemy_type("HYPERLOGLOG")
register_custom_type(GEOMETRY)
register_custom_type(POINT)
register_custom_type(LINESTRING)
register_custom_type(POLYGON)
register_custom_type(DECIMAL128)
register_custom_type(LARGEINT, NumberTypeClass)
register_custom_type(BITMAP)
register_custom_type(HLL)
register_custom_type(HYPERLOGLOG)
base.ischema_names["geometry"] = GEOMETRY
base.ischema_names["point"] = POINT
base.ischema_names["linestring"] = LINESTRING
base.ischema_names["polygon"] = POLYGON
base.ischema_names["decimal128"] = DECIMAL128
base.ischema_names["largeint"] = LARGEINT
base.ischema_names["bitmap"] = BITMAP
base.ischema_names["hll"] = HLL
base.ischema_names["hyperloglog"] = HYPERLOGLOG
logger: logging.Logger = logging.getLogger(__name__)
class DorisConfig(BasicSQLAlchemyConfig):
    # defaults
    host_port = Field(
        default="localhost:9030",
        description="Doris FE host URL (Doris speaks the MySQL protocol on the FE query port, 9030 by default).",
    )
    scheme = "mysql+pymysql"

    def get_identifier(self, *, schema: str, table: str) -> str:
        regular = f"{schema}.{table}"
        if self.database_alias:
            return f"{self.database_alias}.{table}"
        else:
            return regular
@platform_name("Doris")
@config_class(DorisConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class DorisSource(SQLAlchemySource):
    """
    This plugin extracts the following:

    - Metadata for databases, schemas, and tables
    - Column types and schema associated with each table
    - Table, row, and column statistics via optional SQL profiling
    """

    def __init__(self, config, ctx):
        super().__init__(config, ctx, self.get_platform())

    def get_platform(self):
        return "doris"

    @classmethod
    def create(cls, config_dict, ctx):
        config = DorisConfig.parse_obj(config_dict)
        return cls(config, ctx)
    def _process_view(
        self,
        dataset_name: str,
        inspector: Inspector,
        schema: str,
        view: str,
        sql_config: BasicSQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        try:
            # Replace MySQLDialect._setup_parser on this dialect instance with the
            # Doris-aware version so Doris view DDL is recognized as a view.
            setup_parser = python_types.MethodType(
                doris_setup_parser, inspector.bind.dialect
            )
            inspector.bind.dialect._setup_parser = setup_parser
            columns = inspector.get_columns(view, schema)
        except KeyError:
            # For certain types of views, we are unable to fetch the list of columns.
            self.report.report_warning(
                dataset_name, "unable to get schema for this view"
            )
            schema_metadata = None
        else:
            schema_fields = self.get_schema_fields(dataset_name, columns)
            schema_metadata = get_schema_metadata(
                self.report,
                dataset_name,
                self.platform,
                columns,
                canonical_schema=schema_fields,
            )
        try:
            # SQLAlchemy stubs are incomplete and missing this method.
            # PR: https://github.com/dropbox/sqlalchemy-stubs/pull/223.
            view_info: dict = inspector.get_table_comment(view, schema)  # type: ignore
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        except ProgrammingError as pe:
            # Snowflake needs schema names quoted when fetching table comments.
            logger.debug(
                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and view {view}",
                pe,
            )
            description = None
            properties = {}
            view_info = inspector.get_table_comment(view, f'"{schema}"')  # type: ignore
        else:
            description = view_info["text"]
            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})
        try:
            view_definition = inspector.get_view_definition(view, schema)
            if view_definition is None:
                view_definition = ""
            else:
                # Some dialects return a TextClause instead of a raw string,
                # so we need to convert them to a string.
                view_definition = str(view_definition)
        except NotImplementedError:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"
        dataset_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            dataset_name,
            self.config.platform_instance,
            self.config.env,
        )
        dataset_snapshot = DatasetSnapshot(
            urn=dataset_urn,
            aspects=[StatusClass(removed=False)],
        )
        db_name = self.get_db_name(inspector)
        yield from self.add_table_to_schema_container(dataset_urn, db_name, schema)
        if self.is_stateful_ingestion_configured():
            cur_checkpoint = self.get_current_checkpoint(
                self.get_default_ingestion_job_id()
            )
            if cur_checkpoint is not None:
                checkpoint_state = cast(
                    BaseSQLAlchemyCheckpointState, cur_checkpoint.state
                )
                checkpoint_state.add_view_urn(dataset_urn)
        dataset_properties = DatasetPropertiesClass(
            name=view,
            description=description,
            customProperties=properties,
        )
        dataset_snapshot.aspects.append(dataset_properties)
        if schema_metadata:
            dataset_snapshot.aspects.append(schema_metadata)
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu
        dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
        if dpi_aspect:
            yield dpi_aspect
        subtypes_aspect = MetadataWorkUnit(
            id=f"{view}-subtypes",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="subTypes",
                aspect=SubTypesClass(typeNames=["view"]),
            ),
        )
        self.report.report_workunit(subtypes_aspect)
        yield subtypes_aspect
        if "view_definition" in properties:
            view_definition_string = properties["view_definition"]
            view_properties_aspect = ViewPropertiesClass(
                materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
            )
            view_properties_wu = MetadataWorkUnit(
                id=f"{view}-viewProperties",
                mcp=MetadataChangeProposalWrapper(
                    entityType="dataset",
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=dataset_urn,
                    aspectName="viewProperties",
                    aspect=view_properties_aspect,
                ),
            )
            self.report.report_workunit(view_properties_wu)
            yield view_properties_wu
        yield from self._get_domain_wu(
            dataset_name=dataset_name,
            entity_urn=dataset_urn,
            entity_type="dataset",
            sql_config=sql_config,
        )
def doris_setup_parser(self, connection, table_name, schema=None, **kw):
    """Replacement for MySQLDialect._setup_parser that recognizes Doris views."""
    charset = self._connection_charset
    parser = self._tabledef_parser
    full_name = ".".join(
        self.identifier_preparer._quote_free_identifiers(schema, table_name)
    )
    sql = self._show_create_table(
        connection, None, charset, full_name=full_name
    )
    # Doris emits "CREATE VIEW ..." rather than MySQL's
    # "CREATE ALGORITHM=... VIEW ...", so match the simpler prefix.
    if re.match(r"^CREATE VIEW", sql):
        # Adapt views to something table-like.
        columns = self._describe_table(
            connection, None, charset, full_name=full_name
        )
        sql = parser._describe_to_create(table_name, columns)
    # Note: unlike the stock MySQL implementation, the DDL is lower-cased before parsing.
    return parser.parse(sql.lower(), charset)
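A quick interactive sanity check, assuming a reachable Doris FE (the host, credentials, database, and view name below are placeholders):

import types as python_types
from sqlalchemy import create_engine, inspect
from datahub.ingestion.source.sql.doris import doris_setup_parser

engine = create_engine(
    "mysql+pymysql://doris_username:doris_password@doris_fe_ip:9030/data_market"
)
inspector = inspect(engine)
# Apply the same per-instance patch that DorisSource._process_view applies.
inspector.bind.dialect._setup_parser = python_types.MethodType(
    doris_setup_parser, inspector.bind.dialect
)
print(inspector.get_columns("my_view", schema="data_market"))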
Finally, make the Doris source discoverable
Modify metadata-ingestion/setup.py and add "doris = datahub.ingestion.source.sql.doris:DorisSource" to the "datahub.ingestion.source.plugins" entry points:
...
entry_points = {
    "console_scripts": ["datahub = datahub.entrypoints:main"],
    "datahub.ingestion.source.plugins": [
        ...
        "doris = datahub.ingestion.source.sql.doris:DorisSource",
        ...
    ],
    ...
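After modifying setup.py, reinstall the package (for example, pip install -e . inside metadata-ingestion) so the new entry point is registered; datahub check plugins should then list doris among the available sources.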
Wrapping up
An example recipe.yml configuration:
source:
  type: doris
  config:
    platform_instance: doris_dev
    host_port: 'doris_fe_ip:9030'
    database: data_market
    # Credentials
    username: doris_username
    password: doris_password
    include_tables: true
    include_views: true
    schema_pattern:
      deny:
        - sys
        - mysql
        - information_schema
        - performance_schema
sink:
  # sink configs
  type: datahub-rest
  config:
    server: 'http://gms-server-ip:8080'
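The ingestion can then be run with the standard CLI: datahub ingest -c recipe.yml.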