转载

kafka命令收录 路过 tg:hdkj8

记录下kafka的使用命令

topic

创建topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1 (可以在topic创建时进行参数的指定)

修改

kafka-topics.sh --zookeeper localhost:2181 --alter --config retention.ms=0 --topic topic_name (对已经存在的topic进行config的修改)

删除

kafka-topics --delete --zookeeper localhost:2181 --topic application_mch

查询

kafka-topics.bat --list --zookeeper localhost:2181

kafka-topics.bat --zookeeper localhost:2181 --topic test --describe

kafka-topic.sh --zookeeper <zookeeper.connect> --list --describe (查看kafka集群中所有topic)

kafka-topic.sh --zookeeper <zookeeper.connect> --list --describe (查看kafka集群中所有topic)

kafka-topic.sh --zookeeper <zookeeper.connect> --describe --under-replicated-partitions(查看正在同步的分区(可能正在同步,也可能发生异常))

kafka-topic.sh --zookeeper <zookeeper.connect> --describe --unavalible-partitions(查看没有leader的分区)

kafka-topic.sh --zookeeper <zookeeper.connect> --describe --topics-with-overrides(查看一下覆盖了默认配置的分区)

kafka-topic.sh --zookeeper <zookeeper.connect> --describe --topic topic_name(查看指定topic的相关信息)

生产

kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic

消费

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --group test --from-beginning

# --from-beginning 为消息截取的点,重新消费中

# 查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list

集群消费

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

显示消费key

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
参数 值类型 说明 有效值
--topic string 被消费的topic
--whitelist string 正则表达式,指定要包含以供使用的主题的白名单
--partition integer 指定分区 除非指定’–offset’,否则从分区结束(latest)开始消费
--offset string 执行消费的起始offset位置 默认值:latest latest earliest
--consumer-property string 将用户定义的属性以key=value的形式传递给使用者
--consumer.config string 消费者配置属性文件 请注意,[consumer-property]优先于此配置
--formatter string 用于格式化kafka消息以供显示的类的名称 默认值:kafka.tools.DefaultMessageFormatter kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter
--property string 初始化消息格式化程序的属性 print.timestamp=true\false print.key=true\false print.value=true\false key.separator= line.separator= key.deserializer= value.deserializer=
--from-beginning 从存在的最早消息开始,而不是从最新消息开始
--max-messages integer 消费的最大数据量,若不指定,则持续消费下去
--timeout-ms integer 在指定时间间隔内没有消息可用时退出
--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
--bootstrap-server string 必需(除非使用旧版本的消费者),要连接的服务器
--key-deserializer string
--value-deserializer string
--enable-systest-events 除记录消费的消息外,还记录消费者的生命周期 (用于系统测试)
--isolation-level string 设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息 默认值:read_uncommitted
--group string 指定消费者所属组的ID
--blacklist string 要从消费中排除的主题黑名单
--csv-reporter-enabled 如果设置,将启用csv metrics报告器
--delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dir string 输出csv度量值 需与[csv-reporter-enable]配合使用
--zookeeper string 必需(仅当使用旧的使用者时)连接zookeeper的字符串。 可以给出多个URL以允许故障转移
正文到此结束