Apache Kafka的Apache Trafodion消费者

本文介绍了如何实现Apache Trafodion与Apache Kafka的无缝结合。我们展示了Trafodion如何轻松地获取数据,如何结合不同的开源组件,从而使用 Apache Kafka、 TrafodionHBase Hadoop创建近实时的流式处理工作流。

如何实现各组件的结合?

什么是Kafka?Kafka是一个分布式、分区、多复本的日志提交服务。Kafka维护按类区分的消息,称为主题(topic)。生产者(producer)向Kafka的主题发布消息。消费者(consumer)订阅主题,接收发布到这些主题的消息。一个主题就是一个类别或者一个可订阅的条目名称。对每个主题来说,Kafka维护的是一个分区日志(partitioned log)。客户端控制将消息发布到哪个分区。

Kafka集群包含一个或多个服务器,每台服务器被称为broker。消费者向分区的leader broker发出fetch请求。在每个请求中,消费者指定偏移量(offset),从该位置返回日志块。如果有需要,消费者可以将偏移量倒回,重新消费数据。同时,Kafka保留消费者在日志中的位置,即偏移量(offset)。消费者在读取消息时,会提高其偏移量。消费者也可以按照任意的顺序消费消息。分区允许日志扩展到超过单个服务器,但是每个分区的大小必须适应于其服务器。我们可以对给定主题的数据进行分区,以便处理大量数据。

接下来,关于Trafodion SQL引擎的工作原理。Trafodion SQL编译器为所有的关系操作使用运算符模型,包括进程间的消息传递、用于横向扩展处理的可扩展分区功能。编译器根据region的边界或统计数据信息,生成使用表的分区布局的并行查询计划。

在同一进程中以及在跨多个节点重新分区或收集数据时,查询引擎在操作符间使用数据流模型。Trafodion工作负载在运行时使用分区并行,从而并行处理多个数据分区。

在分区并行计划中,多个运算符为相同的计划工作。使用多队列或管道合并结果,再保存输入分区的sort顺序。由于数据被划分成多个独立执行的单元,所以分区也被称为“数据并行”。表映射UDF允许在Trafodion使用MapReduce模型编程。这些UDF可以使用可选的表值参数并生成表值输出。

如同Trafodion中的其他运算符,表映射UDF可以并行执行。可选的优化器接口允许表映射UDF的编写人员实现多态性,以便在编译时确定结果列(名称和类型),将谓词下推到UDF或其输入表,从而影响UDF的并行度并执行各种其他优化。

Trafodion优化器采用从上而下的方法,非常适合通过运算符优化查询(例如,表映射UDF);它不会假设查询树的运算符是硬编码的。除了MapReduce模型编程的常规应用程序,表映射UDF也是一个简便而强大的机制,可以将其他数据源整合到Trafodion。

结合Trafodion和Kafka的并行架构是非常有益的。例如,可以直接使用示例的Trafodion Kafka UDF,也可以进行一些修改,用于消费Kafka生产者发出的数据。一旦执行了查询,Kafka UDF就将启动等待中的Trafodion执行程序。

下图展示了完整的流程:

示例:将数据导入Trafodion

以下步骤将连续数据加载到8节点集群的Trafodion表:

  1. 创建用于加载数据的表。

CREATE TABLE employee (id int not null,
name varchar(20),
email varchar(20),
primary key(id))
SALT USING 4 PARTITIONS ON (id);

上表有4个分区;每个节点一个分区。

  1. 创建一个主题。

创建一个名为employee的主题,有4个分区和2个副本。

> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic employee

  1. 开启进程。

创建一个生产者。示例代码:

Producer producer = new Producer(config);
String topic = “employee” ;
// Produce strings in delimited form “id|name|email”
for (int i = 1 ; i < = 1000 ; i++) String msg = ; System.out.println(msg); data = new KeyedMessage(topic, String.valueOf(i), msg);
producer.send(data);
}

  1. 使用易鲸捷github资料库中的示例UDF(https://github.com/esgyn/code-examples),消费Trafodion中的数据。注意:该简单的示例UDF不是并行执行的。

SELECT *FROM udf(kafka(‘localhost:2181′, — zookeeper connection
0, — Kafka group id
’employee’, — Kafka topic
‘IC20C20’, — int, and two char output cols
‘|’, — field delimiter
100, — max. rows to read
10000)) — timeout 10 seconds
KafkaResult(id, name, email);– name the output columns