|
|
|
|
@ -7,7 +7,9 @@ import (
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"maps"
|
|
|
|
|
"net"
|
|
|
|
|
"os"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
@ -485,11 +487,19 @@ func newVendorClients(conns map[string]*BackendConn) (*VendorClients, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewGrpcConn(ctx context.Context, conf *model.Backend) (*grpc.ClientConn, error) {
|
|
|
|
|
if conf.Endpoint == "" {
|
|
|
|
|
return nil, errors.New("new grpc client failed, endpoint is empty")
|
|
|
|
|
if err := conf.Validate(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if conf.Consul.ServiceName != "" && conf.Etcd.ServiceName != "" {
|
|
|
|
|
return nil, errors.New("new grpc client failed, consul and etcd can't be used at the same time")
|
|
|
|
|
_, _, err := net.SplitHostPort(conf.Endpoint)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if !strings.Contains(err.Error(), "missing port in address") {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if conf.TLS {
|
|
|
|
|
conf.Endpoint += ":443"
|
|
|
|
|
} else {
|
|
|
|
|
conf.Endpoint += ":80"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
middlewares := []middleware.Middleware{kcircuitbreaker.Client(kcircuitbreaker.WithCircuitBreaker(func() circuitbreaker.CircuitBreaker {
|
|
|
|
|
return sre.NewBreaker(
|
|
|
|
|
@ -551,10 +561,7 @@ func NewGrpcConn(ctx context.Context, conf *model.Backend) (*grpc.ClientConn, er
|
|
|
|
|
log.Infof("new grpc client with endpoint: %s", conf.Endpoint)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
con *grpc.ClientConn
|
|
|
|
|
err error
|
|
|
|
|
)
|
|
|
|
|
var con *grpc.ClientConn
|
|
|
|
|
if conf.TLS {
|
|
|
|
|
var rootCAs *x509.CertPool
|
|
|
|
|
rootCAs, err = x509.SystemCertPool()
|
|
|
|
|
@ -588,6 +595,17 @@ func NewHttpClientConn(ctx context.Context, conf *model.Backend) (*http.Client,
|
|
|
|
|
if err := conf.Validate(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
_, _, err := net.SplitHostPort(conf.Endpoint)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if !strings.Contains(err.Error(), "missing port in address") {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if conf.TLS {
|
|
|
|
|
conf.Endpoint += ":443"
|
|
|
|
|
} else {
|
|
|
|
|
conf.Endpoint += ":80"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
middlewares := []middleware.Middleware{kcircuitbreaker.Client(kcircuitbreaker.WithCircuitBreaker(func() circuitbreaker.CircuitBreaker {
|
|
|
|
|
return sre.NewBreaker(
|
|
|
|
|
sre.WithRequest(25),
|
|
|
|
|
|