This article explains how to write a custom Event Serializer class for Flume. Many people run into questions about this in day-to-day work, so this post pulls together a simple, practical approach that will hopefully clear things up. Let's get started!
We ship logs from Flume into HBase, but because our logs were originally stored in MongoDB, they are all in JSON format. Flume's built-in SimpleHbaseEventSerializer and RegexHbaseEventSerializer can't handle that, so I had to work through the source code, somewhat painfully, and write my own serializer class. (Note: if you put the code under Flume's hbasesink package, be sure to keep the Apache License header, the block of English text at the top of every source file, or you will get an error.) The process itself is fairly simple: write the class, compile and package it, drop the jar into Flume's lib directory, and then point the serializer setting in the agent configuration at your class. The code is below (class-level Javadoc omitted, apologies):
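For reference, here is a minimal sketch of what that agent configuration might look like. The agent/sink/channel names, the table name, the column family, and the com.example.flume package prefix are all placeholder assumptions (the article does not show the real ones); the essential parts are the asynchbase sink type and the serializer lines, whose serializer.* sub-properties are passed to the serializer's configure(Context) method:

agent1.sinks.hbase-sink.type = asynchbase
agent1.sinks.hbase-sink.table = prtms_log
agent1.sinks.hbase-sink.columnFamily = cf
agent1.sinks.hbase-sink.serializer = com.example.flume.PRTMSAsyncHbaseEventSerializer
agent1.sinks.hbase-sink.serializer.incrementColumn = iCol
agent1.sinks.hbase-sink.serializer.incrementRow = incRow
agent1.sinks.hbase-sink.channel = ch1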
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.gson.stream.JsonReader;

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;              // HBase table name
    private byte[] cf;                 // column family
    private byte[][] payload;          // column values
    private byte[][] payloadColumn;    // column names
    private byte[] incrementColumn;
    private String rowSuffix;          // row key: reversed pcode + path + timestamp
    private String rowPrefix;          // row key prefix (not used in this implementation)
    private byte[] incrementRow;
    private KeyType keyType;           // type of the row key suffix
    private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);
    @Override
    public void configure(Context context) {
        // Use a timestamp as the row key suffix type.
        keyType = KeyType.TS;
        // Read the increment column name from the agent configuration.
        String iCol = context.getString("incrementColumn", "iCol");
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }
    @Override
    public void configure(ComponentConfiguration conf) {
        // No component-level configuration needed.
    }

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }
    /**
     * Parse the log event and extract the HBase column names and values.
     *
     * @param event the Flume event to serialize
     * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)
     */
    @Override
    public void setEvent(Event event) {
        // The event body is a JSON log line.
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        // The headers carry the project code and the host of the service.
        Map<String, String> headers = event.getHeaders();
        JsonReader jsonReader = new JsonReader(new StringReader(log));
        String path = "";
        Map<String, String> kv = new HashMap<String, String>();
        try {
            // Parse the key/value pairs of the JSON log into a map.
            jsonReader.beginObject();
            while (jsonReader.hasNext()) {
                String name = jsonReader.nextName();
                String value = jsonReader.nextString();
                if (name.equals("uri")) {
                    // e.g. "GET /api/user?id=3 HTTP/1.1" -> "/api/user?id=3"
                    path = value.split(" ")[1];
                }
                kv.put(name, value);
            }
            jsonReader.endObject();
        } catch (IOException e) {
            logger.error("Failed to parse JSON log: " + log, e);
        } finally {
            try {
                jsonReader.close();
            } catch (IOException ignored) {
                // nothing to do
            }
        }
        // Strip the query string from the request path.
        if (path.contains("?")) {
            path = path.substring(0, path.indexOf("?"));
        }
        // Add the project code and server host from the headers to the map.
        String pcode = headers.get("pcode");
        String host = headers.get("host");
        kv.put("pcode", pcode);
        kv.put("host", host);
        // Initialize the column-name and column-value arrays.
        this.payloadColumn = new byte[kv.size()][];
        this.payload = new byte[kv.size()][];
        int i = 0;
        for (Map.Entry<String, String> entry : kv.entrySet()) {
            this.payloadColumn[i] = entry.getKey().getBytes(StandardCharsets.UTF_8);
            this.payload[i] = entry.getValue().getBytes(StandardCharsets.UTF_8);
            i++;
        }
        // Build the row key: reversed project code + request path + timestamp
        // (reversing the pcode helps spread writes across regions).
        this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":" + path + ":" + kv.get("time");
    }
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                rowKey = rowSuffix.getBytes(StandardCharsets.UTF_8);
                // Issue one PutRequest per column; all puts share the same row key.
                for (int i = 0; i < this.payload.length; i++) {
                    PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }
    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        // If an increment column is configured, bump the event counter row.
        if (incrementColumn != null) {
            AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }

    @Override
    public void cleanUp() {
        // No resources to release.
    }
}
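To make the row-key logic concrete, here is a hypothetical walk-through (all header and field values below are made up for illustration). Suppose an event arrives with headers pcode=20588 and host=web01, and a body of {"uri":"GET /api/user?id=3 HTTP/1.1","time":"1500000000"}. setEvent reverses the pcode to 88502, takes the path "/api/user?id=3" from the uri and strips the query string, and builds the row key 88502:/api/user:1500000000; getActions then emits one PutRequest per parsed field (uri, time, pcode, host) against that row.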
That wraps up how to write a custom Event Serializer class for Flume; hopefully it has answered your questions. Pairing the theory with some hands-on practice is the best way to make it stick, so go give it a try!