这篇文章主要介绍storm中trident是什么,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
简介
Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特点以batch的形式处理stream。
一些最基本的操作函数有Filter、Function,Filter可以过滤掉tuple,Function可以修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。
聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操作。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操作,它们会把stream重定向到一个partition中进行聚合操作。
重定向操作会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操作,不会产生网络传输。
GroupBy会根据指定字段,把整个stream切分成一个个grouped stream,如果在grouped stream上做聚合操作,那么聚合就会发生在这些grouped stream上而不是整个batch。如果groupBy后面跟的是aggregator,则是聚合操作,如果跟的是partitionAggregate,则不是聚合操作。
Trident主要有5类操作:
1、作用在本地的操作,不产生网络传输。
2、对数据流的重分布,不改变流的内容,但是产生网络传输。
3、聚合操作,有可能产生网络传输。
4、作用在分组流(grouped streams)上的操作。
5、Merge和join
partition
概念
partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。
操作
既然有partition的概念,那么也就有partition的操作。Trident提供的分区操作,类似于Storm里面讲的grouping。分区操作有:
重分区操作通过运行一个函数改变元组在任务之间的分布,也可以调整分区的数量(比如重分区之后将并行度调大),重分区需要网络传输的参与。重分区函数包含以下这几个:
shuffle:使用随机轮询算法在所有目标分区间均匀分配元组;
broadcast:每个元组复制到所有的目标分区。这在DRPC中非常有用,例如,需要对每个分区的数据做一个stateQuery操作;
partitionBy:接收一些输入字段,根据这些字段输入字段进行语义分区。通过对字段取hash值或者取模来选择目标分区。partitionBy保证相同的字段一定被分配到相同的目标分区;
global:所有的元组分配到相同的分区,该分区是流种所有batch决定的;
batchGlobal:同一个batch中的元组被分配到相同的目标分区,不同batch的元组有可能被分配到不同的目标分区;
partition:接收一个自定义的分区函数,自定义分区函数需要实现backtype.storm.grouping.CustomStreamGrouping接口。
注意,除了这里明确提出来的分区操作,Trident里面还有aggregate()函数隐含有分区的操作,它用的是global()操作,这个在后面接收聚合操作的时候还会再介绍。
API
each() 方法
作用:操作batch中的每一个tuple内容,一般与Filter或者Function函数配合使用。
下面通过一个例子来介绍each()方法,假设我们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。我们可以通过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。
1.Filter类:过滤tuple
一个通过actor字段过滤消息的Filter:
public static class PerActorTweetsFilter extends BaseFilter {
String actor;
public PerActorTweetsFilter(String actor) {
this.actor = actor;
}
@Override
public boolean isKeep(TridentTuple tuple) {
return tuple.getString(0).equals(actor);
}
}
Topology:
topology.newStream("spout", spout)
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.each(new Fields("actor", "text"), new Utils.PrintFilter());
从上面例子看到,each()方法有一些构造参数
2.Function类:加工处理tuple内容
一个能把tuple中text内容变成大写的Function:
public static class UppercaseFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
collector.emit(new Values(tuple.getString(0).toUpperCase()));
}
}
Topology:
topology.newStream("spout", spout)
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
.each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
首先,UppercaseFunction函数的输入是Fields("text", "actor"),其作用是把其中的"text"字段内容都变成大写。
其次,它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function之后的tuple内容多了一个"uppercased_text",并且这个字段排在最后面。
3. Field Selector与project
我们需要注意的是,上面每个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,如果想要彻底丢掉它们,我们就需要用到project()方法。
投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”],运行如下代码:
mystream.project(new Fields("b", "d"))
则输出的流仅包含 [“b”, “d”]字段。
aggregation的介绍
首先聚合操作分两种:partitionAggregate(),以及aggregate()。
1.partitionAggregate
partitionAggregate()的操作是在partition上,一个batch的tuple被分成多个partition后,每个partition都会单独运行partitionAggregate中指定的聚合操作。分区聚合在一批tuple的每一个分区上运行一个函数。与函数不同的是,分区聚合的输出元组会覆盖掉输入元组。请看如下示例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假设你有一个包含a,b两个字段的输入流,元组的分区情况如下:
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
运行上面的那一行代码将会输出如下的元组,这些元组只包含一个sum字段:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
2.aggregate
aggregate()隐含了一个global分区操作,也就是它做的是全局聚合操作。它针对的是整个batch的聚合计算。
这两种聚合操作,都可以传入不同的aggregator实现具体的聚合任务。Trident中有三种aggregator接口,分别为:ReducerAggregator,CombinerAggregator,Aggregator。
下面是CombinerAggregator接口的定义:
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每个输入元组上运行init函数,然后通过combine函数聚合结果值直到只剩下一个元组。如果分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。比如,下面是Count聚合器的实现:
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
ReducerAggregator接口的定义如下:
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
ReducerAggregator通过init函数得到一个初始的值,然后对每个输入元组调用reduce方法计算值,产生一个元组作为输出。比如Count的ReducerAggregator实现如下:
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
最常用的聚合器的接口是Aggregator,它的定义如下:
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
Aggregator能够发射任意数量,任意字段的元组。并且可以在执行期间的任何时候发射元组,它的执行流程如下:
处理batch之前调用init方法,init函数的返回值是一个表示聚合状态的对象,该对象会传递到aggregate和complete函数;
每个在batch分区中的元组都会调用aggregate方法,该方法能够更新聚合状态并且发射元组;
当batch分区中的所有元组都被aggregate函数处理完时调用complete函数。
下面是使用Aggregator接口实现的Count聚合器:
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
有些时候,我们需要通知执行很多个聚合器,则可以使用如下的链式调用执行:
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
上面的代码将会在每一个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。
最重要的区别是CombinerAggregator,它是先在partition上做partial aggregate,然后再将这些部分聚合结果通过global分区到一个总的分区,在这个总的分区上对结果进行汇总。
groupBy()分组操作
首先它包含两个操作,一个是分区操作,一个是分组操作。
如果后面是partitionAggregate()的话,就只有分组操作:在每个partition上分组,分完组后,在每个分组上进行聚合;
如果后面是aggregate()的话,先根据partitionBy分区,在每个partition上分组,,分完组后,在每个分组上进行聚合。
parallelismHint并发度的介绍
它设置它前面所有操作的并发度,直到遇到某个repartition操作为止。
topology.newStream("spout", spout)
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.parallelismHint(5)
.each(new Fields("actor", "text"), new Utils.PrintFilter());
意味着:parallelismHit之前的spout,each都是5个相同的操作一起并发,对,一共有5个spout同时发射数据,其实parallelismHint后面的each操作,也是5个并发。分区操作是作为Bolt划分的分界点的。
如果想单独设置Spout怎么办?要在Spout之后,Bolt之前增加一个ParallelismHint,并且还要增加一个分区操作:
topology.newStream("spout", spout)
.parallelismHint(2)
.shuffle()
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.parallelismHint(5)
.each(new Fields("actor", "text"), new Utils.PrintFilter());
很多人只是设置了Spout的并发度,而没有调用分区操作,这样是达不到效果的,因为Trident是不会自动进行分区操作的。像我之前介绍的,先分区,再设置并发度。如果Spout不设置并发度,只设置shuffle,默认是1个并发度,这样后面设置5个并发度不会影响到Spout,因为并发度的影响到shuffle分区操作就停止了。
例子
groupBy+aggregate+parallelismHint
package com.demo;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
public class MyAgg extends BaseAggregator<Map<String, Integer>> {
/**
*
*/
private static final long serialVersionUID = 1L;
/**
* 属于哪个分区
*/
private int partitionId;
/**
* 分区数量
*/
private int numPartitions;
private String batchId;
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TridentOperationContext context) {
partitionId = context.getPartitionIndex();
numPartitions = context.numPartitions();
}
public void aggregate(Map<String, Integer> val, TridentTuple tuple,
TridentCollector collector) {
String word = tuple.getString(0);
Integer value = val.get(word);
if (value == null) {
value = 0;
}
value++;
// 把数据保存到一个map对象中
val.put(word, value);
System.err.println("I am partition [" + partitionId
+ "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId);
}
public void complete(Map<String, Integer> val, TridentCollector collector) {
collector.emit(new Values(val));
}
public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
this.batchId = arg0.toString();
return new HashMap<String, Integer>();
}
}
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology
.newStream("spout", spout)
.shuffle()
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new MyAgg(),
new Fields("Map"))
.parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0
groupBy+partitionAggregate+parallelismHint
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology
.newStream("spout", spout)
.shuffle()
.groupBy(new Fields("sentence"))
.partitionAggregate(new Fields("sentence"), new MyAgg(),
new Fields("Map")))
.toStream()
.parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0
由于shuffle已经把tuple平均分配给5个partition了,用groupBy+partitionAggregate来聚合又没有partitionBy分区的作用,所以,直接在5个分区上进行聚合,结果就是每个分区各有一个tuple。
而用groupBy+aggregate,虽然也是shuffle,但是由于具有partitiononBy分区的作用,值相同的tuple都分配到同一个分区,结果就是每个分区根据不同的值来做汇聚。
aggregate+parallelismHint(没有groupBy)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology
.newStream("spout", spout)
.shuffle()
.aggregate(new Fields("sentence"), new MyAgg(),
new Fields("Map"))
.parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [1] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0
partitionAggregate+parallelismHint(没有groupBy操作)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology
.newStream("spout", spout)
.shuffle()
.partitionAggregate(new Fields("sentence"), new MyAgg(),
new Fields("Map"))
.toStream()
.parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0
我们可以发现,partitionAggregate加上groupBy,或者不加上groupBy,对结果都一样:groupBy对于partitionAggregate没有影响。但是对于aggregate来说,加上groupBy,就不是做全局聚合了,而是对分组做聚合;不加上groupBy,就是做全局聚合。
如果spout设置并行度,但是没有加shuffle,不会起作用,分区默认为1,;如果不设置并行度并且没有加shuffle,分区默认为1。
Merge和Joins
api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:
topology.merge(stream1, stream2, stream3);
Trident指定新的合并之后的流中的字段为stream1中的字段。
另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。
下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
stream1流的key字段与stream2流的x字段组join操作,另外,Trident要求所有新流的输出字段被重命名,因为输入流可能包含相同的字段名称。连接流发射的元组将会包含:
连接字段的列表。在上面的例子中,字段key对应stream1的key,stream2的x;
来自所有流的所有非连接字段的列表,按照传递到连接方法的顺序排序。在上面的例子中,字段a与字段b对应stream1的val1和val2,c对应于stream2的val1.
当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。
以上是“storm中trident是什么”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注天达云行业资讯频道!