Apache Apex的Operators

Operator是Apache Apex平台进行数据处理的最基本构建模块。Apex将要处理的数据抽象为tuple,而tuple就是流动在operator之间。Apex的operator可以通过由DAG(Directed Acyclic Graph,有向无环图)组成的Stream进行连接。因此,在Apex架构体系与数据流模型中,一个典型的Apex流应用是由DAG来呈现的,而DAG又包括了数据流(称之为streams)与操作(称之为operators)。

什么是Operator

Operator是独立的逻辑操作单元,可用于执行用例的业务逻辑。例如在ETL流程中,对数据的过滤就可以视为一个单独的operator。在Apex中,Operator的粒度并没有限制,你可以编写一个operator来完成整个业务逻辑,但为了更好地利用Apache Apex提供的分布式框架,通常还是建议将Operator定义为一个轻量级的独立任务(task)。Apex的API允许用户将任务拆分为不同的阶段(stage),这样就能使得所有的任务可以并行地执行在不同的tuple之上。

Operator的类型

在同一时间,一个operator只能工作在一个tuple上。tuple可以是其他operator提供,也可以是诸如数据库或消息队列之类的外部数据源。被处理的tuple又可以传递给另外的operator,或者存储到外部系统中。因此,基于功能可以将operator分为三种类型:

  • Input Adapter:是应用DAG的起点,用于从外部系统获取tuple。还有一种情况是无需与外部系统打交道,数据可以由operator自身生成。
  • Generic Operator:这种类型的operator会接收前置operator输入的tuple,对其进行处理后,将处理结果传递给DAG中的后续operator。
  • OutputAdapter:是应用DAG的终点,用于将数据写入到某些外部系统。

Operator在DAG中的位置

根据operator在DAG中的位置,我们以任何一个operator作为参考点(如下图的opr),那么针对所有与opr之间存在直接相关路径的operator,在opr之前的都可以称之为是上游operator,之后的则称之为下游operator。

注意:在DAG中没有环状关系。

Ports

在DAG中,operator通过streams进行连接。一个stream连接operator的端点则被称之为port。port有两种类型:

  • Input Port:operator通过input port接收上游operator传递过来的tuple
  • Output Port:operator通过output port将数据传递给下游operator

显然,Input Adapter这种operator将只有output port而没有input port,Generic Operator既有input也有output port,而Output Adapter则只有input port而没有output port,如下图所示:

Operator如何工作

每个operator都有自己的生命周期,并被分为多个stage。Stage以API的形式被Streaming Application Master调用。下图展示了Input Adapter、Generic Operator与Output Adapter的生命周期以及各阶段对应的方法:

各个方法的说明如下:

  • setup()方法:初始化operator,并为其准备待处理的tuple
  • beginWindow()方法:应用窗口(application window)的开启,方法中的实现都会在窗口启动之前被执行完成
  • process()方法:该方法属于InputPort,在tuple到达operator的input port时被触发。由于Input Adapter并没有input port,因此该方法只提供给Generic Operator与Output Adapter
  • emitTuples()方法:与process()相反,它只提供给Input Adapter,用于将从外部系统(或上游operator)获取到的tuple发射出去。该方法会被持续不断地被调用,直到预先配置的窗口时间达到才会停止
  • endWindow()方法:标记窗口结束,方法中的实现都会在窗口结束之后被执行
  • teardown()方法:用于友好地关闭operator,释放被operator占用的资源

文件IO流处理的案例

假设要通过Apex以流处理的形式读取文件,并创建一个stream将读取到的文件内容写入到另一个文件,同时创建另一个stream将读取到的文件内容在控制台显示出来。则整个DAG如下图所示:

Apex为文件IO流处理提供了AbstractFileInputOperator与AbstractFileOutputOperator两个抽象类,我们只需定义自己的类,并分别派生这两个类,就可以作为文件读写的input adapter与output adapter。

在FileReader这个input adapter中,分别定义了output与control两个output port,作为tuple的出口,将数据传入到各自的stream中;而在FileWriter中,则定义了input与control这两个input port,用以将stream中的tuple传递给它自己。如下代码所示:

@ApplicationAnnotation(name="FileIO")
public class Application implements StreamingApplication {
@Override
public void populateDAG(DAG dag, Configuration conf)
{
FileReader reader = dag.addOperator("read", FileReader.class);
FileWriter writer = dag.addOperator("write", FileWriter.class);

dag.addStream("data", reader.output, writer.input);
dag.addStream("ctrl", reader.control, writer.control);
}
}

在Application中,Apex还允许我们对Port设置属性。例如希望在writer operator中对读取过来的数据进行并行的分区处理,则可以设置为:

dag.setInputPortAttribute(writer.input, PARTITION_PARALLEL, true);
dag.setInputPortAttribute(writer.control, PARTITION_PARALLEL, true);

Operator是支撑Apache Apex进行流数据处理的重要组成部分。当我们使用Apex这个流处理平台时,主要的开发工作其实都是与Operator打交道。为了支持开发者能够更专注于自己的应用场景,Apex业已定义了大量常用的Operator,例如对TCP、HDFS、JDBC、JMS、各种文件、NoSQL等的支持。同时,它又提供了非常灵活的自定义operator支持。例如,它为其定义了Operator接口与提供了基本实现的BaseOperator类。它还定义了诸如AbstractFileInputOperator之类的与具体场景有关的大量抽象类模板,极大地减少了自定义operator的编码量。

Apex的架构核心是DAG,由Operator、Stream与Port三者组成。DAG既满足了高效地流处理需求,又非常形象地以图形建模形式帮助我们梳理处理流数据的业务逻辑和处理流程,降低了数据流建模的难度。

说明:本文内容主要来自Apache Apex官方文档。

您的赞赏是我创作的动力!