mac m1 中go使用kafka

dandan2年前程序开发2915

当前docker-comose:

version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    volumes:
      # 数据文件 - data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net

  #消息队列 - Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
    - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

运行后,进入容器

 docker exec -it 容器id /bin/bash

进入kafka目录

cd /opt/bitnami/kafka/

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log

go示例

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "normal_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行会报错,找不到hosts,需要将容器id添加到etc/hosts中

sudo vim /etc/hosts

然后增加一行127.0.0.1 容器id  ,然后保存


#在kafka容器里,bin目录下执行

#查看是否有未被消费的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1

#删除topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称


标签: gokafka

相关文章

wsl2安装docker+dify+xinference

1、wsl2安装ubuntu,这里指定版本安装一个新环境 wsl --install -d Ubuntu-22.042、安装完的ubuntu是在c盘的,放到其他...

MQTT协议使用

服务端因为使用的发布/订阅模式,似乎不需要自己写服务端,只要安装现成开源服务器即可,这里选了EMQX#到官网,选择服务器版本,下载安装,已centos7为例: #下载源码包 wget h...

camunda使用

1、 拉取镜像docker pull camunda/camunda-bpm-platform:7.17.02、配置并启动docker run -d ...

Docker使用篇

镜像:image容器:container运行镜像(如果本地有则会运行本地,本地没有会去下载镜像仓库)docker run 镜像运行镜像内系统的命令窗口(如果要退出,输入exit)d...

mac m1 pro 解决微信公众号本地调试问题(未成功)

最终未成功,买natapp先凑合用了,临时记录下nginx安装1、微信公众号后台,公众号设置,功能设置,网页授权域名,添加好该域名。(需要上传文件到根目录)2、natapp上购买vip隧道,9元/月;...

宝塔nginx配置允许各种文件的下载

location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|pdf|doc|docx|xls|xlsx|rar|zip|gz|7z|ppt|pptx|mp3|...

评论列表

test
2023-06-16 16:12:11

支持蛋总

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。