mac m1 中go使用kafka

dandan2年前程序开发2738

当前docker-comose:

version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    volumes:
      # 数据文件 - data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net

  #消息队列 - Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
    - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

运行后,进入容器

 docker exec -it 容器id /bin/bash

进入kafka目录

cd /opt/bitnami/kafka/

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log

go示例

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "normal_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行会报错,找不到hosts,需要将容器id添加到etc/hosts中

sudo vim /etc/hosts

然后增加一行127.0.0.1 容器id  ,然后保存


#在kafka容器里,bin目录下执行

#查看是否有未被消费的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1

#删除topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称


标签: gokafka

相关文章

使用ngrok进行内网穿透

1、在官网注册账号:https://ngrok.com/ 2、左侧导航Setup & Installtion,下载当前电脑的应用3、左侧导航Your Authtoken,获取token配置命令...

mysql5.6导入mysql8的坑

1、虽然在mysql8里设置数据库的排序规则是utf8mb4_general_ci ,但是导入后,却都被变成了utf8mb4_0900_ai_ci解决:用navicae导出的,表语句中没有COLLAT...

通过frp进行内网穿透

因为ngrok会定义刷新域名,有点限制,如果没有服务器域名,用他合适,如果有自己的服务器、域名,则用frp:按照https://blog.csdn.net/mirage003/article/deta...

go相关踩坑

安装swag根据官方教程,执行下方后,GOPATH/bin中没有swag程序,应该要先配置GOBIN环境变量,有了后再执行。 其他根据官网来再下两个gin-swag和filesgo inst...

mac m1使用docker mysql踩坑

1、etcd版本用3.4.242、mysql镜像要用:mysql/mysql-server:8.0.323、mysql因为是8的版本,启动后本地没权限连接,需要进容器创建用户和添加权限:# ...

宝塔ftp连不上

1、先检查端口是否有开放,涉及宝塔端口 和 云服务器的安全组;2、对Pure-ftpd的配置文件中,大概180来行的ForcePassiveIP  开放,并且把ip改成服务器的外网ip;...

评论列表

test
2023-06-16 16:12:11

支持蛋总

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。