1、简介
srteam 顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream ,源源不断地推送数据。
2、四种流模式
2.1 简单模式(Simple RPC)
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,所以不再详细介绍。
2.2 服务端数据流模式(Server-side streaming RPC)
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。
2.3 客户端数据流模式(Client-side streaming RPC)
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。
2.4 双向数据流模式(Bidirectional streaming RPC)
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。
3、定义protobuf文件
syntax = "proto3";
option go_package = "../proto";
service Greeter{
// 服务端流模式
rpc GetStream(StreamReqData) returns(stream StreamResData);
// 客户端流模式
rpc PutStream(stream StreamReqData)returns(StreamResData);
// 双向流模式
rpc AllStream(stream StreamReqData)returns(stream StreamResData);
}
message StreamReqData{
string data = 1;
}
message StreamResData{
string data = 1;
}
4、生成go的rpc文件
protoc -I . goods.proto --go_out=plugins=grpc:.
服务端
server.go
package main
import (
"demo/stream_grpc_test/proto"
"fmt"
"google.golang.org/grpc"
"net"
"sync"
"time"
)
const PORT = "0.0.0.0:8080"
type Server struct {
}
func (s *Server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
i := 0
for {
i++
_ = res.Send(&proto.StreamResData{Data: fmt.Sprintf("%v", time.Now().Unix())})
time.Sleep(1 * time.Second)
if i > 10 {
break
}
}
return nil
}
func (s *Server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
for {
if tem, err := cliStr.Recv(); err == nil {
fmt.Println(tem)
} else {
fmt.Println("break, err :", err)
break
}
}
return nil
}
func (s *Server) AllStream(allStr proto.Greeter_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
data, _ := allStr.Recv()
fmt.Println(data)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
_ = allStr.Send(&proto.StreamResData{
Data: "jason",
})
time.Sleep(1 * time.Second)
}
}()
wg.Wait()
return nil
}
func main() {
lis, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
s := grpc.NewServer()
proto.RegisterGreeterServer(s, &Server{})
err = s.Serve(lis)
if err != nil {
panic(err)
}
}
服务端流模式的客户端
client.go
package main
import (
"context"
"demo/stream_grpc_test/proto"
"fmt"
"google.golang.org/grpc"
"time"
)
const ADDRESS = "127.0.0.1:8080"
func main() {
conn, err := grpc.Dial(ADDRESS, grpc.WithInsecure())
if err != nil {
panic(err.Error())
}
defer func() {
_ = conn.Close()
}()
c := proto.NewGreeterClient(conn)
res, err := c.GetStream(context.Background(), &proto.StreamReqData{Data: "aaa"})
if err != nil {
panic(err.Error())
}
for {
getData, err := res.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(getData)
}
time.Sleep(30 * time.Second)
}
客户端流模式的客户端
client.go
package main
import (
"context"
"demo/stream_grpc_test/proto"
"google.golang.org/grpc"
"time"
)
const ADDRESS = "127.0.0.1:8080"
func main() {
conn, err := grpc.Dial(ADDRESS, grpc.WithInsecure())
if err != nil {
panic(err.Error())
}
defer func() {
_ = conn.Close()
}()
c := proto.NewGreeterClient(conn)
putRes, err := c.PutStream(context.Background())
for i := 0; i < 10; i++ {
_ = putRes.Send(&proto.StreamReqData{Data: "zhou"})
time.Sleep(1 * time.Second)
}
time.Sleep(30 * time.Second)
}
双向流模式的客户端
client.go
package main
import (
"context"
"demo/stream_grpc_test/proto"
"fmt"
"google.golang.org/grpc"
"time"
)
const ADDRESS = "127.0.0.1:8080"
func main() {
conn, err := grpc.Dial(ADDRESS, grpc.WithInsecure())
if err != nil {
panic(err.Error())
}
defer func() {
_ = conn.Close()
}()
c := proto.NewGreeterClient(conn)
allStr, err := c.AllStream(context.Background())
if err != nil {
panic(err.Error())
}
go func() {
for {
data, _ := allStr.Recv()
fmt.Println("接收数据:" + (*data).Data)
}
}()
go func() {
for {
err = allStr.Send(&proto.StreamReqData{Data: "我来自发送"})
if err != nil {
fmt.Println(err.Error())
break
}
time.Sleep(1 * time.Second)
}
}()
time.Sleep(30 * time.Second)
}
|