This article walks through how to implement a custom EventSerializer for Flume's HDFS sink; hopefully it serves as a practical reference.
Having already written a custom serializer for the HBase sink, I expected the HDFS one to be just as simple, but it turns out to work differently. The HDFS sink has no option for configuring a serializer class directly; instead, the serializer is selected according to the fileType setting. We use the DataStream file type, which is handled by the HDFSDataStream class, and that class defaults the serializer to TEXT (an enum constant):
serializerType = context.getString("serializer", "TEXT");
The enum is defined as follows:
public enum EventSerializerType {
    TEXT(BodyTextEventSerializer.Builder.class),
    HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
    AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
    CUSTOM(CUSTOMEventSerializer.Builder.class), // the custom serializer type
    OTHER(null);

    private final Class<? extends EventSerializer.Builder> builderClass;

    EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) {
        this.builderClass = builderClass;
    }

    public Class<? extends EventSerializer.Builder> getBuilderClass() {
        return builderClass;
    }
}
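To see how the configured serializer name resolves to a builder through this enum, here is a minimal standalone sketch of the same lookup pattern. It has no Flume dependency: the `Builder` interface and the `TextBuilder`/`CustomBuilder` classes are stand-ins for the real Flume types, not actual Flume API.

```java
import java.util.Locale;

public class SerializerLookup {
    // Stand-in for org.apache.flume.serialization.EventSerializer.Builder.
    interface Builder {}
    static class TextBuilder implements Builder {}
    static class CustomBuilder implements Builder {}

    enum EventSerializerType {
        TEXT(TextBuilder.class),
        CUSTOM(CustomBuilder.class),
        OTHER(null);

        private final Class<? extends Builder> builderClass;

        EventSerializerType(Class<? extends Builder> builderClass) {
            this.builderClass = builderClass;
        }

        Class<? extends Builder> getBuilderClass() {
            return builderClass;
        }
    }

    // Resolve the serializer name from the agent config to a builder class.
    static Class<? extends Builder> resolve(String name) {
        try {
            return EventSerializerType.valueOf(name.toUpperCase(Locale.ROOT)).getBuilderClass();
        } catch (IllegalArgumentException e) {
            // Unknown names: real Flume falls back to treating the value as a FQCN.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("CUSTOM").getSimpleName()); // prints "CustomBuilder"
    }
}
```

This is why adding one enum constant is enough: the string from the config is matched against the constant name, and the associated builder class is instantiated.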
I added the custom constant to this enum; then, when configuring the agent, set fileType and serializer accordingly. As with the HBase sink, the modified code must be recompiled and the jar redeployed.
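A sink configuration along these lines should pick up the new constant (agent/sink names `a1`/`k1` and the HDFS path are placeholders; the serializer value must match the enum constant added above):

```properties
a1.sinks.k1.type = hdfs
# Hypothetical output path; %Y%m%d escapes require a timestamp header.
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/logs/%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.serializer = CUSTOM
```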
The custom serializer class looks like this:
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Jackson imports; on older Flume releases the bundled Jackson may live
// under org.codehaus.jackson instead.
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CUSTOMEventSerializer implements EventSerializer {
    private static final Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class);
    private final String SPLITCHAR = "\001"; // column separator
    // For legacy reasons, by default, append a newline to each event written out.
    private final String APPEND_NEWLINE = "appendNewline";
    private final boolean APPEND_NEWLINE_DFLT = true;
    private final OutputStream out;
    private final boolean appendNewline;

    private CUSTOMEventSerializer(OutputStream out, Context ctx) {
        this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
        this.out = out;
    }
    @Override
    public boolean supportsReopen() {
        return true;
    }

    @Override
    public void afterCreate() {
        // noop
    }

    @Override
    public void afterReopen() {
        // noop
    }

    @Override
    public void beforeClose() {
        // noop
    }
    @Override
    public void write(Event e) throws IOException {
        // Get the raw log line from the event body.
        String log = new String(e.getBody(), StandardCharsets.UTF_8);
        logger.info("raw log: {}", log);
        // The headers carry the project code and host for this log.
        Map<String, String> headers = e.getHeaders();
        String parsedLog = parseJson2Value(log, headers);
        out.write(parsedLog.getBytes(StandardCharsets.UTF_8));
        logger.info("parsed values: {}", parsedLog);
        // Honor the appendNewline setting instead of always writing '\n'.
        if (appendNewline) {
            out.write('\n');
        }
    }
    /**
     * Extract the values from a JSON-formatted log line.
     *
     * @param log     the log line in JSON format
     * @param headers the event headers
     * @return the parsed, delimiter-separated log line
     */
    private String parseJson2Value(String log, Map<String, String> headers) {
        // String is immutable, so the result of replace() must be reassigned.
        log = log.replace("\\", "/");
        String time = "";
        String path = "";
        StringBuilder values = new StringBuilder();
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            Map<String, Object> m = objectMapper.readValue(log, Map.class);
            for (Map.Entry<String, Object> entry : m.entrySet()) {
                Object value = entry.getValue();
                if ("uri".equals(entry.getKey())) {
                    // Extract the access path from the request uri.
                    path = pasreUriToPath(value.toString());
                }
                if ("time".equals(entry.getKey())) {
                    time = value.toString().substring(10);
                }
                values.append(value).append(this.SPLITCHAR);
            }
        } catch (IOException e) {
            // JsonParseException and JsonMappingException both extend IOException.
            logger.error("failed to parse log as JSON: " + log, e);
        }
        // Append the project code and host carried in the event headers.
        String pcode = headers.get("pcode");
        String host = headers.get("host");
        values.append(path).append(this.SPLITCHAR)
              .append(pcode).append(this.SPLITCHAR)
              .append(host).append(this.SPLITCHAR)
              .append(time).append(this.SPLITCHAR);
        return values.toString();
    }
    @Override
    public void flush() throws IOException {
        // noop
    }

    public static class Builder implements EventSerializer.Builder {
        @Override
        public EventSerializer build(Context context, OutputStream out) {
            return new CUSTOMEventSerializer(out, context);
        }
    }
    /**
     * Convert a raw request uri (e.g. "GET /path?x=1 HTTP/1.1") into the bare access path.
     *
     * @param uri the raw request uri
     * @return the access path
     */
    protected String pasreUriToPath(String uri) {
        if (uri == null || "".equals(uri.trim())) {
            return uri;
        }
        int index = uri.indexOf("/");
        if (index > -1) {
            uri = uri.substring(index);
        }
        index = uri.indexOf("?");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf(";");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf(" HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf("HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        return uri;
    }
}
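The path-trimming helper is pure string manipulation, so it is easy to sanity-check in isolation. Here is a standalone copy of its logic (renamed `parseUriToPath` in this sketch) that can be run outside Flume:

```java
public class UriPathDemo {
    // Standalone copy of the pasreUriToPath logic above: strips the method
    // prefix before the first '/', then cuts the query string, session id,
    // and the trailing HTTP version token.
    static String parseUriToPath(String uri) {
        if (uri == null || uri.trim().isEmpty()) {
            return uri;
        }
        int index = uri.indexOf("/");
        if (index > -1) {
            uri = uri.substring(index);
        }
        index = uri.indexOf("?");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf(";");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf(" HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        index = uri.indexOf("HTTP/1.1");
        if (index > -1) {
            uri = uri.substring(0, index);
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(parseUriToPath("GET /search?q=flume HTTP/1.1")); // prints "/search"
    }
}
```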
That covers customizing an EventSerializer for Flume's HDFS sink; hopefully the walkthrough above is a useful reference.