Post

BigData - FB Trino


FB - Trino (Presto)


basic

Presto

  • 一个 facebook 开源的分布式 SQL 查询引擎
    • 基于 SQL 进行大数据分析的高性能分布式计算引擎
    • 主要用来以解决 Facebook 海量 Hadoop 数据仓库的低延迟交互分析问题
  • 为了 高效查询 不同系统和各种规模的数据源 而从头开始设计和编写的一套系统。
    • 尽管数据库的操作都大同小异,但每个数据库的操作都各不相同。
      • 这就导致开发者不得不学习无数种操作数据库的方式,很多时间被浪费在学习不同数据库的操作。
      • 这往往导致开发者疲于拼命,没有足够的时间来提升自己的开发效率。
      • 于是一个可以统一操作不同数据库的软件就是大势所趋了。
    • 最开始是用来解决Hive速度慢以及异构数据源互通的问题。

    • 它在大数据家族中属于MPP(massive parallel processing)计算引擎范畴

    • 其原理是火山(volcano)模型:
      • 将SQL抽象成一个个算子(operator),形成管线(pipeline)。
      • 目前能够支持 Hive、HBase、ES、Kudu、Kafka、MySQL、Redis、ElasticSearch等 等几十种数据源的读取。
  • 适用于交互式分析查询,数据量支持 GB 到 PB 字节。
  • Presto 的架构由关系型数据库的架构演化而来。
  • 它是 hadoop 生态中著名的分布式 SQL 引擎。

  • 2019年原作者从 Facebook 分道扬镳更名 Trino。
    • 由于开源纷争,Presto 现已更名为 Trino。

SQL 查询引擎?而不是数据库?

  • 和Oracle、MySQL、Hive等数据库相比,他们都具有存储数据和计算分析的能力。
  • 如MySQL具有InnoDB存储引擎和有SQL的执行能力;
  • 如Hive有多种数据类型、内外表(且这么叫)的管理能力,且能利用MR、TEZ执行HQL。
  • 而Presto并不直接管理数据,它只有计算的能力。

Presto

  • 支持从多种数据源获取数据来进行运算分析
  • 一条SQL查询可以将多个数据源的数据进行合并分析。

比如下面的SQL:a可以来源于MySQL,b可以来源于Hive。

1
2
select a.*,b.*
from a join b on (a.id = b.id);

背景及发展

MapReduce

  • 不能满足大数据快速实时 adhoc(即席查询)查询计算的性能要求。
    • Hadoop 提供的大数据解决方案使用的是 MR 计算框架
    • 这种计算框架适用于大数据的离线和批量计算
    • 因为该计算框架强调的是吞吐率而不是计算效率,所以其不能满足大数据快速实时 Ad-Hoc 查询计算的性能要求。

因此,开源社区和各大互联网公司纷纷进行大数据实时 Ad-Hoc 查询计算产品的研发

Presto 最初是由 Facebook 开发的一个分布式 SQL 执行引擎,

  • 它被设计为用来专门进行高速、实时的数据分析,以弥补 Hive 在速度对接多种数据源上的短板。
  • Facebook 于2012年秋季开始开发 Presto,目前该产品已经在超过 1000 名 Facebook 雇员中使用,每天运行超过 30000 个查询,每日查询数据量在 1PB 级别。
  • Facebook 称 Presto 的性能比 Hive 要好上 10 倍还多,2013年 Facebook 正式宣布开源 Presto。

Facebook 2012年开发,2013 年开源。发展历史如下:

  • 2012年秋季,Facebook启动Presto项目;
  • 2013年冬季,Presto 开源;
  • 2017年11月,11888 commits,203 releases,198 contributors;
  • 2019年1月,Presto 分家,目前有 PrestoDBPrestoSQL(更名为 trino) 两个社区。
    • Presto 团队的三位创始人离开了 Facebook。
    • 从此,Presto 项目被一分为二,
    • 由 Facebook 维护 PrestoDB,
    • Martin、Dain、David 三位 Presto 项目最早的发起人维护 PrestoSQL。

两个社区:

  • PrestoDB:https://prestodb.io,面向大数据的分布式 SQL 查询引擎
  • trino:trino,PrestoSQL,一个速度不可置信的查询引擎

特点

Presto有如下特点:

  • 基于SQL语言,上手成本低,而且功能强大,支持reduce和lambda函数;
  • 纯计算引擎,解耦底层存储,可快速缩扩容;
  • 纯内存计算,速度快,提供交互式的查询体验;
  • 通过插件的方式实现拓展功能,二次开发友好;
  • 通过不同的连接器(connector)插件读取异构数据源,进行联邦查询。

另外还有以下特点:

  • 快速查询
    • Trino是一个并行执行、分布式的查询引擎,通过Trino可以构建高效、低延迟的分析系统。
  • 大规模部署
    • 基于Trino可以查询EB级的数据湖、以及海量数据仓库。
  • 就地分析
    • 不需要复制数据,直接在hadoop、s3、cassandra、mysql等本地直接分析。
  • Runs anywhere
    • 可以将Trino部署在本地集群、或者是云环境。
  • 多数据源
    • 支持众多常见的数据源,
    • 目前 Presto 可以支持 Mysql、PostgreSql、Cassandra、Hive、Kafka、JMX、Iceberg 等多种 Connector,并且可以支持分库分表以及快速读取的功能。
  • 混合计算
    • 并且可以进行混合计算分析;
    • 每种类型的数据源都对应于一种特定类型的 Connector,
    • 用户可以根据业务需要在 Presto 中针对于一种类型的 Connector 配置一个或多个 Catalog 并查询其中的数据,
    • 用户可以混合多个 Catalog 进行 join 查询和计算。
  • 支持 SQL
    • Presto 已经可以完全支持 ANSI SQL
    • 并提供了一个 SQL Shell 给用户,用户可以直接使用 ANSI SQL 进行数据查询和计算。
  • 大数据
    • 完全的内存计算
    • 支持的数据量完全取决于集群内存大小。
    • 不像SparkSQL可以配置把溢出的数据持久化到磁盘,Presto是完完全全的内存计算;
  • 高性能
    • 低延迟高并发的内存计算引擎,
    • 相比Hive(无论MR、Tez、Spark执行引擎)、Impala 执行效率要高很多。
    • 根据Facebook的测试报告,至少提升10倍以上;
    • 经过 Facebook 和 京东商城的测试,Presto 的查询平均性能是 Hive 的10倍以上。
  • 支持ANSI SQL
    • 这点不像Hive、SparkSQL都是以HQL为基础,
    • Presto是标准的SQL。
    • 用户可以使用标准SQL进行数据查询和分析计算;
  • 扩展性
    • 有众多SPI扩展点支持,开发人员可编写UDF、UDTF。
    • 甚至可以实现自定义的Connector,实现索引下推,借助外置的索引能力,实现特殊场景下的MPP;
  • 流水线
    • Presto是基于PipeLine进行设计
    • 在大量数据计算过程中,终端用户(Driver)无需等到所有数据计算完成才能看到结果。
    • 一旦开始计算就可立即产生一部分结果返回,后续的计算结果会以多个Page返回给终端用户(Driver)。
    • 一旦开始计算,就可以立即产生一部分结果数据,并且结果数据会一部分接一部分地呈现在终端客户面前。

架构

Trino

  • 是典型的 MPP 架构
  • 由一个 Coordinator 和多个 Worker 组成 Trino集群
  • Coordinator 负责 SQL 的解析和调度
  • Worker 负责任务的具体执行。
  • 可配置多个不同类型的 Catalog,实现对多个数据源的访问。

Screen Shot 2022-04-17 at 16.40.10

modb_20211014_afe63b60-2cfe-11ec-9441-fa163eb4f6be

Presto 在整体业务中的架构图如下:

Screen Shot 2022-04-18 at 00.07.58

原生容器类型(Native container type)

  • presto架构的类型框架会自动将 SQL 中的数据类型与 “原生容器类型” (Native container type)进行绑定;
  • 目前“原生容器类型”只包括:
    • boolean
    • long
    • double
    • Slice
    • Block

Presto 服务进程

  1. Trino用户通过一个客户端 trino cli 连接到coordinator。
  2. coordinator与访问数据源的worker进行协作。
  3. 一旦它接收到一条SQL语句,协调器就负责跨Trino工作节点解析、分析、计划和调度查询执行。
  4. 该语句被转换为运行在一组worker上的一系列连接的任务。
  5. 当worker处理数据时,coordinator将检索结果并在输出缓冲区上向客户机公开。

trino

  • Coordinator
    • Coordinator 服务进程部署于集群中一个单独的节点上
    • 是整个 Presto 集群的管理节点。
    • 主要用于接收客户端提交的查询,查询语句解析,生成查询执行计划、Stage、Task,调度生成的Task
    • 此外,Coordinator 还对集群中的所有 Worker 进行管理
    • 是整个 Presto 集群的 Master 进程
      • 该进程既与 Worker 进行通信从而获得最新的 Worker 信息,又与 Client 进行通信,从而接收查询请求。
  • Worker
    • 在每个 Worker 节点上都存在一个 Worker 服务进程
    • 主要进行数据的处理以及 Task 的执行。
    • Worker 进程 每隔一定的时间会向 Coordinator 上的 restful 服务 发送心跳。
    • 当客户端提交一个查询时,Coordinator 则会从当前存活的 Worker 列表中选择出合适的 Worker 节点去运行 Task。
    • Worker 在执行每个 Task 时会进一步对当前 Task 读入的每个 Split 进行一系列的操作和处理。

Presto 模型

![https-___mmbiz.qpic.cnmmbiz_png6ic9JdlNtEvr6ibm4K1Px0wVUWhK4rLhMpB49jicIfrsBpGNVRdNibF3ugqcwhhuWtJomqLGZnbeiaLjqzhGic2K0WnA_640?wx_fmt=png](https://i.imgur.com/1pk4FYt.png)
  • Connector
    • 使 Presto 适配一个数据源
    • 每一个 Catalog 对应于一个特定的连接器。
    • 在Trino中,存储和计算分离的核心是基于connector的体系结构。
    • connector为Trino提供了访问任意数据源的接口。
    • 每个connector都提供了对底层数据源的基于表的抽象。
    • 只要可以使用Trino可用的数据类型以表、列和行来表示数据,就可以创建connector,查询引擎就可以使用数据进行查询处理。
    • 目前支持的connector包括:Hive, Iceberg, MySQL, PostgreSQL, Oracle, SQL Server, ClickHouse, MongoDB等。
  • Catalog
    • 定义连接到一个数据源的细节
    • 它包含了 Schema 并配置了一个连接器来使用。
  • Schema
    • 组织表的一种方式。
    • Catalog 和 Schema 一起定义了一个集合的表,这些表可以查询。
  • Table
    • 表是无序的行的集合。
    • 这些行内容被组织成带有数据类型的有名称的列。

Presto 查询执行模型

在 Presto 中一次查询执行会被分解为多个 Stage

  • Stage 与 Stage 之间是有前后依赖关系的。
  • 每个 Stage 内部又会被分解为多个 Task,属于每个 Stage 的 Task 被均分在每个 Worker 上并行执行。
  • 在每个 Task 内部又会被分解为多个 Driver ,每个 Driver 负责处理一个 Split
  • 而且每个 Driver 由一系列前后相连的 Operator 组成,这里的每个 Operator 都代表针对于一个 Split 的操作。

  • Statement 语句
    • 终端用户输入的用文字表示的 SQL 语句,
    • 由子句(Clause)、表达式(Expression)和断言(Predicate)组成。
  • Query 查询执行。
    • 当 Presto 接收一个 SQL 语句并执行时,会解析该 SQL 语句,将其转变成一个查询执行和相关的查询执行计划。
    • 一个查询执行代表可以在 Presto 集群中运行的查询,是由运行在各个 Worker 上且各自之间相互关联的阶段(Stage)组成的。
    • 查询执行是为了完成 SQL 语句所表述的查询而实例化的配置信息、组件、查询执行计划和优化信息等。
    • 一个查询执行由 Stage、Task、Driver、Split、Operator 和 DataSource 组成,
  • Stage 查询执行阶段。
    • 当 Presto 运行 Query 时,Presto 会将一个 Query 拆分成具有层级关系的多个 Stage
    • 一个 Stage 就代表查询计划的一部分。
  • Exchange
    • Presto 的 Stage 是通过 Exchange 来连接另一个 Stage 的
    • Exchange 用于完成有上下游关系的 Stage 之间的数据交换。
  • Task
    • Stage 并不会在 Presto 集群中实际运行,仅代表针对于一个 SQL 语句查询执行计划中的一部分查询的执行过程,只是用来对查询执行计划进行管理和建模。
    • Stage 在逻辑上又被分为一系列的 Task,这些 Task 则需要实际运行在 Presto 的各个 Worker 节点上。
  • Driver
    • 一个 Task 包含一个或多个 Driver。
    • 一个 Driver 其实就是作用于一个 Split 的一系列 Operator 的集合。
    • 因此一个 Driver 用于处理一个 Split,并且生成相应的输出,这些输出由 Task 收集并传送给下游 Stage 中的一个 Task。
    • 一个 Driver 拥有一个输入和一个输出。
  • Operator
    • 一个 Operator 代表一个 Split 的一种操作,例如过滤、加权、转换等。
    • 一个 Operator 依次读取一个 Split 中的数据,将 Operator 所代表的计算和操作作用于 Split 的数据上,并产生输出。
    • 每个 Operator 均会以 Page 为最小处理单位分别读取输入数据和产生输出数据。
    • Operator 每次只会读取一个 Page 对象,相应地,每次也只会产生一个 Page 对象。
  • Split 分片
    • 一个分片是一个大的数据集中的一个小的子集。
    • 而 Driver 则是作用于一个分片上的一系列操作的集合,而每个节点上运行的 Task,又包含多个 Driver,从而一个 Task 可以处理多个 Split。
  • Page
    • Page 是 Presto 中处理的最小数据单元。一个 Page 对象包含多个 Block 对象,每个 Block 对象是一个字节数组,存储一个字段的若干行。
    • 多个 Block 横切的一行是真实的一行数据。
    • 一个 Page 最大为 1MB ,最多 16 * 1024 行数据。

Presto/trino数据可视化查询

Airpal是建立在Facebook的Prestodb上的一个可视化分布式SQL查询引擎。https://github.com/airbnb/airpal,目前已经归档,大部分功能转移到superset。

Apache Superset是Airbnb开源的数据挖掘平台。支持丰富的数据源连接,多种可视化方式,并能够对用户实现细粒度的权限控制。该工具主要特点是可自助分析、自定义仪表盘、分析结果可视化(导出)、用户/角色权限控制,还集成了一个SQL编辑器,可以进行SQL编辑查询等。支持的数据源:,Amazon Athena ,Amazon Redshift ,Apache Drill ,Apache Druid ,Apache Hive ,Apache Impala ,Apache Kylin ,Apache Pinot ,Apache Solr ,Apache Spark SQL ,Ascend.io ,Azure MS SQL ,Big Query ,ClickHouse ,CockroachDB ,Dremio ,Elasticsearch ,Exasol ,Google Sheets ,Firebolt ,Hologres ,IBM Db2 ,IBM Netezza Performance Server ,MySQL ,Oracle ,PostgreSQL ,Trino ,Presto ,SAP Hana ,Snowflake ,SQLite ,SQL Server ,Teradata ,Vertica


应用场景

常见以下场景:

  • 实时计算:
    • Trino(Presto)性能优越,实时查询工具上的重要选择。
  • Ad-Hoc查询:
    • 数据分析应用、Trino(Presto)根据特定条件的查询返回结果和生成报表。
  • ETL:
    • 因支持的数据源广泛、可用于不同数据库之间迁移,转换和完成ETL清洗的能力。
  • 实时数据流分析:
    • Presto-Kafka Connector 使用 SQL对Kafka的数据流进行清洗、分析。
  • MPP:
    • Presto Connector有非常好的扩展性,可进行扩展开发,可支持其他异构非SQL查询引擎转为SQL,支持索引下推。
  • 单一的 SQL 分析访问点
    • 作为一个消费者和分析师,你可能会遇到数不清的问题:
      • 甚至不知道数据在哪儿,只有凭借公司某个部门的内部知识或者组织内多年的工作经验,你才能找到正确的数据。
      • 为了查询多个数据库,需要使用不同的连接和运行多种 SQL 方言的不同查询。
      • 这些查询看起来相似,行为上却不同。
      • 若不使用数据仓库,就无法使用查询合并来自不同系统的数据。
    • 可以使用 Presto 对接这些数据库,使用一个 SQL 标准来查询所有的系统。
      • 所有的仪表盘和分析工具以及其他商业智能系统都可以指向一个系统 – Presto,并访问组织当中的所有数据。
  • 数据仓库和数据源系统的访问点
    • 当一个组织需要更好的理解和分析存放在无数 RDBMS 中的数据时,就可以创建和维护数据仓库系统
      • 从多个系统中抽取的数据通过一个复杂的 ETL 过程,最终进入一个严格受控的、巨大的数据仓库。
    • 尽管数据仓库在很多情况下非常有用,但作为一个数据分析师,你会面临很多新问题:
      • 除了原来的那些数据库,你的工具和查询现在又多了一个数据接入点。
      • 你今天就要用的数据还没放入数据仓库。加载数据的过程痛苦、昂贵又困难重重重。
    • Presto 允许添加任何数据仓库作为数据源,就像其他关系数据库一样。
      • 如果想深入研究数据仓库的查询,可以在 Presto 里直接完成,也可以在这里访问数据仓库及其源数据库系统,甚至可以编写将它们组合在一起查询。
  • 提供对任何内容的 SQL 访问
    • Presto 允许将所有支持的系统作为数据源进行连接。它使用标准的 ANSI SQL 和使用 SQL 的所有工具对外暴露要查询的数据。
  • 联邦查询
    • 将所有的数据孤岛都暴露给 Presto 是向理解数据迈出的一大步。
    • 可以使用 SQL 和标准工具来联邦查询所有内容。
    • 在一个语句中引用并使用不同数据库和模式的 SQL 查询,这些数据库和 Schema 来自于完全不同的系统。
    • 在同一条 SQL 查询中,可以查询 Presto 中可用的所有数据源。
  • 虚拟数据仓库的语义层

    • 数据仓库系统为用户创造了巨大的价值,对组织来说确实一个负担。

      • 运行和维护数据仓库是一个巨大且昂贵的项目。
      • 需要专门的团队运行与管理数据仓库和相关的 ETL 过程。
      • 将数据导入数据仓库需要用户执行繁琐的操作,并且通常非常耗时。
    • Presto 可用作虚拟仓库。

      • 使用这一工具和标准的 ANSI SQL ,就可以定义语义层。
      • 一旦所有的数据库都设置成 Presto 的数据源,就可以直接查询它们。
      • Presto 提供了查询这些数据库所需的计算能力。
      • 使用 SQL 和 Presto 支持的函数和运算符,可以直接从数据源获得想要的数据。
      • 在使用数据进行分析之前,无需复制、移动或转换它们。
  • 数据湖查询引擎
    • 在数据被存储到数据湖的存储系统时,并没有特别考虑接下来应该如何访问它们,Presto 可以使它们成为有用的数据仓库。
    • 现代数据湖通常使用 HDFS 以外的其他对象存储系统,这些系统来自云供应商或其他开源项目。
    • Presto 能使用 Hive 连接器连接它们,无论数据在哪里、如何存储,都可以在数据湖上使用基于 SQL 的数据分析。
  • Interactive Analytics(交互式分析)
    • Facebook内运行着一个庞大的多租户数据仓库,一些业务部门或个别团队会共享其中一小部分托管的集群。
    • 其数据存储在一个分布式文件系统之上,而元数据则存储在单独的服务中,这些系统分别具有HDFS和Hive Metastore服务类似的API。
    • 我们称之为’Facebook data warehouse’,并且通过类似于Presto ‘Hive’ Connector的组件来进行文件的读写。
    • Facebook的工程师和数据科学家经常会检索少量的数据(50GB-3TB的压缩数据),用来验证假设,并构建可视化的数据展板。
    • 这些用户通常会使用查询工具、BI工具或Jupyter notebooks来进行查询操作。
    • 各个群集需要支持50-100个具有各种查询模型的操作并发执行,并且需要在数秒或数分钟内返回结果。
    • 这些用户通常并不关心查询所使用到的硬件资源,但是对查询时间却相当敏感。
    • 而对于某些探索性的查询,用户可能并不需要获取所有的查询结果。
    • 通常在返回初始结果后,查询就会被立即取消或者用户会通过LIMIT来限制系统返回的结果。
  • SQL 转换和 ETL
    • Presto 也可用于迁移数据,它所提供的丰富的 SQL 函数,可以查询数据,转换数据,并将数据写入同一个数据源或任何其他数据源。
    • 因支持的数据源广泛、可用于不同数据库之间迁移,转换和完成ETL清洗的能力。
  • Batch ETL (批量ETL)
    • 上面我们介绍到的数据仓库会使用ETL查询任务定期填充新的数据。
    • 查询任务通常是通过一个工作流系统依次调度执行的。
    • Presto支持用户从历史遗留的批处理系统迁移ETL任务,目前ETL查询任务在Facebook的Presto工作负载中占了很大一部分。
    • 这些查询通常是由数据工程师开发并优化的。
    • 相对于Interactive Analytics中涉及的查询,它们通常会占用更多的硬件资源,并且会涉及大量的CPU转换和内存(通常是数TB的分布式内存)密集型的聚合操作以及与其他的大表连接操作。
    • 因此相对于资源利用率以及集群吞吐量来说,查询延迟显得没那么重要。
  • 更快的响应带来更好的数据见解
    • 复杂的问题和海量数据集带来了诸多限制。
      • 将数据复制并加载到数据仓库并在其中分析它们的整个过程会过于昂贵。
      • 计算可能消耗太多的计算资源而无法处理全部数据,或者要消耗数天才能得到答案。
    • Presto 一开始就避免了数据复制
    • Presto 的并行计算重度优化通常能为数据分析带来性能提升。
    • 如果原来需要 3 天的查询现在只需要 15 分钟就可以完成,那么执行这个查询便是有价值的。
    • 从这些结果中获得的知识可以执行更多的查询。
  • A/B Testing (A/B测试)
    • Facebook使用A/B测试,通过统计假设性的测试来评估产品变更带来的影响。
    • 在Facebook大量的A/B测试的基础架构是基于Presto构建的。
    • 用户期望测试结果可以在数小时之内呈现(而不是数天),并且结果应该是准确无误的。
    • 对于用户来说,能够在交互式延迟的时间内(5~30s),对结果数据进行任意切分来获得更深入的见解同样重要。
    • 而通过预处理来聚合这些数据往往很难满足这一需求,因此必须得实时进行计算。
    • 生成这样的结果需要关联多个大型数据集,包括用户、设备、测试以及事件属性等数据。
    • 由于查询是通过编程方式实现的,所以查询需要被限制在较小的集合内。
  • Developer/Advertiser Analytics(开发者/广告主分析)
    • 为外部开发者和广告客户提供的几种自定义报表工具也都是基于Presto构建的。
    • Facebook Analytics就是其中一个实际案例,它为使用Facebook平台构建应用程序的开发人员提供了高级的分析工具。
    • 这些工具通常对外开放一个Web界面,该界面可以生成一组受限的查询模型。
    • 查询需要聚合的数据量是非常大的,但是这些查询是有目的性的,因为用户只能访问他们的应用程序或广告的数据。
    • 大部分的查询包括连接、聚合以及窗口函数。
    • 由于这些工具是交互式的,因此有非常严格的时间限制(约50ms~5s)。
    • 鉴于用户的数量,集群需要达到99.999%的可用性,并且支持数百个并发查询。

安装和配置 Presto

编译

mvn -T2C install -DskipTests

服务端部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# - 从编译后的源码中拷贝jar和配置文件
mkdir /usr/local/presto

cp -r ~workspace/presto/presto-server/target/presto-server-0.255-SNAPSHOT /usr/local/presto

cd /usr/local/presto/presto-server-0.255-SNAPSHOT

cp ~workspace/presto/presto-server/target/presto-main/etc ./presto-server-0.255-SNAPSHOT/

ln -s presto-server-0.255-SNAPSHOT server


# - 设置环境变量
vim /etc/profile

export PRESTO_SERVER_HOME=/usr/local/presto/server
export PATH=$PATH:$PRESTO_SERVER_HOME/bin

source /etc/profile



# - 修改 config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8086
discovery-server.enabled=true
discovery.uri=https://localhost:8086


# - 修改 node.properties
node.id=562e42e2-e874-431f-8da5-cb779744cf7c
node.data-dir=/usr/local/presto/data
catalog.config-dir=/usr/local/presto/server/etc/catalog
plugin.dir=/usr/local/presto/server/plugin
node.server-log-file=/usr/local/presto/server/var/log/server.log
node.launcher-log-file=/usr/local/presto/server/var/log/launcher.log

# - 修改 jvm.config
\-server
\-Xmx4G
\-XX:-UseBiasedLocking
\-XX:+UseG1GC
\-XX:+ExplicitGCInvokesConcurrent
\-XX:+HeapDumpOnOutOfMemoryError
\-XX:+UseGCOverheadLimit
\-XX:+ExitOnOutOfMemoryError
\-XX:ReservedCodeCacheSize=512M

# - 修改 log.properties
com.facebook.presto=INFO

# - 后台启动
$PRESTO_SERVER_HOME/bin/launcher start

# - 前台启动
$PRESTO_SERVER_HOME/bin/launcher run

# [Presto Web UI] https://localhost:8086/ui/

Presto CLI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# - 从编译后的源码中拷贝jar
cd /usr/local/presto

mkdir -p cli/lib

cp ~workspace/presto/presto-cli/target/presto-cli-0.255-SNAPSHOT-executable.jar /usr/local/presto/cli/lib

cd /usr/local/presto/cli/lib

mv presto-cli-0.255-SNAPSHOT-executable.jar presto

chmod +x presto

# - 设置环境变量
vim /etc/profile

export PRESTO_CLI_HOME=/usr/local/presto/cli
export PATH=$PATH:$PRESTO_CLI_HOME/lib

source /etc/profile

# - 运行 cli 并查看其版本
presto --version
Presto CLI 0.255-SNAPSHOT-9095346

# - 启动 cli
presto --server localhost:8086

# - 额外诊断,打印调试信息
presto --debug

# - 执行查询
presto --server localhost:8086 \

--catalog tpch \

--schema sf1 \

--execute 'select nationkey,name,regionkey from nation limit 5'
# "0","ALGERIA","0"
# "1","ARGENTINA","1"
# "2","BRAZIL","1"
# "3","CANADA","1"
# "4","EGYPT","4"

简单 SQL 语法

  • 查看 catalogs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
presto> show catalogs;
##   Catalog

 blackhole
 druid

 example
 hive

 jmx

 localfile
 memory

 mysql

 pinot

 postgresql
 raptor

 sqlserver
 system

 tpcds

 tpch

(15 rows)

Query 20210606_141818_00009_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

# - 查看 tpch Connector 的 schemas
presto> show schemas from tpch;
##
 Schema

 information_schema
 sf1

 sf100

 sf1000

 sf10000

 sf100000

 sf300

 sf3000

 sf30000

 tiny

(10 rows)

Query 20210606_142008_00010_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [10 rows, 119B] [141 rows/s, 1.65KB/s]

# - 查看 tpch.sf1 的 tables
presto> show tables from tpch.sf1;
##   Table
 customer
 lineitem
 nation
 orders
 part

 partsupp
 region
 supplier
(8 rows)

Query 20210606_142111_00011_4qtix, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [8 rows, 158B] [85 rows/s, 1.66KB/s]

# - 查看 tpch.sf1.nation 表中的实际数据
presto> select count(name) from tpch.sf1.nation;
##  _col0

    25
(1 row)

Query 20210606_142214_00012_4qtix, FINISHED, 1 node
Splits: 21 total, 21 done (100.00%)
0:00 [25 rows, 0B] [358 rows/s, 0B/s]

# - 选择使用特定 schema
presto> use tpch.sf1;
USE

安装trino

下载trino

下载地址:https://repo1.maven.org/maven2/io/trino/trino-server/359/trino-server-359.tar.gz

操作系统要求

  • 64位Linux系统
  • 为运行trino的用户提供足够的unlimit。包括trino能够打开的文件描述符,官方推荐以下配置:
1
2
3
4
5
vim /etc/security/limits.conf

trino soft nofile 131072

trino hard nofile 131072

修改完后,退出当前会话,重新登录即可生效。查看配置是否生效:

1
2
3
su trino

ulimit  -a

Java运行时要求

Trino要求使用Java 11 64位版本,最低要求为:11.0.11,

  • 注意:不支持Java 8,也不支持 Java 12或者Java 13。
  • Trino官方推荐我们使用Azul Zulu的JDK版本。
  • 此处,我们选择较新的11.0.12+7版本。

img

下载链接:https://cdn.azul.com/zulu/bin/zulu11.50.19-ca-jdk11.0.12-linux_x64.tar.gz

Python版本要求

  • 版本:2.6.x、2.7.x、或者3.x

开始安装

创建trino用户

在每个节点中创建trino用户。

1
2
3
ssh ha-node1 "useradd trino;usermod trino -G hadoop"; 
ssh ha-node2 "useradd trino;usermod trino -G hadoop"; 
ssh ha-node3 "useradd trino;usermod trino -G hadoop"

配置trino用户打开的文件

切换到trino用户,并用按照前面说的操作系统要求配置trino用户能打开的文件描述符。

上传并解压Zulu JDK

在第一个节点中配置以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[trino@ha-node1 ~]ll -hst
总用量 194M
194M -rw-r--r-- 1 root root 194M 7月  25 23:42 zulu11.50.19-ca-jdk11.0.12-linux_x64.tar.gz


# 解压
[trino@ha-node1 ~]tar -xvzf zulu11.50.19-ca-jdk11.0.12-linux_x64.tar.gz -C /opt/

# 创建超链接
[trino@ha-node1 ~]ln -s /opt/zulu11.50.19-ca-jdk11.0.12-linux_x64/ /opt/jdk11_zulu
[trino@ha-node1 ~]ll /opt/ | grep jdk11_zulu
... jdk11_zulu -> /opt/zulu11.50.19-ca-jdk11.0.12-linux_x64/



# 配置环境变量
vim ~/.bashrc
export JAVA_HOME=/opt/jdk11_zulu
export PATH=$JAVA_HOME/bin:$PATH
# 加载环境变量
source ~/.bashrc

# 查看JAVA版本
[trino@ha-node1 jdk11_zulu]java -version
openjdk version "11.0.12" 2021-07-20 LTS
OpenJDK Runtime Environment Zulu11.50+19-CA (build 11.0.12+7-LTS)
OpenJDK 64-Bit Server VM Zulu11.50+19-CA (build 11.0.12+7-LTS, mixed mode)

分发到其他节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 切换到root用户(只是为了免密发送文件)
[root@ha-node1 ~]# whoami
root

# 分发文件和环境变量
for node in "ha-node2" "ha-node3";
do
 scp -r /opt/zulu11.50.19-ca-jdk11.0.12-linux_x64 $node:/opt
 ssh $node "ln -s /opt/zulu11.50.19-ca-jdk11.0.12-linux_x64/ /opt/jdk11_zulu"
 ssh $node "chown -R trino:trino /opt/jdk11_zulu"
 scp /home/trino/.bashrc $node:/home/trino/
done


# 测试其他节点JDK11是否配置成功。
for node in "ha-node2" "ha-node3";
do
 ssh $node "java -version"
done

上传并解压trino安装包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[trino@ha-node1 ~]ll -hst
总用量 593M
593M -rw-r--r-- 1 root root 593M 7月  25 23:59 trino-server-359.tar.gz

# 解压trino
[trino@ha-node1 ~]tar -xvzf trino-server-359.tar.gz -C /opt/

# 创建超链接
[trino@ha-node1 ~]ln -s /opt/trino-server-359/ /opt/trino

# 查看链接
[trino@ha-node1 ~]ll /opt | awk '$0 ~ /^l/ { if($9 ~ /trino/) print $0 }'
lrwxrwxrwx   1 trino             trino              22 7月  26 00:01 trino -> /opt/trino-server-359/

# 创建数据目录
[trino@ha-node1 trino]mkdir /opt/trino/data

配置trino

在安装目录中创建一个etc目录,我们会在该目录中配置以下:

  • trino节点配置:配置每个trino节点的环境。
  • JVM配置:配置JVM的相关参数。
  • Config属性:配置trino服务器。
  • Catalog属性:配置trino的connector(数据源)

创建配置目录

[trino@ha-node1 trino]$ mkdir /opt/trino/etc

配置节点属性

以下是一个最简单的配置。

1
2
3
4
5
vim opt/trino/etc/node.properties

node.environment=trino_dev
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/opt/trino/data

说明:

  • node.environment:集群中的所有trino节点都必须由相同的环境名称。必须以小写字母开头,只能包含小写字母、数字和下划线。
  • node.id:安装的trino节点的唯一标识符。每个节点都必须由唯一的标识符。标识符必须以字母数字字符开头,并且只能包含字母数字、或 _ 字符。
  • node.data-dir:trino的数据目录,trino会在该目录中存放日志、以及其他数据。

配置JVM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vim /opt/trino/etc/jvm.config

-server
-Xmx3G
-XX:-UseBiasedLocking
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000

大家可以看到,都是JVM相关配置。每个节点可以配置不同的容量,大家根据自己的机器内存大小调整。


配置trino服务器

在trino中,每个节点都可以充当coordinator(协调器)和worker。

  • 官方推荐配置一台机器专门执行协调工作,保证集群的最佳性能。
  • 此处,我让ha-node1充当coordinator,其他的两台机器为worker。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 以下是coordinator的最小配置:
vim /opt/trino/etc/config.properties

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=10080
query.max-memory=8GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=https://ha-node1:10080



# 以下是worker的最小配置:
vim /opt/trino/etc/config.properties

coordinator=false
http-server.http.port=8080
query.max-memory=8GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=https://ha-node1:10080
  • discovery.uri:
    • Trino coordinator有一个发现服务,
    • 所有节点通过它来发现其他节点,
    • 每个trino启动时,向服务器发现注册,并不断发送心跳保持活动状态。

配置日志级别

1
2
3
vim /opt/trino/etc/log.properties

io.trino=INFO

配置trino catalog

catalog 数据目录。

  • 一个catalog数据目录可以对应数据schema。
  • 后续我们还会继续配置catalog目录。
  • 此处,我们仅配置一个jmx的connector。
1
2
3
4
5
6
# 创建catalog目录
mkdir /opt/trino/etc/catalog

vim /opt/trino/etc/catalog/jmx.properties

connector.name=jmx

分发配置

1
2
3
4
5
6
7
8
9
10
for node in "ha-node2" "ha-node3";
do
 scp -r /opt/trino-server-359 $node:/opt
 ssh $node "ln -s /opt/trino-server-359/ /opt/trino"
done

for node in "ha-node2" "ha-node3";
do
 ssh $node "chown -R trino:trino /opt/trino-server-359"
done

修改node2、node3配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# ha-node2节点:
ssh ha-node2
vim /opt/trino/etc/node.properties
node.environment=trino_dev
node.id=ffffffff-ffff-ffff-ffff-fffffffffffe
node.data-dir=/opt/trino/data
# -- end of node.properties

vim /opt/trino/etc/config.properties
coordinator=false
http-server.http.port=10080
query.max-memory=8GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=https://ha-node1:10080
# -- end of config.properties



# ha-node3节点:
ssh ha-node3
vim /opt/trino/etc/node.properties
node.environment=trino_dev
node.id=ffffffff-ffff-ffff-ffff-fffffffffffd
node.data-dir=/opt/trino/data
# -- end of node.properties

vim /opt/trino/etc/jvm.config
-server
-Xmx12G
# -- end of jvm.config

vim /opt/trino/etc/config.properties
coordinator=false
http-server.http.port=10080
query.max-memory=10GB
query.max-memory-per-node=4GB
query.max-total-memory-per-node=6GB
discovery.uri=https://ha-node1:10080
# -- end of config.properties

启动、关闭trino

1
2
3
4
5
6
7
8
9
10
11
# 在每个节点启动以下命令
su trino
bin/launcher start

[trino@ha-node1 trino]jps
5696 TrinoServer
5735 Jps


# 关闭
bin/launcher stop

一键启动脚本。

1
2
3
4
5
6
7
8
9
10
11
vim /opt/trino/bin/one_key.sh
# ! /bin/bash

EXE_MODE=$1

for node in "ha-node1" "ha-node2" "ha-node3";
do
 ssh $node "su - trino -c \"/opt/trino/bin/launcher ${EXE_MODE}\""
done

chmod a+x /opt/trino/bin/one_key.sh

执行启动:

1
2
3
4
5
6
7
8
9
./one_key.sh start
./one_key.sh stop

# 查看节点是否运行
./one_key.sh status
# [root@ha-node1 bin] # ./one_key.sh status
# Running as 7200
# Running as 2552
# Running as 2971

安装trino cli客户端

下载trino cli

下载地址:https://repo1.maven.org/maven2/io/trino/trino-cli/359/trino-cli-359-executable.jar

设置执行权限

1
2
mv ~/trino-cli-359-executable.jar /opt/trino/bin/trino
chmod a+x /opt/trino/bin/trino

启动客户端

1
2
3
4
5
6
7
[trino@ha-node1 bin]./trino --server ha-node1:10080 --catalog jmx --schema default
trino:default> show catalogs;
 Catalog 
---------
 jmx     
 system  
(2 rows)

到此处,trino已经安装完成。


webui

我们可以通过:https://ha-node1:10080/ui/访问trino的web ui。

img

image-20210726015158495


使用Trino

连接MySQL

接下来,我们使用trino来实现MySQL中数据查询。要去连接外部数据源,我们需要准备一个Connector。


创建MySQL对应的catalog

1
2
3
4
5
vim /opt/trino/etc/catalog/mysql_metadb.properties
connector.name=mysql
connection-url=jdbc:mysql://ha-node1:3306?enabledTLSProtocols=TLSv1.2&useSSL=false
connection-user=root
connection-password=123456

分发到所有节点:

1
2
3
4
for node in "ha-node2" "ha-node3";
do
 scp /opt/trino/etc/catalog/mysql_metadb.properties $node:/opt/trino/etc/catalog/
done

注意一定要分发哦!

重启trino

./one_key.sh restart

启动客户端

bin/trino --server ha-node1:10080

执行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
trino> show catalogs;
#    Catalog    
# --------------
#  jmx          
#  mysql_metadb 
#  system     
 


# 查询mysql的数据库
trino>  show schemas from mysql_metadb;
       Schema       
--------------------
 hive               
 hue                
 information_schema 
 performance_schema 
 c       
 
# 选择数据库
use mysql_metadb.ranger;
 
# 执行查询
select * from mysql_metadb.ranger.x_group_users limit 5;

img

再执行一个聚合计算:

1
2
3
select count (*) 
from mysql_metadb.ranger.x_group_users;

img

在TrinoDB中显示如下:

img

进入到查询中,我们可以看到更具体的执行信息。

img

再看一下查询计划。

img

一共分为两个Stage。

  • 第一个执行的Stage是TableScan。
  • Table Scan并只拉取了一条数据,直接执行了sql语句,然后输出。

img

那是不是Trino把所有查询都会下推到Mysql呢?我们有理由相信不会的。因为Trino是一个分布式计算引擎。

我们再来一个带有JOIN和COUNT的SQL:

1
2
3
4
select group_name, count(1) as cnt 
from x_user t1, x_group_users t2 
where t1.id = t2.user_id 
group by group_name;

img

一共分为5个Stage执行

  • 对应查询Mysql就是5个任务。
  • 任务的执行情况:

img

看下Trino的执行计划。在控制台来看下执行计划。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
trino:ranger> explain select group_name, count(1) as cnt from x_user t1, x_group_users t2 where t1.id = t2.user_id group by group_name;
         Query Plan    
--------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]   
     Output layout: [group_name, count]                     
     Output partitioning: SINGLE []                         
     Stage Execution Strategy: UNGROUPED_EXECUTION          
     Output[group_name, cnt]                                
     │   Layout: [group_name:varchar(740), count:bigint]    
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                 
     │   cnt := count  
     └─ RemoteSource[1]
            Layout: [group_name:varchar(740), count:bigint] 
                       
 Fragment 1 [HASH]     
     Output layout: [group_name, count]                     
     Output partitioning: SINGLE []                         
     Stage Execution Strategy: UNGROUPED_EXECUTION          
     Project[]         
     │   Layout: [group_name:varchar(740), count:bigint]    
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                 
     └─ Aggregate(FINAL)[group_name][$hashvalue]            
        │   Layout: [group_name:varchar(740)$hashvalue:bigint, count:bigint]                   
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                              
        │   count := count("count_5")                       
        └─ LocalExchange[HASH][$hashvalue] ("group_name")   
           │   Layout: [group_name:varchar(740), count_5:bigint, $hashvalue:bigint]              
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                           
           └─ RemoteSource[2]                               
                  Layout: [group_name:varchar(740), count_5:bigint, $hashvalue_6:bigint]         
                  
                  
 Fragment 2 [HASH]     
     Output layout: [group_name, count_5, $hashvalue_12]    
     Output partitioning: HASH [group_name][$hashvalue_12]  
     Stage Execution Strategy: UNGROUPED_EXECUTION          
     Aggregate(PARTIAL)[group_name][$hashvalue_12]          
     │   Layout: [group_name:varchar(740)$hashvalue_12:bigint, count_5:bigint]                 
     │   count_5 := count(*)                                
     └─ Project[]      
        │   Layout: [group_name:varchar(740)$hashvalue_12:bigint]                              
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                              
        │   $hashvalue_12 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("group_name"), 0))                               
        └─ InnerJoin[("id" = "user_id")][$hashvalue_7$hashvalue_9]                             
           │   Layout: [group_name:varchar(740)]            
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                           
           │   Distribution: PARTITIONED                    
           │   dynamicFilterAssignments = {user_id -> #df_340}                                   
           ├─ RemoteSource[3]                               
           │      Layout: [id:bigint, $hashvalue_7:bigint]  
           └─ LocalExchange[HASH][$hashvalue_9] ("user_id") 
              │   Layout: [group_name:varchar(740), user_id:bigint, $hashvalue_9:bigint]         
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}                       
              └─ RemoteSource[4]                            
                     Layout: [group_name:varchar(740), user_id:bigint, $hashvalue_10:bigint] 
                     
 Fragment 3 [SOURCE]   
     Output layout: [id, $hashvalue_8]                      
     Output partitioning: HASH [id][$hashvalue_8]           
     Stage Execution Strategy: UNGROUPED_EXECUTION          
     ScanFilterProject[table = mysql_metadb:ranger.x_user ranger.x_user columns=[id:bigint:BIGINT], grouped = false, filterPredicate =
         Layout: [id:bigint, $hashvalue_8:bigint]           
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?
         $hashvalue_8 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id"), 0))      
         id := id:bigint:BIGINT                             
                       
 Fragment 4 [SOURCE]   
     Output layout: [group_name, user_id, $hashvalue_11]    
     Output partitioning: HASH [user_id][$hashvalue_11]     
     Stage Execution Strategy: UNGROUPED_EXECUTION          
     ScanProject[table = mysql_metadb:ranger.x_group_users ranger.x_group_users columns=[group_name:varchar(740):VARCHAR, user_id:bigi
         Layout: [group_name:varchar(740), user_id:bigint, $hashvalue_11:bigint]                 
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}                     
         $hashvalue_11 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("user_id"), 0))
         group_name := group_name:varchar(740):VARCHAR      
         user_id := user_id:bigint:BIGINT
```  


1. 扫描 x_group_users 表,Trino自动选择 user_id 进行hash分区。并且只拉取三个字段:group_name、user_id、`$hashvalue_11`。可以看到group by、以及聚合计算都下推到mysql中执行拉取。
2. 扫描 x_user 表,按照id hash分区,只拉取了id,以及`$hashvalue_8`。
3. 拉取完数据后,在本地按照hash值执行shuffle分区。然后执行INNER JOIN操作。注意:这一步是按分区执行的。
4. 然后对分区的执行结果再聚合计算。计算得到最终结果。
5. 输出最终结果。



---



### 连接Hive

Trino只需要使用Hive的两个组件:

* 存储在HDFS中的数据
* Hive Metastore

Trino支持Hive 2、以及Hive3,以及衍生的发行版本,CDH以及HDP。支持的文件类型也比较全:

* ORC
* Parquet
* Avro
* RCText (RCFile using `ColumnarSerDe`    )

* RCBinary (RCFile using `LazyBinaryColumnarSerDe`    )

* SequenceFile
* JSON (using `org.apache.hive.hcatalog.data.JsonSerDe`    )

* CSV (using `org.apache.hadoop.hive.serde2.OpenCSVSerde`    )

* TextFile


---


#### 配置Hive MetaStore

为了能够在使用Hive 3时,对Avro表的支持,需要配置以下属性:


```xml
<property>
     <name>metastore.storage.schema.reader.impl</name>
     <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
 </property>

配置Hive connector

分别配置hive metastore的地址和HDFS(因为我们配置了HA)。

1
2
3
4
5
vim etc/catalog/hive.properties

connector.name=hive
hive.metastore.uri=thrift://ha-node1:9083
hive.config.resources=/opt/hadoop/etc/hadoop/core-site.xml,/opt/hadoop/etc/hadoop/hdfs-site.xml

分发到所有节点:

1
2
3
4
for node in "ha-node2" "ha-node3";
do
 scp /opt/trino/etc/catalog/hive.properties $node:/opt/trino/etc/catalog/
done

启动

1
2
3
# 启动HDFS
# 启动Hive MetaStore
# 重启Trino

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# 连接客户端
bin/trino --server ha-node1:10080

# 查看catalog
trino> show catalogs;
#    Catalog    
# --------------
#  hive         
#  jmx          
#  mysql_metadb 
#  system       


# 查看hive schemas
trino> show schemas in hive;
#        Schema       
# --------------------
#  default            
#  hudi_datalake      
#  information_schema 
#  kylin_test         
#  ods_hudi           
#  test               
# (6 rows)


# 查询表数据
trino> show tables in hive.test;
#         Table        
# ---------------------
#  dim_date_orc        
#  dim_date_orc_snappy 
#  x_axis_orc          
#  x_axis_orc_snappy   
#  y_axis_orc          
#  y_axis_orc_snappy   
# (6 rows)



# 执行查询
trino> use hive.test;
# USE
# trino:test> 
# trino:test> select * from dim_date_orc_snappy limit 10;
#  year | month 
# ------+-------
#  2022 |     1 
#  2022 |     2 
#  2022 |     3 
#  2022 |     4 
#  2022 |     5 
#  2022 |     6 
#  2022 |     7 
#  2022 |     8 
#  2022 |     9 
#  2022 |    10 
# (10 rows)



# 执行聚合计算
trino:test> select count(*) from dim_date_orc_snappy;
#  _col0 
# -------
#     36 
# (1 row)

将MySQL数据导入到Hive

执行导入

因为我们使用Trino能够方便得连接到Mysql与Hive

  • 所以我们在一个引擎中可以很容易地将MySQL的数据查询出来,然后导入到Hive中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 在Hive中创建一个schema
create schema hive.ranger;

# hive创建表
create table if not exists hive.ranger.x_user(
        id                bigint    
    , create_time       date
    , update_time       date
    , added_by_id       bigint        
    , upd_by_id         bigint        
    , user_name         varchar
    , descr             varchar
    , status            integer       
    , cred_store_id     bigint        
    , is_visible        integer       
    , other_attributes  varchar
    , dt                varchar
)
with (  
      format='ORC', 
    partitioned_by=ARRAY['dt']
)
;

# 添加分区
CALL system.create_empty_partition(
        schema_name => 'ranger'
    , table_name => 'x_user'
    , partition_columns => ARRAY['dt']
    , partition_values => ARRAY['2021-08-08']
);

# 导入数据
insert into hive.ranger.x_user
select
 t.*    , '2021-08-08' as dt
from mysql_madb.ranger.x_user t;
 
# 收集表和列的列统计信息
ANALYZE hive.ranger.x_user;

同样,我们把另外两张表也导入到Hive里面来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
create table if not exists hive.ranger.x_group_users (
     id          bigint      
 , create_time date
 , update_time date
 , added_by_id bigint      
 , upd_by_id   bigint      
 , group_name  varchar
 , p_group_id  bigint      
 , user_id     bigint     
    , dt          varchar
)
with (  
      format='ORC', 
    partitioned_by=ARRAY['dt']
)
;

# 添加分区
CALL system.create_empty_partition(
        schema_name => 'ranger'
    , table_name => 'x_group_users'
    , partition_columns => ARRAY['dt']
    , partition_values => ARRAY['2021-08-08']
);

# 导入数据
insert into hive.ranger.x_group_users
select
 t.*    , '2021-08-08' as dt
from mysql_metadb.rger.x_group_users t;
 
# 收集表和列的列统计信息
ANALYZE hive.ranger.x_group_users;

create table if not exists hive.ranger.x_group(
   id               bigint        
 , create_time      date  
 , update_time      date  
 , added_by_id      bigint        
 , upd_by_id        bigint        
 , group_name       varchar
 , descr            varchar
 , status           integer       
 , group_type       integer       
 , cred_store_id    bigint        
 , group_src        integer       
 , is_visible       integer       
 , other_attributes varchar
    , dt      varchar
)
with (  
      format='ORC', 
    partitioned_by=ARRAY['dt']
)
;

# 添加分区
CALL system.create_empty_partition(
        schema_name => 'ranger'
    , table_name => 'x_group'
    , partition_columns => ARRAY['dt']
    , partition_values => ARRAY['2021-08-08']
);

  # 导入数据
insert into hive.ranger.x_group
select
 t.*    , '2021-08-08' as dt
from
 mysql_medb.ranger.x_group t;
 
# 收集表和列的列统计信息
ANALYZE hive.ranger.x_group;

执行关联查询

1
2
3
select group_name, count(1) as cnt 
from hive.ranger.x_user t1, hive.ranger.x_group_users t2 
where t1.id = t2.user_id group by group_name;

非常地痛快,是不是?一个引擎把所有数据源全部连接到一起了。

  • 大家可以在https://trino.io/docs/current/language/types.html找到Trino所有的类型信息。

  • 大家可以在https://trino.io/docs/current/connector/hive.html#table-properties找到Trino与Hive相关的所有语


连接Kafka

连接Kafka,是不是有点头嗡嗡的?想想我们之前查询Kafka的数据可是费劲了,特别是要做一些数据验证,或者探索。需要一个很长的Pipeline。而有了Trino,将改变着一切。

启动Kafka

1
2
3
4
5
[kafka@ha-node1 kafka]bin/kafka-topics.sh --list --zookeeper ha-node1:2181
__consumer_offsets
chk_demo
ogg_test_ogg
oracle_test_ogg

Kafka Connector

Kafka Connector将topic映射为Trino中的表,而每条消息就是一行。配置起来相当地简单:

1
2
3
4
5
6
vim etc/catalog/kafka.properties

connector.name=kafka
kafka.table-names=chk_demo,ogg_test_ogg
kafka.nodes=ha-node1:9092,ha-node2:9092,ha-node3:9092
kafka.hide-internal-columns=false

分发到所有节点:

1
2
3
4
for node in "ha-node2" "ha-node3";
do
 scp /opt/trino/etc/catalog/kafka.properties $node:/opt/trino/etc/catalog/
done

重新启动Trino。


测试

我们先往Kafka中发送一些测试数据。

1
2
3
4
5
bin/kafka-console-producer.sh --topic chk_demo --broker-list ha-node1:9092
>test1
>test2
>test3
>测试

使用Trino查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 连接客户端
bin/trino --server ha-node1:10080
trino> use kafka.default;
# USE

# 查看表(topic)
trino:default> show tables;
#     Table     
# --------------
#  chk_demo     
#  ogg_test_ogg 
# (2 rows)


# 查看表信息
trino:default> desc chk_demo;

# Trino还专门给我们准备了注释。
#       Column       |              Type              | Extra |                   Comment                   
# -------------------+--------------------------------+-------+---------------------------------------------
#  _partition_id     | bigint                         |       | Partition Id                                
#  _partition_offset | bigint                         |       | Offset 
  forthe message within the partition 
#  _message_corrupt  | boolean                        |       | Message data is corrupt                     
#  _message          | varchar                        |       | Message text                                
#  _headers          | map(varchar, array(varbinary)) |       | Headers of the message as map               
#  _message_length   | bigint                         |       | Total number of message bytes               
#  _key_corrupt      | boolean                        |       | Key data is corrupt                         
#  _key              | varchar                        |       | Key text                                    
#  _key_length       | bigint                         |       | Total number of key bytes                   
#  _timestamp        | timestamp(3)                   |       | Message timestamp                           
# (10 rows)


# 查询数据
trino:default> select * from chk_demo limit 10;

img

再演示几个查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 根据分区、offset查询指定位置数据
select _partition_id, _partition_offset, _message, _key 
from chk_demo 
where _partition_id = 0 and _partition_offset = 0;
#  _partition_id | _partition_offset | _message | _key 
# ---------------+-------------------+----------+------
#              0 |                 0 | test1    | NULL 
# (1 row)



# 查询指定时间戳范围的消息
select 
 _partition_id
 , _partition_offset
 , _message
 , _key 
from chk_demo 
where 
 format_datetime(_timestamp, 'yyyy-MM-dd HH:mm:ss') >= '2021-08-08 02:57' 
 and format_datetime(_timestamp, 'yyyy-MM-dd HH:mm:ss') <= '2021-08-08 02:58'
;
#  _partition_id | _partition_offset | _message | _key 
# ---------------+-------------------+----------+------
#              0 |                 0 | test1    | NULL 
#              1 |                 0 | test2    | NULL 
# (2 rows)

.

This post is licensed under CC BY 4.0 by the author.

Comments powered by Disqus.