gRPC
Our transporter/grpc is developed upon gRPC, and implements Transporter
interface. You could use it for the communication between services on gRPC protocol.
Server
Options
Network(network string) ServerOption
To set communication protocol such as tcp.
Address(addr string) ServerOption
To set server's listening address.
Timeout(timeout time.Duration) ServerOption
To set the server-side timeout.
Logger(logger log.Logger) ServerOption
To set logger.
Middleware(m ...middleware.Middleware) ServerOption
To set middleware for gRPC server.
TLSConfig(c *tls.Config) ServerOption
To set TLS config.
UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption
To set interceptors for gRPC server.
StreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption
To set stream interceptors for gRPC server.
Options(opts ...grpc.ServerOption) ServerOption
To set some extra grpc.ServerOption
.
Implementation Details
NewServer()
func NewServer(opts ...ServerOption) *Server {
// grpc server default configuration
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
log: log.NewHelper(log.GetLogger()),
}
// apply opts
for _, o := range opts {
o(srv)
}
// convert middleware to grpc interceptor
unaryInts := []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(),
}
streamInts := []grpc.StreamServerInterceptor{
srv.streamServerInterceptor(),
}
if len(srv.unaryInts) > 0 {
unaryInts = append(unaryInts, srv.unaryInts...)
}
if len(srv.streamInts) > 0 {
streamInts = append(streamInts, srv.streamInts...)
}
// convert UnaryInterceptor and StreamInterceptor to ServerOption
var grpcOpts = []grpc.ServerOption{
grpc.ChainUnaryInterceptor(unaryInts...),
grpc.ChainStreamInterceptor(streamInts...),
}
// convert LTS config to ServerOption
if srv.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(srv.tlsConf)))
}
// convert srv.grpcOpts to ServerOption
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
// create grpc server
srv.Server = grpc.NewServer(grpcOpts...)
// create metadata server
srv.metadata = apimd.NewServer(srv.Server)
// set lis and endpoint
srv.err = srv.listenAndEndpoint()
// register these internal API
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
apimd.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
return srv
}
unaryServerInterceptor()
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// merge two ctx
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
// get metadata from ctx
md, _ := grpcmd.FromIncomingContext(ctx)
// bind some information into ctx
replyHeader := grpcmd.MD{}
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
reqHeader: headerCarrier(md),
replyHeader: headerCarrier(replyHeader),
})
// set timeout
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// middleware
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if len(s.middleware) > 0 {
h = middleware.Chain(s.middleware...)(h)
}
// execute handler
reply, err := h(ctx, req)
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader)
}
return reply, err
}
}
Usage
These are some basic usage of gRPC, you could refer to gRPC Docs for advanced examples.
Register gRPC Server
gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)
Set middleware in gRPC Server
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
logging.Server(),
),
)
Process Request in gRPC Middleware
if info, ok := transport.FromServerContext(ctx); ok {
kind = info.Kind().String()
operation = info.Operation()
}
Client
Options
WithEndpoint(endpoint string) ClientOption
To set the endpoint which the client will connect to.
WithTimeout(timeout time.Duration) ClientOption
To set the client-side timeout.
WithMiddleware(m ...middleware.Middleware) ClientOption
To set middleware.
WithDiscovery(d registry.Discovery) ClientOption
To set the discovery for gRPC client.
WithTLSConfig(c *tls.Config) ClientOption
To set TLS config.
WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption
To set interceptors for gRPC client.
WithOptions(opts ...grpc.DialOption) ClientOption
To set some extra grpc.ClientOption
.
Implementation Details
dial()
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// default options
options := clientOptions{
timeout: 2000 * time.Millisecond,
balancerName: wrr.Name,
logger: log.GetLogger(),
}
// apply opts
for _, o := range opts {
o(&options)
}
// convert middleware to grpc interceptor
ints := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout, options.filters),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
// client side balancer
grpcOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
// To use service discovery
grpcOpts = append(grpcOpts,
grpc.WithResolvers(
discovery.NewBuilder(
options.discovery,
discovery.WithInsecure(insecure),
discovery.WithLogger(options.logger),
)))
}
if insecure {
// to disable transport security for connection
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
}
// TLS config
if options.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
unaryClientInterceptor()
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// bind some information into ctx
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
reqHeader: headerCarrier{},
filters: filters,
})
if timeout > 0 {
// set the timeout
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// middleware
h := func(ctx context.Context, req interface{}) (interface{}, error) {
if tr, ok := transport.FromClientContext(ctx); ok {
header := tr.RequestHeader()
keys := header.Keys()
keyvals := make([]string, 0, len(keys))
for _, k := range keys {
keyvals = append(keyvals, k, header.Get(k))
}
ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}
_, err := h(ctx, req)
return err
}
}
Usage
Client Connection
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)
Middleware
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
grpc.WithTimeout(3600 * time.Second),
grpc.WithMiddleware(
recovery.Recovery(),
validate.Validator(),
),
)
Service Discovery
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)