Storm的Transactional Topology怎么配置
更新:HHH   时间:2023-1-7


这篇文章主要讲解了“Storm的Transactional Topology怎么配置”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Storm的Transactional Topology怎么配置”吧!

1、什么是Transactional Topology?

    ○ 是一个每个tuple仅被处理一次的框架

    ○ 由Storm0.7引入,于Storm0.9被弃用,被triden取而代之

    ○ 底层依靠spout\bolt\topology\stream抽象的一个特性

2、Transactional Topology设计思路

    ○ 一次只处理一次tuple

        基于Storm处理tuple失败时会重发(replay),如何确保replay的记录不被重复记录,换句话说就是如何保证tuple仅被处理一次,这就依赖于一个称作强顺序性的思想。

        强顺序性:每个tuple与一个transaction id相关联,transaction id实际就是一个数字,每一个tuple都有一个按照顺序的transaction id(例如:tuple1的transaction id 为 1,tuple2的transaction id 为 2,...以此类推),只有当前的tuple处理并存储完毕,下一个tuple(处于等待状态)才能进行存储,tuple被存储时连同transaction id一并存储,此时考虑两种情况:

                        tuple处理失败时:重新发送一个和原来一模一样的transaction id

                        tuple处理成功时:发送的transaction id会和存储的transaction id对比,如果不存在transaction id,表示第一次记录,直接存储;如果发现存在,则忽略该tuple。

        这一思想是由Kafka开发者提出来的。

    ○ 一次处理一批tuple

        基于上面的一个优化,将一批tuple直接打包成一个batch,然后分配一个transaction id ,让batch与batch之间保证强顺序性,且batch内部的tuples可以并行。

    ○ Storm是如何采用的?

        两个步骤:

            1、并行计算batch中的tuple数量

            2、batch强顺序性存储

            在batch强顺序性存储的同时让其他等待存储的batch内部进行并行运算,不必等到下一个batch存储时才进行内部运算。

        在Storm上面的两个步骤表现为processing阶段commit阶段

3、一些设计细节

使用Transactional Topology时,storm提供如下操作:

    ○ 管理状态

        将需要处理的状态如:transaction id 、batch meta等状态信息放在zookeeper

    ○ 协调事务

        指定某个时间段执行processing操作和commit操作

    ○ 错误检测

        storm使用acking框架自动检测batch被成功或失败处理,然后相应的重发(replay)

    ○ 内置批处理API

        通过对普通的bolt进行包装,提供一套对batch处理的API、协调工作(即某个时刻处理某个processing或者commit),并且storm会自动清除中间结果

Transactional Topology是可以完全重发一个特定batch的消息队列系统,在 Kakfa中正是有这样的需求,为此Storm在storm-contrib里面的Storm-Kafka中为Kafka实现了一个事务性的spout。

4、来自Storm-Starter.jar的例子

    计算来自输入流中tuple的个数

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

    ○ 通过TransactionalTopologyBuilder类构建Transactional

        参数:

        Transaction ID:transactional topology的ID,在zookeeper中用于保存进度状态,重启topology时可以直接从执行的进度开始执行而不用重头到尾又执行一遍

        Spout ID:位于整个Topology的Spout的ID

        Spout Object:Transactional中的Spout对象

        Spout:Trasactional中的Spout的并行数

    ○ MemoryTransactionalSpout用于从一个内存变量中读取数据

        DATA:数据

        tuple fields:字段

        tupleNum:在batch中最大的tuple数

    ○ Bolts

        第一个Bolt采用随机分组的方式随机分发到各个task

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
    int _count = 0;
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }
    @Override
    public void execute(Tuple tuple) {
        _count++;
    }
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

        BatchBolt对象运行在BatchBoltExecutor中,BatchBoltExecutor负责BatchBolt对象的创建和清理 

        BatchBolt的ID在context对象中,该ID是一个TransactionAttempt对象.

        BatchBolt在DRPC中也可以使用,只是txid类型不一样,如果在Transactional Topology中使用BatchBolt,可以继承BaseTransactionalBolt.

        在Tranasctional Topology中所有的Tuple都必须以TransactionAttempt作为第一个field,然后storm才能根据该field判断Tuple所属的BatchBolt,所以在发射Tuple必须满足此条件。

        TransactionAttempt对象中有两个属性:

            transaction id:强顺序性,无论重发多少次都是一样的数字

            attempt id:对每一个Batch标识的ID,每次重发都其值不一致,通过该ID可以区分每次重发的Tuple的不同版本

第二个Bolt使用GlobalGrouping汇总batch中的tuple数

 public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
    int _sum = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }
 
    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}

    ICommitter接口:实现该接口的Bolt会在commit阶段调用finishBatch方法,该方法的调用会按照强顺序性,此外还可以使用TransactionalTopologyBuilder的setCommiterBolt来添加Bolt实现和该接口一样的功能。

    executor方法:在processing阶段和commit阶段都可以执行。

    关于更多的transactional topology例子可以看看storm-starter中的TransactionalWords类,该例子会在一个事务中更新多个数据库

5、Transaction Topology API

    Bolt类

    BaiscBolt:该Bolt不跟batch中的tuples交互,仅基于单个传来的tuple和产生新的tuple

    BatchBolt:该Bolt处理batch中的tuples,对于每一个tuple调用executor方法,整个batch完成时调用finishBatch方法

    被Committer标记的Bolt:在commit阶段才调用finishBatch方法,commit具有强顺序性,标记Bolt为commit阶段执行finishBatch的方法有两种:1、实现ICommiter接口。2、TransactionalTopologyBuilder的setCommiterBolt来添加Bolt。

    Processing阶段和Commit阶段

    

    红色轮廓的Bolt被标记过为commit

    Spout向Bolt A发送整个Batch

    Bolt A处理完整个Batch之后调用finishBatch方法分别向Bolt B 和 Bolt C发送Batch

    Bolt B接收到Bolt A传递过来的tuple进行处理(此时还尚未处理完毕)不会调用finishBatch方法

    Bolt C接口Bolt A传递的tuple,尽管处理完Bolt A传递来的tuple,但是由于Bolt B还尚未commit,所以Bolt C处于等待Bolt B commit的状态,不会调用finishBatch方法

    Bolt D接收来自Bolt C调用executor方法时发送的所有tuple

    此时一旦Bolt B进行commit进行finishBatch操作,那么Bolt C就会确认接收到所有Bolt B的tuple,Bolt C也调用finishBatch方法,最终Bolt D也接收到所有来自Bolt C的batch。

    在这里尽管Bolt D是一个committer,它在接收到整个batch的tuple之后不需要等待第二个commit信号。因为它是在commit阶段接收到的整个batch,它会调用finishBatch来完成整个事务。

    Acking

注意,当使用transactional topology的时候你不需要显式地去做任何的acking或者anchoring,storm在背后都做掉了。(storm对transactional topolgies里面的acking机制进行了高度的优化)

    Failing a transaction

在使用普通bolt的时候, 你可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。Transactional Topology对用户隐藏了acking框架, 它提供一个不同的机制来fail一个batch(从而使得这个batch被replay):只要抛出一个FailedException就可以了。跟普通的异常不一样, 这个异常只会导致当前的batch被replay, 而不会使整个进程崩溃掉。

    Transactional spout

TransactionalSpout接口跟普通的Spout接口完全不一样。一个TransactionalSpout的实现会发送一批一批(batch)的tuple, 而且必须保证同一批次tuples的transaction id始终一样。

在transactional topology运行的时候, transactional spout看起来是这样的一个结构:

coordinator是一个普通的storm的spout——它一直为事务的batch发射tuple。

Emitter则像一个普通的storm bolt,它负责为每个batch实际发射tuple,emitter以all grouping的方式订阅coordinator的”batch emit”流。
关于如何实现一个TransactionalSpout的细节可以参见Javadoc

    Partitioned Transactional Spout

一种常见的TransactionalSpout是那种从多个queue broker读取数据然后再发射的tuple。比如TransactionalKafkaSpout就是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。更多可以参考Javadoc。

    配置

Transactional Topologies有两个重要的配置:

Zookeeper:默认情况下,transactional topology会把状态信息保存在一个zookeeper里面(协调集群的那个)。你可以通过这两个配置来指定其它的zookeeper:”transactional.zookeeper.servers” 和 “transactional.zookeeper.port“。

同时活跃的batch数量:你必须设置同时处理的batch数量,你可以通过”topology.max.spout.pending” 来指定, 如果你不指定,默认是1。

6、实现

Transactional Topologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。

1、transactional spout是一个子topology, 它由一个coordinator spout和一个emitter bolt组成。

2、coordinator是一个普通的spout,并行度为1;emitter是一个bolt,并行度为P,使用all分组方式连接到coordinator的“batch”流上。

3、coordinator使用一个acking框架决定什么时候一个batch被成功执行(process)完成,然后去决定一个batch什么时候被成功提交(commit)。

感谢各位的阅读,以上就是“Storm的Transactional Topology怎么配置”的内容了,经过本文的学习后,相信大家对Storm的Transactional Topology怎么配置这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是天达云,小编将为大家推送更多相关知识点的文章,欢迎关注!

返回云计算教程...