您需要在生产者端使用proto#Marshal和sarama#ByteEncoder,在消费者端使用proto#Unmarshal。
制作人:
pixelToSend := &pixel.Pixel{SessionId: t.String()}
pixelToSendBytes,err := proto.Marshal(pixelToSend)
if err != nil {
log.Fatalln("Failed to marshal pixel:",err)
}
msg := &sarama.ProducerMessage{
Topic: topic,Value: sarama.ByteEncoder(pixelToSendBytes),}
消费者:
receivedPixel := &pixel.Pixel{}
err := proto.Unmarshal(msg.Value,receivedPixel)
if err != nil {
log.Fatalln("Failed to unmarshal pixel:",err)
}
log.Printf("Pixel received: %s",receivedPixel)
完整示例:
package main
import (
pixel "example/pixel"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
func main() {
topic := "your-topic-name"
brokerList := []string{"localhost:29092"}
producer,err := newSyncProducer(brokerList)
if err != nil {
log.Fatalln("Failed to start Sarama producer:",err)
}
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case t := <-ticker.C:
pixelToSend := &pixel.Pixel{SessionId: t.String()}
pixelToSendBytes,err := proto.Marshal(pixelToSend)
if err != nil {
log.Fatalln("Failed to marshal pixel:",err)
}
msg := &sarama.ProducerMessage{
Topic: topic,}
producer.SendMessage(msg)
log.Printf("Pixel sent: %s",pixelToSend)
}
}
}()
signals := make(chan os.Signal,1)
signal.Notify(signals,syscall.SIGHUP,syscall.SIGINT,syscall.SIGTERM)
partitionConsumer,err := newPartitionConsumer(brokerList,topic)
if err != nil {
log.Fatalln("Failed to create Sarama partition consumer:",err)
}
log.Println("Waiting for messages...")
for {
select {
case msg := <-partitionConsumer.Messages():
receivedPixel := &pixel.Pixel{}
err := proto.Unmarshal(msg.Value,receivedPixel)
if err != nil {
log.Fatalln("Failed to unmarshal pixel:",err)
}
log.Printf("Pixel received: %s",receivedPixel)
case <-signals:
log.Print("Received termination signal. Exiting.")
return
}
}
}
func newSyncProducer(brokerList []string) (sarama.SyncProducer,error) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// TODO configure producer
producer,err := sarama.NewSyncProducer(brokerList,config)
if err != nil {
return nil,err
}
return producer,nil
}
func newPartitionConsumer(brokerList []string,topic string) (sarama.PartitionConsumer,error) {
conf := sarama.NewConfig()
// TODO configure consumer
consumer,err := sarama.NewConsumer(brokerList,conf)
if err != nil {
return nil,err
}
partitionConsumer,err := consumer.ConsumePartition(topic,sarama.OffsetOldest)
if err != nil {
return nil,err
}
return partitionConsumer,err
}
本文链接:https://www.f2er.com/3107898.html