通过kafka和etcd实现多个服务器的日志收集和检索

image.png

组件介绍

LogAgent:日志收集客户端,用来收集服务器上所有服务的日志,然后发往kafka。它根据配置项来收集日志,将配置项存储etcd中。用web管理页面来管理配置项,当扩容或增加服务时只需要在web上设置就行

Kafka:高吞吐量的分布式队列

ElasticSearch:开源的搜索引擎,提供基于HTTP RESTful的web接口,给日志信息提供索引,让日志支持检索

Kibaa:开源的ES数据分析和可视化工具。

Hadoop:分布式计算框架,能够对大量数据进行分布式处理的平台

Storm:一个免费并开源的分布式实时计算系统

通过Hadoop和Storm可以进行实时数据计算,来分析数据。

消息队列的两种通信模式

点对点模式(queue)

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。一条消息被消费以后,queue中就没有了,不存在重复消费。

发布/订阅模式(topic)

消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费(类似于关注了微信公众号的人都能收到推送的文章)

注意:发布订阅模式下,如果发布者发布的消息量很大时,单个订阅者的处理能力是不足的(接受缓慢)。因此在现实场景中是多个订阅者节点组成一个订阅组负载均衡来消费topic消息**(分组订阅)**,这样订阅者很容易实现消费能力线性扩展。可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式(这样每个Queue都轮询接受topic的消息,每个Queue接受部分消息)

Kafka

Kafka是一个分布式的数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群,它提供了发布和订阅功能,使用者可以发送数据到Kafka中,也可以从Kafka中读取数据(以便进行后续的处理),Kafka具有高吞吐、低延迟、高容错等特点

image.png

image.pngimage.png

工作流程

image.png

一定要记住:Producer在写入数据的时候是把数据写入到leader中,不会直接将数据写入follower。

选择partition的原则

image.png

ACK应答机制

image.png

Topic和数据日志

image.pngimage.pngimage.png

Partition结构

image.pngimage.pngimage.png

logAgent读取配置文件版:https://github.com/XiaoNuoZ/logAgentByConfig

etcd

etcd是使用Go语言开发的一个开源的、高可用的分布式key-value存储系统,可以用于配置共享和服务的注册和发现(微服务)

etcd具有以下特点:

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 快速:每秒10000次写入的基准速度
  • 可靠:使用Raft算法实现了强一致、高可用

etcd使用场景

将配置信息放到etcd上集中管理的使用方式:应用在启动的时候主动从etcd获取一次配置信息,同时在etcd节点上注册一个Watcher并等待,以后每次配置有更新的时候,etcd都会实时通知订阅者,一次达到获取最新配置信息的目的

etcd也经常用于服务发现,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接?本质上来说,服务发现就是了解集群中是否有进程在监听udp或tcp端口,并且通过名字就可以查找和连接

image.png

etcd原理

image.png

Raft协议:Raft协议原理详解

image.pngimage.png

etcd的搭建

image.pngimage.pngimage.pngimage.png

为了避免在生产环境中有多个集群,因此使用token来区分,CLUSTER_STATE是状态的意思,new表示新建一个节点,CLUSTER表示它属于哪个集群

etcd的下载启动

1
2
下载:https://github.com/etcd-io/etcd/releases
启动:解压后直接对etcd.exe右键管理员运行即可(普通的cmd不是管理员运行,可能会出错)

注意:连接etcd时,默认的etcdctl使用的是v2版本的命令,需要设置环境变量 SET ETCDCTL_API=3 来使用v3版本的API,而默认的 ETCDCTL_API=2是使用v2版本的API(针对老版本,最新版已修复)

etcd的常用的有put(设值)、get(取值)、watch(监听)

1
2
etcdctl.exe --endpoints=http://127.0.0.1:2379 put xiaonuo 23		//设置key和value
etcdctl.exe --endpoints=http://127.0.0.1:2379 get xiaonuo		//通过Key取值

watch用来获取未来更改 的通知,当值发生改变便会通知

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)
func main(){
	//etcd连接
	cli,err:=clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"},	//连接对象
		DialTimeout: 5*time.Second,		//5秒没连接上就超时返回err
	})
	if err!=nil{
		fmt.Println("connect to etcd faild,err:",err)
		return
	}
	defer cli.Close()

	//etcd Put
	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
	_,err=cli.Put(ctx,"xiaonuo","23")
	//这里没开携程,因此会put执行完才执行cancel()put如果一秒后还没有成功就会自动结束上下文cancel()是避免等待直到垃圾回收结束它。
	cancel()
	if err!=nil{
		fmt.Println("put to etcd faild,err:",err)
		return
	}

	//etcd Get
	ctx,cancel=context.WithTimeout(context.Background(),time.Second)
	//Get(ctx context.Context, key string, opts ...OpOption)中的第三个选项是clientv3.WithPrefix(),它可以给key定义一个前缀,方便整理
	resp,err:=cli.Get(ctx,"xiaonuo")
	cancel()
	if err!=nil{
		fmt.Println("get to etcd faild,err:",err)
		return
	}
	for _,ev:=range resp.Kvs{
		fmt.Println("key:",string(ev.Key),"----value:",string(ev.Value))
	}

	//etcd Watchwatch会派一个哨兵一直监听着key的value的变化(新增、修改、删除)
	//watch返回的是一个channel,<-chan WatchResponse
	//也可以设置WithTimeout()来让它监视一个固定时间或设置条件让其取消监视(不过还是建议让其一直监视着)
	rch:=cli.Watch(context.Background(),"xiaonuo")
	//从通道尝试取值,range会在没值的时候阻塞,有值后执行内部代码,然后循环回来继续阻塞等待下一个值
	for wresp:=range rch{
		for _,ev:=range wresp.Events{
			//type可以取到类型,是删除修改还是新增
			fmt.Println("type:",string(ev.Type),"----key:",string(ev.Kv.Key),"----value:",string(ev.Kv.Value))
		}
	}
}

etcd的续期

etcd可以设置续期,如果续期时间超过则直接数据失效

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cli, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"127.0.0.1:2379"},
      DialTimeout: time.Second,
   })
   if err != nil {
      log.Fatal(err)
   }
   defer cli.Close()
   //设置续期5秒
   resp, err := cli.Grant(context.TODO(), 5)
   if err != nil {
      log.Fatal(err)
   }
   // 将 k-v 设置到etcd
   _, err = cli.Put(context.TODO(), "root", "admin", clientv3.WithLease(resp.ID))
   if err != nil {
      log.Fatal(err)
   }
   // 若想一直有效,设置自动续期,关闭此程序后数据5s后消失
   ch, err := cli.KeepAlive(context.TODO(), resp.ID)
   if err != nil {
      log.Fatal(err)
   }
   for {
      //返回续期信息
      c := <-ch
      fmt.Println("c:", c)
   }

Elasticsearch和Kibana

1
2
https://www.elastic.co/cn/elasticsearch/
https://www.elastic.co/cn/kibana

要注意的是:es和kibana版本要一致,否则可能出现不兼容的问题

kibana是一个开源的分析和可视化平台,设计出来用于和Eleasticsearch一起工作。

它可以查看、搜索并和存储在Eleastucsearch索引中的数据进行交互。以各种图标、表格和地图的形式可视化数据

es解压后可以直接启动elasticsearch.bat。

kibana解压后要先更改配置文件kibana.yml,将里面的elasticsearch.hosts设置为自己的elasticsearch的地址,然后修改i18n.locale的值为zh-CN

logAgent实现源代码:

https://github.com/XiaoNuoZ/logAgentByConfig

Licensed under CC BY-NC-SA 4.0