kratos http原理

概念

kratos 为了使http协议的逻辑代码和grpc的逻辑代码使用同一份,选择了基于protobuf的IDL文件使用proto插件生成辅助代码的方式。

protoc http插件的地址为:https://github.com/go-kratos/kratos/tree/main/cmd/protoc-gen-go-http

示例

 1syntax = "proto3";
 2
 3package helloworld;
 4
 5option go_package = "test/helloworld;helloworld";
 6option java_multiple_files = true;
 7option java_package = "helloworld";
 8import "google/api/annotations.proto";
 9
10service Greeter {
11	rpc SayHello (HelloRequest) returns (HelloReply)  {
12		  option (google.api.http) = {
13			  post: "/helloworld", // 声明路由
14			  body: "*"
15		  };
16	}
17  }
18  
19message HelloRequest {
20	string name = 1;
21}
22  
23message HelloReply {
24	string msg = 1;
25}
26  

使用kratos proto client xxx 生成的代码为:

 1// Code generated by protoc-gen-go-http. DO NOT EDIT.
 2// versions:
 3// - protoc-gen-go-http v2.4.0
 4// - protoc             v3.19.4
 5// source: helloworld/helloworld.proto
 6
 7package helloworld
 8
 9import (
10	context "context"
11	http "github.com/go-kratos/kratos/v2/transport/http"
12	binding "github.com/go-kratos/kratos/v2/transport/http/binding"
13)
14
15// This is a compile-time assertion to ensure that this generated file
16// is compatible with the kratos package it is being compiled against.
17var _ = new(context.Context)
18var _ = binding.EncodeURL
19
20const _ = http.SupportPackageIsVersion1
21
22const OperationGreeterSayHello = "/helloworld.Greeter/SayHello"
23
24type GreeterHTTPServer interface {
25	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
26}
27
28func RegisterGreeterHTTPServer(s *http.Server, srv GreeterHTTPServer) {
29	r := s.Route("/")
30	r.POST("/helloworld", _Greeter_SayHello0_HTTP_Handler(srv))
31}
32
33func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
34	return func(ctx http.Context) error {
35		var in HelloRequest
36		if err := ctx.Bind(&in); err != nil {
37			return err
38		}
39		http.SetOperation(ctx, OperationGreeterSayHello)
40		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
41			return srv.SayHello(ctx, req.(*HelloRequest))
42		})
43		out, err := h(ctx, &in)
44		if err != nil {
45			return err
46		}
47		reply := out.(*HelloReply)
48		return ctx.Result(200, reply)
49	}
50}
51
52type GreeterHTTPClient interface {
53	SayHello(ctx context.Context, req *HelloRequest, opts ...http.CallOption) (rsp *HelloReply, err error)
54}
55
56type GreeterHTTPClientImpl struct {
57	cc *http.Client
58}
59
60func NewGreeterHTTPClient(client *http.Client) GreeterHTTPClient {
61	return &GreeterHTTPClientImpl{client}
62}
63
64func (c *GreeterHTTPClientImpl) SayHello(ctx context.Context, in *HelloRequest, opts ...http.CallOption) (*HelloReply, error) {
65	var out HelloReply
66	pattern := "/helloworld"
67	path := binding.EncodeURL(pattern, in, false)
68	opts = append(opts, http.Operation(OperationGreeterSayHello))
69	opts = append(opts, http.PathTemplate(pattern))
70	err := c.cc.Invoke(ctx, "POST", path, in, &out, opts...)
71	if err != nil {
72		return nil, err
73	}
74	return &out, err
75}

开启一个grpc及http服务:

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log"
 7	"test/helloworld"
 8
 9	"github.com/go-kratos/kratos/v2"
10	"github.com/go-kratos/kratos/v2/middleware/recovery"
11	"github.com/go-kratos/kratos/v2/transport/grpc"
12	"github.com/go-kratos/kratos/v2/transport/http"
13)
14
15type server struct {
16	helloworld.UnimplementedGreeterServer
17}
18
19func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
20	return &helloworld.HelloReply{Msg: fmt.Sprintf("Hello %+v", in.Name)}, nil
21}
22
23func main() {
24	s := &server{}
25	httpSrv := http.NewServer(
26		http.Address(":8000"),
27		http.Middleware(
28			recovery.Recovery(),
29		),
30	)
31	grpcSrv := grpc.NewServer(
32		grpc.Address(":9000"),
33		grpc.Middleware(
34			recovery.Recovery(),
35		),
36	)
37    
38	helloworld.RegisterGreeterServer(grpcSrv, s)
39	helloworld.RegisterGreeterHTTPServer(httpSrv, s)
40
41	app := kratos.New(
42		kratos.Name("test"),
43		kratos.Server(
44			httpSrv,
45			grpcSrv,
46		),
47	)
48
49	if err := app.Run(); err != nil {
50		log.Fatal(err)
51	}
52}

http client:

 1package main
 2
 3import (
 4	"context"
 5	"log"
 6	"test/helloworld"
 7
 8	"github.com/go-kratos/kratos/v2/middleware/recovery"
 9	transhttp "github.com/go-kratos/kratos/v2/transport/http"
10)
11
12func main() {
13	callHTTP()
14}
15
16func callHTTP() {
17	conn, err := transhttp.NewClient(
18		context.Background(),
19		transhttp.WithMiddleware(
20			recovery.Recovery(),
21		),
22		transhttp.WithEndpoint("127.0.0.1:8000"),
23	)
24	if err != nil {
25		panic(err)
26	}
27	defer conn.Close()
28	client := helloworld.NewGreeterHTTPClient(conn)
29	reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
30	if err != nil {
31		log.Fatal(err)
32	}
33	log.Printf("[http] SayHello %s\n", reply.Msg)
34}

http server端实现原理

核心流程如下图所示:

http

首先新建一个struct,并实现 http_pb.go 中 GreeterHTTPServer interface 的方法。GreeterHTTPServer的命名方式为protobuf文件中的 service名+HTTPServer,interface的方法为protobuf中使用google.api.http声明了http路由的所有method。

然后使用RegisterGreeterHTTPServer方法把服务注册进去。大体的流程如下:

 1const OperationGreeterSayHello = "/helloworld.Greeter/SayHello"
 2
 3func RegisterGreeterHTTPServer(s *http.Server, srv GreeterHTTPServer) {
 4	r := s.Route("/")
 5	r.POST("/helloworld", _Greeter_SayHello0_HTTP_Handler(srv)) // 注册路由
 6}
 7
 8func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
 9	return func(ctx http.Context) error {
10		var in HelloRequest // protobuf 中声明的request
11 		if err := ctx.Bind(&in); err != nil { // 把http的参数绑定到 in
12			return err
13		}
14		http.SetOperation(ctx, OperationGreeterSayHello) // 设置Operation 和grpc一致,用于middleware select 等
15		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
16			return srv.SayHello(ctx, req.(*HelloRequest)) // 这个方法也就是上文提到的GreeterHTTPServer接口的方法,也就是我们自己实现的struct server里的SayHello方法
17		}) // 使用责任链模式middleware 这里没有任何中间件
18		out, err := h(ctx, &in) // 执行
19		if err != nil {
20			return err
21		}
22		reply := out.(*HelloReply) 
23		return ctx.Result(200, reply) 
24	}
25}

什么是责任链模式?

https://www.zhaohaiyu.com/post/designmode/behavioral/#%E8%B4%A3%E4%BB%BB%E9%93%BE%E6%A8%A1%E5%BC%8F

上段代码中的POST方法为:

代码在https://github.com/go-kratos/kratos/blob/main/transport/http/router.go#L76

 1func (r *Router) POST(path string, h HandlerFunc, m ...FilterFunc) {
 2	r.Handle(http.MethodPost, path, h, m...) // MethodPost = POST net/http下的常量
 3}
 4
 5// h 为上段xxx_http_pb.go代码中_Greeter_SayHello0_HTTP_Handler的返回值
 6func (r *Router) Handle(method, relativePath string, h HandlerFunc, filters ...FilterFunc) {
 7	next := http.Handler(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
 8		ctx := r.pool.Get().(Context)
 9		ctx.Reset(res, req) // 把 net/http的http.ResponseWriter 和*http.Request 设置ctx中
10		if err := h(ctx); err != nil { // 执行h 
11            r.srv.ene(res, req, err) // 如果出错了 执行 ene(EncodeErrorFunc)
12		}
13		ctx.Reset(nil, nil)
14		r.pool.Put(ctx)
15	}))
16	next = FilterChain(filters...)(next)
17	next = FilterChain(r.filters...)(next) // 添加filter 责任链模式
18	r.srv.router.Handle(path.Join(r.prefix, relativePath), next).Methods(method) // router 为 mux的router 把方法注册到路由中
19}

当我们访问 path.Join(r.prefix, relativePath)也就是/helloworld 时,会执行上段代码中的next方法,next是一个责任链。

其核心是执行_Greeter_SayHello0_HTTP_Handler方法返回的handler,

如果没发生错误,执行ctx.Result(200, reply)

 1type wrapper struct {
 2	router *Router
 3	req    *http.Request
 4	res    http.ResponseWriter
 5	w      responseWriter
 6}
 7
 8func (c *wrapper) Result(code int, v interface{}) error {
 9	c.w.WriteHeader(code)
10	return c.router.srv.enc(&c.w, c.req, v)
11}

enc也就是EncodeResponseFunc, 为kratos预留的返回值函数

1type EncodeResponseFunc func(http.ResponseWriter, *http.Request, interface{}) error

kratos提供了默认的EncodeResponseFunc

 1func DefaultResponseEncoder(w http.ResponseWriter, r *http.Request, v interface{}) error {
 2	if v == nil {
 3		return nil
 4	}
 5	if rd, ok := v.(Redirector); ok { // 检查有无Redirect方法,如果实现了interface 为跳转路由 也就是http的301 302等
 6		url, code := rd.Redirect()
 7		http.Redirect(w, r, url, code) // 跳转
 8		return nil
 9	}
10	codec, _ := CodecForRequest(r, "Accept") // 查看需要返回的参数类型 比如json
11	data, err := codec.Marshal(v) // 把数据Marshal成[]byte
12	if err != nil {
13		return err
14	}
15	w.Header().Set("Content-Type", httputil.ContentType(codec.Name())) // 设置header
16	_, err = w.Write(data) // 写数据
17	if err != nil {
18		return err
19	}
20	return nil
21}

如果发生了错误,执行ene,也就是EncodeErrorFunc, 为kratos预留的错误返回值函数

1type EncodeErrorFunc func(http.ResponseWriter, *http.Request, error)

kratos提供了默认的EncodeErrorFunc

 1func DefaultErrorEncoder(w http.ResponseWriter, r *http.Request, err error) {
 2	se := errors.FromError(err) // 把error变成自定义的实现error的结构体
 3	codec, _ := CodecForRequest(r, "Accept") // 查看需要返回的参数类型 比如json
 4	body, err := codec.Marshal(se)
 5	if err != nil {
 6		w.WriteHeader(http.StatusInternalServerError)
 7		return
 8	}
 9	w.Header().Set("Content-Type", httputil.ContentType(codec.Name()))
10	w.WriteHeader(int(se.Code)) // 写入 error中的code
11	_, _ = w.Write(body) // 返回错误信息
12}

http client端实现原理

在上文生成的代码中,http client的部分为:

 1type GreeterHTTPClient interface {
 2	SayHello(ctx context.Context, req *HelloRequest, opts ...http.CallOption) (rsp *HelloReply, err error)
 3}
 4
 5type GreeterHTTPClientImpl struct { // 实现 GreeterHTTPClient 接口
 6	cc *http.Client
 7}
 8
 9func NewGreeterHTTPClient(client *http.Client) GreeterHTTPClient {
10	return &GreeterHTTPClientImpl{client}
11}
12
13func (c *GreeterHTTPClientImpl) SayHello(ctx context.Context, in *HelloRequest, opts ...http.CallOption) (*HelloReply, error) {
14	var out HelloReply // 返回值
15	pattern := "/helloworld" 
16	path := binding.EncodeURL(pattern, in, false) // 整理path 传入in 是由于可能有path参数或者query
17	opts = append(opts, http.Operation(OperationGreeterSayHello))
18	opts = append(opts, http.PathTemplate(pattern))
19	err := c.cc.Invoke(ctx, "POST", path, in, &out, opts...) // 访问接口
20	if err != nil {
21		return nil, err
22	}
23	return &out, err
24}

上段代码中的Invoke方法为:

代码在https://github.com/go-kratos/kratos/blob/main/transport/http/client.go#L192

 1func (client *Client) Invoke(ctx context.Context, method, path string, args interface{}, reply interface{}, opts ...CallOption) error {
 2	var (
 3		contentType string
 4		body        io.Reader
 5	)
 6	c := defaultCallInfo(path)
 7	for _, o := range opts {
 8		if err := o.before(&c); err != nil {
 9			return err
10		}
11	}
12	if args != nil {
13		data, err := client.opts.encoder(ctx, c.contentType, args)
14		if err != nil {
15			return err
16		}
17		contentType = c.contentType
18		body = bytes.NewReader(data)
19	}
20	url := fmt.Sprintf("%s://%s%s", client.target.Scheme, client.target.Authority, path)
21	req, err := http.NewRequest(method, url, body)
22	if err != nil {
23		return err
24	}
25	if contentType != "" {
26		req.Header.Set("Content-Type", c.contentType)
27	}
28	if client.opts.userAgent != "" {
29		req.Header.Set("User-Agent", client.opts.userAgent)
30	}
31	ctx = transport.NewClientContext(ctx, &Transport{
32		endpoint:     client.opts.endpoint,
33		reqHeader:    headerCarrier(req.Header),
34		operation:    c.operation,
35		request:      req,
36		pathTemplate: c.pathTemplate,
37	})
38	return client.invoke(ctx, req, args, reply, c, opts...)
39}
40
41func (client *Client) invoke(ctx context.Context, req *http.Request, args interface{}, reply interface{}, c callInfo, opts ...CallOption) error {
42	h := func(ctx context.Context, in interface{}) (interface{}, error) {
43		res, err := client.do(req.WithContext(ctx))
44		if res != nil {
45			cs := csAttempt{res: res}
46			for _, o := range opts {
47				o.after(&c, &cs)
48			}
49		}
50		if err != nil {
51			return nil, err
52		}
53		defer res.Body.Close()
54		if err := client.opts.decoder(ctx, res, reply); err != nil {
55			return nil, err
56		}
57		return reply, nil
58	}
59	var p selector.Peer
60	ctx = selector.NewPeerContext(ctx, &p)
61	if len(client.opts.middleware) > 0 {
62		h = middleware.Chain(client.opts.middleware...)(h)
63	}
64	_, err := h(ctx, args)
65	return err
66}