这篇文章给大家介绍kafka-Storm中如何将日志文件打印到local,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
阅读前提:
1 : 您可能需要对 logback 日志系统有所了解
2 :您可能需要对于 kafka 有初步的了解
3:请代码查看之前,请您仔细参考系统的业务图解
由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:
业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS
1: 一个正式环境系统的系统设计图解:
通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:
其一: 实时通道
其二:离线通道
在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。
也就是:
如下图所示:
在kafka之中,通常而言,有如下的 代码 用来处理:
在这里我们针对了2种日志,有两个Consumer用来处理
package com.mixbox.kafka.consumer;
public class logSave {
public static void main(String[] args) throws Exception {
Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
visitlog.start();
Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
orderlog.start();
}
}
在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。
package com.mixbox.kafka.consumer;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* @author Yin Shuai
*/
public class Consumer_Thread extends Thread {
// 在事实上我们会依据传递的topic名称,来生成不桐的记录机器
// private Logger _log_order = LoggerFactory.getLogger("order");
// private Logger _log_visit = LoggerFactory.getLogger("visit");
private Logger _log = null;
private final ConsumerConnector _consumer;
private final String _topic;
public Consumer_Thread(String topic) {
_consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this._topic = topic;
_log = LoggerFactory.getLogger(_topic);
System.err.println("log的名称" + _topic);
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
// 在这里我们的组ID为logSave
props.put("group.id", KafkaProperties.logSave);
props.put("zookeeper.session.timeout.ms", "100000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(_topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer
.createMessageStreams(topicCountMap);
for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while (iterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> next = iterator.next();
try {
// 在这里我们分拆了一个Consumer 来处理visit日志
logFile(next);
System.out.println("message:"
+ new String(next.message(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
}
private void logFile(MessageAndMetadata<byte[], byte[]> next)
throws UnsupportedEncodingException {
_log.info(new String(next.message(), "utf-8"));
}
}
一个简单的小tips:
logback.xml ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<jmxConfigurator />
<!-- 控制台输出日志 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- 过滤掉 TRACE 和 DEBUG 级别的日志 -->
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> -->
<!-- <level>INFO</level> -->
<!-- </filter> -->
<!-- 按天来回滚,如果需要按小时来回滚,则设置为{yyyy-MM-dd_HH} -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>f:/opt/log/test.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 如果按天来回滚,则最大保存时间为1天,1天之前的都将被清理掉 -->
</rollingPolicy>
<!-- 日志输出格式 -->
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
%logger{36}-%msg%n</pattern>
</layout>
</appender>
<!-- 记录到日志 文件的滚动日志 -->
<appender name="ERROR"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
e:/logs/error/error.log
</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>
ERROR
</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<!-- 定义每天生成一个日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>e:/logs/yuanshi-%d{yyyy-MM-dd}.log</fileNamePattern>
<MaxHistory>10</MaxHistory>
</rollingPolicy>
<!-- 日志样式 -->
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
%logger{36}-%msg%n</pattern>
</layout>
</appender>
<!-- 记录到日志 文件的滚动日志 -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>E:\logs\file\file.log</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<!-- 定义每天生成一个日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>e:/logs/venality-%d{yyyy-MM-dd}.log
</fileNamePattern>
<MaxHistory>10</MaxHistory>
</rollingPolicy>
<!-- 日志样式 -->
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
%logger{36}-%msg%n</pattern>
</layout>
</appender>
<appender name="visit"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>
E:\logs\visitlog\visit.log
</File>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>E:\logs\visit.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="visit" additivity="false" level="INFO">
<appender-ref ref="visit" />
</logger>
<appender name="order"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>
E:\logs\orderlog\order.log
</File>
<encoder>
<pattern>%msg%n
</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>E:\logs\order.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="order" additivity="false" level="INFO">
<appender-ref ref="order" />
</logger>
<root level="DEBUG">
<appender-ref ref="FILE" />
</root>
</configuration>
关于kafka-Storm中如何将日志文件打印到local就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。