这篇文章主要介绍Storm-HBase接口怎么用,文中介绍得非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
package storm.contrib.hbase.bolts;
import static backtype.storm.utils.Utils.tuple;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;
/*
 * Bolt that looks up one HBase column per incoming tuple.
 *
 * For every tuple it reads the row key from the configured input field, fetches the
 * configured column (table / column family / column name) for that row and emits
 * the pair (rowKey, columnValue).
 *
 * The HBase communicator is created lazily on the JVM that actually runs the bolt.
 * It used to be held in static fields assigned in the constructor; static fields are
 * not part of the serialized bolt, so the communicator was null (and execute() threw
 * a NullPointerException) on any JVM other than the one that built the topology.
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    /* Lookup coordinates; plain strings, so they travel with the serialized bolt. */
    private final String hbaseXmlLocation;
    private final String tableName;
    private final String colFamilyName;
    private final String colName;
    private final String rowKeyField;

    /* Created on first use on the executing JVM; deliberately excluded from serialization. */
    private transient HBaseCommunicator communicator;

    /* Only set by the three-argument prepare(); not used by this bolt itself. */
    transient OutputCollector _collector;

    /*
     * Stores the HBase table information. The connection itself is opened later,
     * in prepare() (or on the first execute() call if prepare() was never invoked).
     *
     * @param hbaseXmlLocation location handed to HBaseConnector#getHBaseConf
     * @param rowKeyField      name of the input tuple field holding the row key
     * @param tableName        HBase table to read from
     * @param colFamilyName    column family of the column to read
     * @param colName          name of the column to read
     */
    public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {
        this.hbaseXmlLocation = hbaseXmlLocation;
        this.tableName = tableName;
        this.colFamilyName = colFamilyName;
        this.colName = colName;
        this.rowKeyField = rowKeyField;
    }

    /*
     * Emits the value of the column with name @colName and rowkey @rowKey
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Guard for callers that invoke execute() without a preceding prepare().
        ensureCommunicator();
        final String rowKey = input.getStringByField(this.rowKeyField);
        final String columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
        collector.emit(tuple(rowKey, columnValue));
    }

    /*
     * Not part of IBasicBolt; kept for callers that use it. Only remembers the collector.
     */
    public void prepare(Map confMap, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    public void cleanup() {
        // Nothing to release here: no close operation of the communicator is visible to this class.
    }

    /*
     * Declares the two output fields, matching the order used by execute().
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rowKey", "columnValue"));
    }

    /*
     * No component-specific configuration; returns null as before.
     */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /*
     * IBasicBolt lifecycle hook: opens the HBase connection on the executing JVM.
     */
    public void prepare(Map stormConf, TopologyContext context) {
        ensureCommunicator();
    }

    /* Builds the communicator once, from the XML location given to the constructor. */
    private void ensureCommunicator() {
        if (communicator == null) {
            final HBaseConnector connector = new HBaseConnector();
            final HBaseConfiguration conf = connector.getHBaseConf(hbaseXmlLocation);
            communicator = new HBaseCommunicator(conf);
        }
    }
}
package storm.contrib.hbase.bolts;
import static backtype.storm.utils.Utils.tuple;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;
/*
 * Reads the specified column of an HBase table and emits the row key and the
 * column value in the form of tuples.
 *
 * The HBase communicator is built lazily on the JVM that runs the bolt. Holding it
 * in static fields assigned by the constructor does not work once the bolt is
 * serialized and run elsewhere: static fields are not serialized, so the
 * communicator would be null there and execute() would fail with a
 * NullPointerException.
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    /* Serializable lookup settings captured at construction time. */
    private final String hbaseXmlLocation;
    private final String tableName;
    private final String colFamilyName;
    private final String colName;
    private final String rowKeyField;

    /* Rebuilt on demand after deserialization, hence transient. */
    private transient HBaseCommunicator communicator;

    /* Populated only by the three-argument prepare(); unused by the lookup logic. */
    transient OutputCollector _collector;

    /*
     * Records the HBase table information; the connection is opened in prepare()
     * or, failing that, on the first call to execute().
     *
     * @param hbaseXmlLocation location passed to HBaseConnector#getHBaseConf
     * @param rowKeyField      input tuple field that carries the row key
     * @param tableName        table to query
     * @param colFamilyName    column family to query
     * @param colName          column to query
     */
    public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {
        this.hbaseXmlLocation = hbaseXmlLocation;
        this.rowKeyField = rowKeyField;
        this.tableName = tableName;
        this.colFamilyName = colFamilyName;
        this.colName = colName;
    }

    /*
     * Emits the value of the column with name @colName and rowkey @rowKey
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        connectIfNeeded();
        final String rowKey = input.getStringByField(this.rowKeyField);
        // Given the table name, row key, column family and column name, the
        // communicator returns the column value directly.
        final String columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
        collector.emit(tuple(rowKey, columnValue));
    }

    /*
     * Extra overload (not declared by IBasicBolt); stores the collector and nothing else.
     */
    public void prepare(Map confMap, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    public void cleanup() {
        // No resources are released here; this class sees no close operation on the communicator.
    }

    /*
     * Output schema: ("rowKey", "columnValue"), in the order emitted by execute().
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rowKey", "columnValue"));
    }

    /*
     * This bolt has no component-specific configuration, so null is returned.
     */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /*
     * IBasicBolt lifecycle hook: establishes the HBase connection before tuples arrive.
     */
    public void prepare(Map stormConf, TopologyContext context) {
        connectIfNeeded();
    }

    /* Creates the communicator on first use from the configured XML location. */
    private void connectIfNeeded() {
        if (communicator != null) {
            return;
        }
        final HBaseConnector connector = new HBaseConnector();
        final HBaseConfiguration conf = connector.getHBaseConf(hbaseXmlLocation);
        communicator = new HBaseCommunicator(conf);
    }
}
下面是用来发射RowKey的Spout:
package storm.contrib.hbase.spouts;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import java.util.UUID;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;
/*
 * Spout emitting tuples that contain a row key of the HBase table.
 * The set of row keys is a fixed, hand-written list; one of them is picked at
 * random for every emitted tuple.
 */
public class RowKeyEmitterSpout implements IRichSpout {
    private static final long serialVersionUID = 6814162766489261607L;
    public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);

    /* Candidate row keys; constant, so built once instead of on every nextTuple() call. */
    private static final String[] ROW_KEYS = {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};

    boolean _isDistributed;
    SpoutOutputCollector _collector;

    /* Created in open() so that it is not recreated for every tuple. */
    private transient Random _rand;

    /* Creates a distributed spout. */
    public RowKeyEmitterSpout() {
        this(true);
    }

    /*
     * @param isDistributed value later reported by isDistributed()
     */
    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public boolean isDistributed() {
        return _isDistributed;
    }

    /*
     * Remembers the collector and creates the random generator used by nextTuple().
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    public void close() {
    }

    /*
     * Waits 100 ms, then emits one randomly chosen row key, anchored with a fresh
     * random UUID as message id.
     */
    public void nextTuple() {
        Utils.sleep(100);
        final String rowKey = ROW_KEYS[_rand.nextInt(ROW_KEYS.length)];
        _collector.emit(new Values(rowKey), UUID.randomUUID());
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    /*
     * The single output field is named "word"; a downstream lookup bolt must be
     * configured with this field name to find the row key.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /* No component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
// 我们用来简单的测试系统的代码,测试接口是否正确
package storm.contrib.hbase.spouts;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/*
 * Simple test spout: once per second it emits a (word, number) pair taken from two
 * small fixed lists.
 *
 * The previous index logic cycled the counter through 1..4 and used it for both
 * lists; the word list has only four entries, so the fourth call failed with an
 * ArrayIndexOutOfBoundsException and the first entry of each list was never
 * emitted. Each list is now cycled independently, starting at its first entry.
 */
public class TestSpout implements IRichSpout {
    /* Fixed sample data; the two lists intentionally keep their original contents. */
    private static final String[] WORDS = {"hello", "tiwari", "indore", "jayati"};
    private static final int[] NUMBERS = {1, 2, 3, 4, 5};

    SpoutOutputCollector _collector;
    Random _rand;
    /* Number of tuples emitted so far, wrapped so that it never overflows. */
    int count = 0;

    public boolean isDistributed() {
        return true;
    }

    /*
     * Stores the collector used by nextTuple().
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /*
     * Sleeps for one second, then emits the next (word, number) pair.
     */
    public void nextTuple() {
        Utils.sleep(1000);
        final String word = WORDS[count % WORDS.length];
        final int number = NUMBERS[count % NUMBERS.length];
        // Wrap at a common multiple of both lengths so both cycles stay aligned with count.
        count = (count + 1) % (WORDS.length * NUMBERS.length);
        _collector.emit(new Values(word, number));
    }

    public void close() {
    }

    public void ack(Object id) {
    }

    public void fail(Object id) {
    }

    /*
     * Output schema: ("word", "number"), in the order emitted by nextTuple().
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "number"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /* No component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
代码比较简单,这里就不再逐行解释了。Storm-HBase的接口并没有像Storm-Kafka的接口那样自行处理轮询和连接管理,只是简单地构造了一个HBase的连接:使用时直接构造一个Connector就可以了。
以上是“Storm-Hbase接口怎么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注天达云行业资讯频道!