In Go, you can create server and client streaming RPCs using the gRPC framework, which is designed for high-performance communication between services. Below is a comprehensive guide with an example of how to implement these types of RPCs.
Server streaming RPCs allow the server to send a stream of messages back to the client after receiving a single request message.
Client streaming RPCs allow the client to send a stream of messages to the server while receiving a single response message from the server.
// server.go
package main
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "path/to/your/proto/package"
)
type server struct {
pb.UnimplementedYourServiceServer
}
// Server streaming method
func (s *server) StreamNumber(req *pb.NumberRequest, stream pb.YourService_StreamNumberServer) error {
for i := 0; i < int(req.Count); i++ {
stream.Send(&pb.NumberResponse{Number: int32(i)})
}
return nil
}
// Client streaming method
func (s *server) CollectNumbers(stream pb.YourService_CollectNumbersServer) error {
sum := int32(0)
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.SumResponse{Sum: sum})
}
if err != nil {
return err
}
sum += req.Number
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterYourServiceServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
// client.go
package main
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
pb "path/to/your/proto/package"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("Did not connect: %v", err)
}
defer conn.Close()
c := pb.NewYourServiceClient(conn)
// Client streaming example
stream, err := c.CollectNumbers(context.Background())
if err != nil {
log.Fatalf("Could not collect numbers: %v", err)
}
for i := 1; i <= 5; i++ {
stream.Send(&pb.NumberRequest{Number: int32(i)})
time.Sleep(time.Second)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("Could not receive: %v", err)
}
fmt.Printf("Sum: %d\n", res.Sum)
// Server streaming example
req := &pb.NumberRequest{Count: 5}
stream, err := c.StreamNumber(context.Background(), req)
if err != nil {
log.Fatalf("Could not stream numbers: %v", err)
}
for {
num, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Error while receiving: %v", err)
}
fmt.Println(num.Number)
}
}
How do I avoid rehashing overhead with std::set in multithreaded code?
How do I find elements with custom comparators with std::set for embedded targets?
How do I erase elements while iterating with std::set for embedded targets?
How do I provide stable iteration order with std::unordered_map for large datasets?
How do I reserve capacity ahead of time with std::unordered_map for large datasets?
How do I erase elements while iterating with std::unordered_map in multithreaded code?
How do I provide stable iteration order with std::map for embedded targets?
How do I provide stable iteration order with std::map in multithreaded code?
How do I avoid rehashing overhead with std::map in performance-sensitive code?
How do I merge two containers efficiently with std::map for embedded targets?