How do you integrate and configure the SLS log service? Many people without prior experience are at a loss here, so this article summarizes where the problems come from and how to solve them. Hopefully it helps you work through the setup.
Configuration
pom.xml dependencies
<!-- SLS server dependencies -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>log-loghub-producer</artifactId>
    <version>0.1.4</version>
    <exclusions>
        <!-- fastjson is excluded, presumably to avoid a version conflict with the project's own copy -->
        <exclusion>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.4</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.33</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.16</version>
</dependency>
Configuring AliLogConfig
package com.yhzy.doudoubookserver.global.alilog;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.ProjectConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
@Configuration
@Scope("singleton")
public class AliLogConfig {

    public static String accessKeyId = "";
    public static String accessKeySecret = "";
    public static String endPoint = "";
    public static String projectName = SLSEnvironment.BOOK_PROJECT;

    @Bean
    @ConditionalOnClass(LogProducer.class)
    public LogProducer getLogProducer() {
        LogProducer producer = new LogProducer(producerConfig());
        producer.setProjectConfig(projectConfig());
        return producer;
    }

    @Bean
    @ConditionalOnClass(ProducerConfig.class)
    public ProducerConfig producerConfig() {
        ProducerConfig producerConfig = new ProducerConfig();
        // Send timeout for cached logs, in milliseconds; a cached batch that times out is sent immediately
        producerConfig.packageTimeoutInMS = 1000;
        // Upper limit on the size of each cached log package, in bytes; must not exceed 5 MB
        producerConfig.logsBytesPerPackage = 5 * 1024 * 1024;
        // Maximum number of logs in each cached package; must not exceed 4096
        producerConfig.logsCountPerPackage = 4096;
        // Upper limit on the memory a single producer instance may use, in bytes
        producerConfig.memPoolSizeInByte = 1000 * 1024 * 1024;
        // Maximum number of threads in the IO thread pool, mainly used for sending data to the log service
        producerConfig.maxIOThreadSizeInPool = 50;
        // Only relevant when sending logs with a specified shard hash; otherwise it can be ignored. The backend
        // merge thread merges data mapped to the same shard, and each shard is associated with a hash range;
        // the producer maps the user-supplied hash to the minimum value of that range. The producer periodically
        // pulls each shard's hash range from LogHub, refreshing it every shardHashUpdateIntervalInMS milliseconds.
        producerConfig.shardHashUpdateIntervalInMS = 10 * 60 * 1000;
        producerConfig.retryTimes = 3;
        return producerConfig;
    }

    @Bean
    @ConditionalOnClass(ProjectConfig.class)
    public ProjectConfig projectConfig() {
        return new ProjectConfig(projectName, endPoint, accessKeyId, accessKeySecret);
    }

    /**
     * SLS read client, used for querying data
     * @return a configured Client
     */
    @Bean
    public Client client() {
        String accessId = "";
        String accessKey = "";
        String host = "";
        return new Client(host, accessId, accessKey);
    }
}
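The access key, secret, and endpoint above are hard-coded as empty strings. One way to keep them out of source control is Spring's @Value injection; below is a minimal sketch, assuming hypothetical sls.* property keys in application.yml (the class and property names are illustrative, not part of the original project), that would replace the static fields:

import com.aliyun.openservices.log.producer.ProjectConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AliLogPropertiesConfig {

    // Hypothetical property keys; align them with your application.yml
    @Value("${sls.access-key-id:}")
    private String accessKeyId;
    @Value("${sls.access-key-secret:}")
    private String accessKeySecret;
    @Value("${sls.endpoint:}")
    private String endPoint;
    @Value("${sls.project-name:}")
    private String projectName;

    @Bean
    public ProjectConfig projectConfig() {
        return new ProjectConfig(projectName, endPoint, accessKeyId, accessKeySecret);
    }
}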
Configuring AliLogUtil
package com.yhzy.doudoubookserver.common;

import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.producer.LogProducer;
import com.yhzy.doudoubookserver.global.alilog.AliLogConfig;
import com.yhzy.doudoubookserver.global.alilog.CallbackLogInfo;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Vector;

/**
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
@Component
public class AliLogUtil {

    @Resource
    private AliLogConfig aliLogConfig;

    public void saveLog(String projectName, String logStore, Vector<LogItem> logGroup, String topic, String source, Long millis) throws InterruptedException {
        final LogProducer logProducer = aliLogConfig.getLogProducer();
        // Send the logs; send may be called concurrently
        logProducer.send(projectName, logStore, topic, source, logGroup,
                new CallbackLogInfo(projectName, logStore, topic, null, source, logGroup, logProducer));
        // Proactively flush logs that are cached and not yet sent
        logProducer.flush();
        // Wait for the send threads to finish
        Thread.sleep(millis);
        // Shut down the background IO threads; close() sends whatever is still cached in memory at call time
        logProducer.close();
    }
}
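For reference, a minimal sketch of calling this utility (the field names and values are placeholders; SLSEnvironment.BOOK_PROJECT and BOOK_LOGSTORE are assumed to hold your project and logstore names; the full ES-backed version appears in the test at the end of this article):

// Assumes an injected AliLogUtil, e.g. @Resource private AliLogUtil aliLogUtil;
Vector<LogItem> items = new Vector<>();
LogItem item = new LogItem();
item.PushBack("user_id", "42");      // placeholder field
item.PushBack("log_type", "read");   // placeholder field
items.add(item);
// arguments: project, logstore, logs, topic, source, millis to wait before close
aliLogUtil.saveLog(SLSEnvironment.BOOK_PROJECT, SLSEnvironment.BOOK_LOGSTORE, items, "read", "source ip", 1000L);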
Configuring CallbackLogInfo
package com.yhzy.doudoubookserver.global.alilog;

import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.producer.ILogCallback;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.response.PutLogsResponse;

import java.util.Vector;

/**
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
public class CallbackLogInfo extends ILogCallback {

    // Keep the data that was sent so it can be retried when an exception occurs
    public String project;
    public String logstore;
    public String topic;
    public String shardHash;
    public String source;
    public Vector<LogItem> items;
    public LogProducer producer;
    public int retryTimes = 0;

    public CallbackLogInfo(String project, String logstore, String topic, String shardHash, String source,
                           Vector<LogItem> items, LogProducer producer) {
        super();
        this.project = project;
        this.logstore = logstore;
        this.topic = topic;
        this.shardHash = shardHash;
        this.source = source;
        this.items = items;
        this.producer = producer;
    }

    @Override
    public void onCompletion(PutLogsResponse response, LogException e) {
        if (e != null) {
            // Print the exception
            System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());
            // Retry at most three times
            if (retryTimes++ < 3) {
                producer.send(project, logstore, topic, source, shardHash, items, this);
            }
        } else {
            // Request id
            System.out.println("send success, request id: " + response.GetRequestId());
        }
    }
}
Configuring SLSEnvironment
Holds the project & logStore parameter name configuration for SLS logging.
package com.yhzy.doudoubookserver.global.alilog;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Project & logStore parameter name configuration for SLS logging
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/25
 * @since 1.0.0
 */
public interface SLSEnvironment {

    /** test project */
    String TEST_PROJECT = "test_project";
    /** test logStore */
    String TEST_LOGSTORE = "test_logstore";
    /** book project (placeholder value; this constant is referenced by AliLogConfig and the tests) */
    String BOOK_PROJECT = "book_project";
    /** book logStore (placeholder value; this constant is referenced by the tests) */
    String BOOK_LOGSTORE = "book_logstore";

    /** Start time: 00:00:00 three days ago, as a Unix timestamp in seconds (UTC+8) */
    static Integer FROM() {
        return Math.toIntExact(LocalDateTime.now().plusDays(-3).withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli() / 1000);
    }

    /** End time: 23:59:59 tomorrow, as a Unix timestamp in seconds (UTC+8) */
    static Integer TO() {
        return Math.toIntExact(LocalDateTime.now().plusDays(1).withHour(23).withMinute(59).withSecond(59).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli() / 1000);
    }

    /** Minimum start time: the Unix timestamp of 2020-12-01 (UTC+8) */
    static Integer MINFROM() {
        return 1606752000;
    }

    /** Maximum end time: the Unix timestamp of the current time plus one day */
    static Integer MAXTO() {
        return Math.toIntExact((System.currentTimeMillis() + 86400000) / 1000);
    }
}
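As a quick illustration (this clamping is an assumed usage, not shown in the original), the helpers can bound a query window before a GetLogs call:

// Clamp the default window into the allowed [MINFROM, MAXTO] range
int from = Math.max(SLSEnvironment.FROM(), SLSEnvironment.MINFROM());
int to = Math.min(SLSEnvironment.TO(), SLSEnvironment.MAXTO());
// then, e.g.: client.GetLogs(project, logStore, from, to, topic, query, line, offset, reverse)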
Read/write test: AliLogTest
Reads data from ES and writes it into SLS.
package com.yhzy;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.*;
import com.yhzy.doudoubookserver.common.AliLogUtil;
import com.yhzy.doudoubookserver.global.alilog.SLSEnvironment;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;

/**
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/24
 * @since 1.0.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DoudouSLSApplication.class)
public class AliLogTest {

    @Resource
    private AliLogUtil aliLogUtil;
    @Resource
    private RestHighLevelClient restHighLevelClient;
    /**
     * Read/analysis test
     * @throws LogException
     */
    @Test
    public void read() throws LogException {
        String accessId = "";
        String accessKey = "";
        String host = "";
        String project = "zqhtest";
        String logStore = "test_logstore";
        Client client = new Client(host, accessId, accessKey);
        long time = new Date().getTime() / 1000;
        /*
         * project  : project name.
         * logStore : logStore name.
         * from     : query start time, as a Unix timestamp (seconds since 1970-01-01 00:00:00 UTC).
         * to       : query end time, as a Unix timestamp (seconds since 1970-01-01 00:00:00 UTC).
         * topic    : log topic.
         * query    : query/analysis statement;
         *            see https://help.aliyun.com/document_detail/53608.html?spm=a2c4g.11186623.2.17.56322e4acEkU4X#concept-nyf-cjq-zdb
         * line     : maximum number of log lines returned by the request. Minimum 0, maximum 100, default 100.
         *            Note that this is separate from the LIMIT clause inside the query statement.
         * offset   : starting row of the query. Default 0.
         * reverse  : whether to return logs in reverse order of log timestamp, accurate to the minute. Default false.
         *            true: return logs in reverse order; false: return logs in forward order.
         */
        GetLogsResponse getLogsResponse = client.GetLogs(
                project,
                logStore,
                // query window start, as written in the original (36000000 seconds is roughly 416 days before now)
                (int) (time - 36000000),
                (int) time,
                "",
                "* | select app_version,channel_id,content_id,COUNT(content_id) contentCount,COUNT(content_id) chapterCount, sum(time) timeSum GROUP BY app_version,channel_id,content_id LIMIT 100010",
                25,
                0,
                false);
        for (QueriedLog getLog : getLogsResponse.GetLogs()) {
            System.out.println(getLog.GetLogItem().ToJsonString());
        }
    }
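    // An illustrative addition (not part of the original test): instead of dumping JSON, each
    // returned QueriedLog wraps a LogItem whose key/value pairs can be walked one by one,
    // assuming the SDK's GetLogContents()/GetKey()/GetValue() accessors:
    //
    // for (QueriedLog log : getLogsResponse.GetLogs()) {
    //     for (LogContent content : log.GetLogItem().GetLogContents()) {
    //         System.out.println(content.GetKey() + " = " + content.GetValue());
    //     }
    // }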
    /**
     * Ingestion test:
     * reads data from ES and writes it to SLS.
     * @throws IOException
     */
    @Test
    public void test1() throws IOException {
        Vector<LogItem> logItems = new Vector<>();
        // Read book-reading data from ES
        SearchRequest request = new SearchRequest("bookanalysis");
        long start = LocalDateTime.now().plusDays(-1).withHour(11).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli();
        long end = LocalDateTime.now().plusDays(-1).withHour(12).withMinute(0).withSecond(0).withNano(0).toInstant(ZoneOffset.of("+8")).toEpochMilli();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        List<QueryBuilder> filter = boolQuery.filter();
        // Time range condition
        filter.add(rangeQuery(start, end, "create_time"));
        // Further dynamic conditions can be appended here
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Attach the conditions
        searchSourceBuilder.query(boolQuery);
        request.source(searchSourceBuilder.trackTotalHits(true).size(15000));
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            Map<String, Object> map = hit.getSourceAsMap();
            LogItem logItem = new LogItem();
            logItem.PushBack("app_version", map.get("app_version") == null ? "1" : map.get("app_version").toString());
            logItem.PushBack("book_id", map.get("book_id") == null ? "1" : map.get("book_id").toString());
            logItem.PushBack("book_name", map.get("book_name") == null ? "1" : map.get("book_name").toString());
            logItem.PushBack("category_id_1", map.get("category_id_1") == null ? "1" : map.get("category_id_1").toString());
            logItem.PushBack("category_id_2", map.get("category_id_2") == null ? "1" : map.get("category_id_2").toString());
            logItem.PushBack("channel_id", map.get("channel_id") == null ? "1" : map.get("channel_id").toString());
            logItem.PushBack("chapter_name", map.get("chapter_name") == null ? "1" : map.get("chapter_name").toString());
            logItem.PushBack("content_id", map.get("content_id") == null ? "1" : map.get("content_id").toString());
            logItem.PushBack("create_time", map.get("create_time") == null ? "1" : map.get("create_time").toString());
            logItem.PushBack("device_id", map.get("device_id") == null ? "1" : map.get("device_id").toString());
            logItem.PushBack("is_new", map.get("is_new") == null ? "1" : map.get("is_new").toString());
            logItem.PushBack("is_official", map.get("is_official") == null ? "1" : map.get("is_official").toString());
            logItem.PushBack("is_vip", map.get("is_vip") == null ? "1" : map.get("is_vip").toString());
            logItem.PushBack("log_type", map.get("log_type") == null ? "1" : map.get("log_type").toString());
            logItem.PushBack("position", map.get("position") == null ? "1" : map.get("position").toString());
            // Note: as in the original, the "sex" key is populated from the "site" field of the source document
            logItem.PushBack("sex", map.get("site") == null ? "1" : map.get("site").toString());
            logItem.PushBack("time", map.get("time") == null ? "1" : map.get("time").toString());
            logItem.PushBack("user_id", map.get("user_id") == null ? "1" : map.get("user_id").toString());
            logItems.add(logItem);
        }
        try {
            aliLogUtil.saveLog(SLSEnvironment.BOOK_PROJECT, SLSEnvironment.BOOK_LOGSTORE, logItems, "read", "source ip", 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static RangeQueryBuilder rangeQuery(Long startTime, Long endTime, String name) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(name);
        if (startTime != null) {
            rangeQueryBuilder.gte(startTime);
        }
        if (endTime != null) {
            rangeQueryBuilder.lte(endTime);
        }
        return rangeQueryBuilder;
    }
}
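The run of PushBack calls in test1 repeats the same null check for every field. A small helper could collapse that; below is a sketch, where pushField and the key array are illustrative names, not part of the original code:

// Hypothetical helper: pushes one field, defaulting to "1" when absent, matching the test above
private static void pushField(LogItem item, Map<String, Object> source, String key) {
    Object value = source.get(key);
    item.PushBack(key, value == null ? "1" : value.toString());
}

// Possible usage inside the hit loop:
// for (String key : new String[]{"app_version", "book_id", "book_name", "channel_id", "user_id"}) {
//     pushField(logItem, map, key);
// }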
Having read the above, have you got the hang of integrating and configuring the SLS log service? If you would like to pick up more skills or learn more about related topics, feel free to follow the 天达云 industry news channel. Thanks for reading!