2018-12-30

Kafka

N_TensorFlow OpenCV

官页 官网文档 官下载页 Kafka core apis Kafka史上最详细原理总结上

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。

in sort 一个分布式高并发的MQ(消息队列)

kafka拓扑结构

名词 解释 Producer 消息的生成者 Consumer 消息的消费者 ConsumerGroup 消费者组, 可以并行消费Topic中的partition的消息 Broker 缓存代理, Kafka集群中的一台或多台服务器统称broker.保存主题消息的代理服务器,消费者从这里读取数据

Topic Kafka处理资源的消息源(feeds of messages)的不同分类 Partition Topic物理上的分组, 一个topic可以分为多个partion,每个partion是一个有序的队列; partion中每条消息都会被分配一个有序的 Id(offset) Message 消息, 是通信的基本单位, 每个producer可以向一个topic(主题)发布一些消息 Producers 消息和数据生成者, 向Kafka的一个topic发布消息的 过程叫做producers Consumers 消息和数据的消费者, 订阅topic并处理其发布的消费过程叫做consumers

Broker 节点

Kafka 集群包含一个或多个服务器,服务器节点称为broker;

broker存储topic的数据; 如果某topic有N个partition, 集群有N个broker, 那么每个broker存储该topic的一个partition;

如果某topic有N个partition, 集群有(N+M)个broker, 那么其中有N个broker存储该topic的一个partition, 剩下的M个broker不存储该topic的partition数据; 换言之topic是按partition拆分的存在broker中

如果某topic有N个partition,集群中broker数目少于N个, 那么一个broker存储该topic的一个或多个partition; 在实际生产环境中, 尽量避免这种情况的发生, 这种情况容易导致Kafka集群数据不均衡;

简而言之, Broker是读/写主题(topic) 分区(partition) 数据的服务器节点, 当然只有leader有写权限, 其他follower同步数据

主题中的分区(partition)

消息: Kafka 中的数据单元被称为消息, 也被称为记录, 可以把它看作数据库表中某一行的记录;

批次: 为了提高效率, 消息会分批次写入 Kafka, 批次就代指的是一组消息;

主题: 消息的种类称为 主题(Topic),可以说一个主题代表了一类消息; 相当于是对消息进行分类; 主题就像是数据库中的表;

分区: 主题可以被分为若干个分区(partition), 同一个主题中的分区可以不在一个机器上, 有可能会部署在多个机器上, 由此来实现 kafka 的伸缩性, 单一主题中的分区有序, 但是无法保证主题中所有的分区有序

简而言之, 一个主题划分多个分区, 一条消息会以某种(Key和partition机制)规则, 选择其中的一个分区写入, 以提升IO性能 (因此每个分区的消息是不同的)

分区可以分布在不同的服务器上, 可以有多个冗灾备份, 通过架构多个服务器节点来实现性能的提升

关于offset: Kafka 通过 offset 保证消息在分区内的顺序, offset 的顺序不跨分区, 因此只保证在同一个分区内的消息是有序的;

细节问题

1)一个Topic的Partition数量大于等于Broker的数量, 可以提高吞吐率; 2)同一个Partition的Replica尽量分散到不同的机器, 高可用;

当 add a new partition 的时候, partition里面的message不会重新进行分配, 原来的partition里面的message数据不会变, 新加的这个partition刚开始是空的, 随后进入这个topic的message就会重新参与所有partition的load balance

Partition Replica:每个partition可以在其他的kafka broker节点上存副本, 以便某个kafka broker节点宕机不会影响这个kafka集群 存replica副本的方式是按照kafka broker的顺序存; 例如有5个kafka broker节点, 某个topic有3个partition, 每个partition存2个副本

消息流程

生产者定期向主题发送消息;

Kafka代理存储为该特定主题配置的分区中的所有消息; 它确保消息在分区之间平等共享; 如果生产者发送两个消息并且有两个分区, Kafka将在第一分区中存储一个消息, 在第二分区中存储第二消息;

消费者订阅特定主题;

一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移, 并且还将偏移保存在Zookeeper系统中;

消费者将定期请求Kafka(如100 Ms)新消息;

一旦Kafka收到来自生产者的消息, 它将这些消息转发给消费者;

消费者将收到消息并进行处理;

一旦消息被处理,消费者将向Kafka代理(broker)发送确认;

一旦Kafka收到确认,它将偏移更改为新值, 并在Zookeeper中更新它; 由于偏移在Zookeeper中维护, 消费者可以正确地读取下一封邮件, 即使在服务器暴力期间;

以上流程将重复, 直到消费者停止请求;

消费者可以随时回退/跳到所需的主题偏移量, 并阅读所有后续消息;

细节问题

  • producer 先把message发送到partition leader,再由leader发送给其他partition follower; 如果让producer发送给每个replica那就太慢了

  • Leader/和选举 每个partition有多个副本, 其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition;

Follower跟随Leader, 所有写请求都通过Leader路由, 数据变更会广播给所有Follower, Follower与Leader保持数据同步;如果Leader失效,则从Follower中选举出一个新的Leader; 当Follower与Leader挂掉, 卡住或者同步太慢,leader会把这个follower从”in sync replicas”(ISR)列表中删除, 重新创建一个Follower;

  • 怎样处理某个Replica不工作的情况?

如果这个部工作的partition replica不在ack列表中(不是partition leader), 就是producer在发送消息到partition leader上,partition 的leader再向partition follower发送message没有响应而已, 这个不会影响整个系统, 也不会有什么问题;

如果这个不工作的partition replica在ack列表中的话(即partition leader), producer发送的message的时候会等待这个不工作的partition replca写message成功, 但是会等到time out, 然后返回失败因为某个ack列表中的partition replica没有响应, 此时kafka会自动的把这个部工作的partition replica从ack列表中移除, 以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了;

  • 消息投递可靠性

一个消息如何算投递成功, Kafka提供了三种模式:

第一种是啥都不管, 发送出去就当作成功, 这种情况当然不能保证消息成功投递到broker;

第二种是Master-Slave模型, 只有当Master和所有Slave都接收到消息时, 才算投递成功, 这种模型提供了最高的投递可靠性, 但是损伤了性能;

第三种模型, 即只要Master确认收到消息就算投递成功;

实际使用时, 根据应用特性选择, 绝大多数情况下都会中和可靠性和性能选择第三种模型

消息在broker上的可靠性, 因为消息会持久化到磁盘上, 所以如果正常stop一个broker, 其上的数据不会丢失; 但是如果不正常stop, 可能会使存在页面缓存来不及写入磁盘的消息丢失, 这可以通过配置flush页面缓存的周期, 阈值缓解, 但是同样会频繁的写磁盘会影响性能, 又是一个选择题, 根据实际情况配置;

  • Kafka高效文件存储设计特点

Kafka把topic中一个parition大文件分成多个小文件段, 通过多个小文件段, 就容易定期清除或删除已经消费完文件, 减少磁盘占用; 通过索引信息可以快速定位message和确定response的最大大小; 通过index元数据全部映射到memory, 可以避免segment file的IO磁盘操作; 通过索引文件稀疏存储, 可以大幅降低index文件元数据占用空间大小;

Kafka 与 Zookeeper

  • Zookeeper 协调控制

管理broker与consumer的动态加入与离开; (Producer不需要管理, 随便一台计算机都可以作为Producer向Kakfa Broker发消息)

触发负载均衡, 当broker或consumer加入或离开时会触发负载均衡算法, 使得一个consumer group内的多个consumer的消费负载平衡; (因为一个comsumer消费一个或多个partition, 一个partition只能被一个consumer消费)

维护消费关系及每个partition的消费信息;

  • Zookeeper上的细节:

每个broker启动后会在zookeeper上注册一个临时的broker registry, 包含broker的ip地址和端口号, 所存储的topics和partitions信息;

每个consumer group关联一个临时的owner registry和一个持久的offset registry; 对于被订阅的每个partition包含一个owner registry, 内容为订阅这个partition的consumer id; 同时包含一个offset registry, 内容为上一次订阅的offset;

Quickstart

quickstart w3c Apache Kafka 教程

Step 1: Download the code

tar -xzf kafka_2.12-2.5.0.tgz cd kafka_2.12-2.5.0

下载解压 里面就已经包含了 Zookeeper了…

./config 配置目录 ./bin 一些操作脚本

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.

先启动 zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties 带 -daemon 参数; 则后台启动

再启动 kafka broker bin/kafka-server-start.sh config/server.properties

Step 3: Create a topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-ka

Created topic test.

replication-factor 1 //replication 的数量 partitions //partition 的数量 使用kafka-topics.sh的--describe参数查看一下Topic为kafka的详情 Leader是指负责这个分区所有读写的节点; Replicas是指这个分区所在的所有节点(不论它是否活着); ISR是Replicas的子集, 代表存有这个分区信息而且当前活着的节点;

Step 4: Send some messages

// 启动生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test This is a message This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output. //启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message This is another message

Step 6: Setting up a multi-broker cluster

配置集群

server 配置文件

First we make a config file for each of the brokers (首先为每一个broker 创建配置文件)

//复制2份 cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

roker.id 在集群中必须唯一 listeners 监听地址 log.dirs 日志数据目录

启动

bin/kafka-server-start.sh config/server-1.properties bin/kafka-server-start.sh config/server-2.properties

create topics

官方例子是 一个partitions, 三个 replication

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看该主题的brokers

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

[root@localhost kafka_2.12-2.5.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic: my-replicated-topic      PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: my-replicated-topic      Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

解析 第一行..略

主题有2个partitions 所以有两行 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

拿partition:0这个分区来说,**(Leader: 1 )该分区的Leader是server1,(Replicas: 1,2,0)分布在id为1,2,0这三个节点上,(Isr:1,2,0)**而且这三个节点都活着;

Leader是指负责这个分区所有读写的节点; Replicas是指这个分区所在的所有节点(不论它是否活着); ISR是Replicas的子集, 代表存有这个分区信息而且当前活着的节点;或者说指向有该Partition的备份且活着的节点

这些角色貌似都是自动分配的…

//列出主题列表 ./bin/kafka-topics.sh —list —zookeeper localhost:2181

架构图

效果图

把第0的server 停掉 bin/kafka-server-start.sh config/server.properties

//再查下 (9092 听了 , 去9093查) bin/kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic my-replicated-topic

[root@localhost kafka_2.12-2.5.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic my-replicated-topic
Topic: my-replicated-topic      PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 1    Leader: 1       Replicas: 0,1,2 Isr: 1,2

第0个, 就不在ISR列表里面了.

附录 kafka server.properties 配置

也可以参考 官方文档

broker.id =0
每一个broker在集群中的唯一表示, 要求是正数; 当该服务器的IP地址发生改变时, broker.id没有变化, 则不会影响consumers的消息情况
log.dirs=/data/kafka-logs
kafka数据的存放地址, 多个地址的话用逗号分割 /data/kafka-logs-1, /data/kafka-logs-2
port =9092
broker server服务端口
message.max.bytes =6525000
表示消息体的最大大小, 单位是字节
num.network.threads =4
broker处理消息的最大线程数, 一般情况下不需要去修改
num.io.threads =8
broker处理磁盘IO的线程数, 数值应该大于你的硬盘数
background.threads =4
一些后台任务处理的线程数, 例如过期消息文件的删除等, 一般情况下不需要去做修改
queued.max.requests =500
等待IO线程处理的请求队列最大数, 若是等待IO的请求超过这个数值, 那么会停止接受外部消息, 应该是一种自我保护机制; 
host.name
broker的主机地址, 若是设置了, 那么会绑定到这个地址上, 若是没有, 会绑定到所有的接口上, 并将其中之一发送到ZK, 一般不设置
socket.send.buffer.bytes=100*1024
socket的发送缓冲区, socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes =100*1024
socket的接受缓冲区, socket的调优参数SO_RCVBUFF
socket.request.max.bytes =100*1024*1024
socket请求的最大数值, 防止serverOOM, message.max.bytes必然要小于socket.request.max.bytes, 会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024
topic的分区是以一堆segment文件存储的, 这个控制每个segment的大小, 会被topic创建时的指定参数覆盖
log.roll.hours =24*7
这个参数会在日志segment没有达到log.segment.bytes设置的大小, 也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
日志清理策略选择有: delete和compact主要针对过期数据的处理, 或是日志文件达到限制的额度, 会被 topic创建时的指定参数覆盖
log.retention.minutes=60*24 # 一天后删除
数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据, 也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes任意一个达到要求, 都会执行删除, 会被topic创建时的指定参数覆盖
log.retention.bytes=-1
topic每个分区的最大文件大小, 一个topic的大小限制 = 分区数*log.retention.bytes; -1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求, 都会执行删除, 会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=5minutes
文件大小检查的周期时间, 是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false
是否开启日志压缩
log.cleaner.threads = 2
日志压缩运行的线程数
log.cleaner.io.max.bytes.per.second=None
日志压缩时候处理的最大大小
log.cleaner.dedupe.buffer.size=500*1024*1024
日志压缩去重时候的缓存空间, 在空间允许的情况下, 越大越好
log.cleaner.io.buffer.size=512*1024
日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms =15000
检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5
日志清理的频率控制, 越大意味着更高效的清理, 同时会存在一些空间上的浪费, 会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day
对于压缩的日志保留的最长时间, 也是客户端消费消息的最长时间, 同log.retention.minutes的区别在于一个控制未压缩数据, 一个控制压缩后的数据; 会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024
对于segment日志的索引文件大小限制, 会被topic创建时的指定参数覆盖
log.index.interval.bytes =4096
当执行一个fetch操作后, 需要一定的空间来扫描最近的offset大小, 设置越大, 代表扫描速度越快, 但是也更好内存, 一般情况下不需要搭理这个参数
log.flush.interval.messages=None
log文件"sync"到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性""性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms =3000
检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms = None
仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.delete.delay.ms =60000
文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
控制上次固化硬盘的时间点, 以便于数据恢复一般不需要去修改
auto.create.topics.enable =true
是否允许自动创建topic, 若是false, 就需要通过命令创建topic
default.replication.factor =1
是否允许自动创建topic, 若是false, 就需要通过命令创建topic
num.partitions =1
每个topic的分区个数, 若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
 
 
########### 以下是kafka中Leader,replicas配置参数
 
controller.socket.timeout.ms =30000
partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size=10
partition leader与replicas数据同步时,消息的队列尺寸
replica.lag.time.max.ms =10000
replicas响应partition leader的最长等待时间, 若是超过这个时间, 就将replicas列入ISR(in-sync replicas), 并认为它是死的, 不会再加入管理中
replica.lag.max.messages =4000
如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
##到其他follower中.
##在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.socket.timeout.ms=30*1000
follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes=64*1024
leader复制时候的socket缓存大小
replica.fetch.max.bytes =1024*1024
replicas每次获取数据的最大大小
replica.fetch.wait.max.ms =500
replicas同leader之间通信的最大等待时间, 失败了会重试
replica.fetch.min.bytes =1
fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
num.replica.fetchers=1
leader进行复制的线程数, 增大这个数值会增加follower的IO
replica.high.watermark.checkpoint.interval.ms =5000
每个replica检查是否将最高水位进行固化的频率
controlled.shutdown.enable =false
是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader, 并转移到其他broker
controlled.shutdown.max.retries =3
控制器关闭的尝试次数
controlled.shutdown.retry.backoff.ms =5000
每次关闭尝试的时间间隔
leader.imbalance.per.broker.percentage =10
leader的不平衡比例, 若是超过这个数值, 会对分区进行重新的平衡
leader.imbalance.check.interval.seconds =300
检查leader是否不平衡的时间间隔
offset.metadata.max.bytes
客户端保留offset信息的最大空间大小
kafka中zookeeper参数配置
 
zookeeper.connect = localhost:2181
zookeeper集群的地址, 可以是多个, 多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=6000
ZooKeeper的最大超时时间, 就是心跳的间隔, 若是没有反映, 那么认为已经死了, 不易过大
zookeeper.connection.timeout.ms =6000
ZooKeeper的连接超时时间
zookeeper.sync.time.ms =2000