-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathexample_grpc_ip_limiter_test.go
136 lines (126 loc) · 3.89 KB
/
example_grpc_ip_limiter_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Package examples implements a gRPC server for Greeter service using rate limiters.
package examples
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"

	"github.com/mennanov/limiters"
	pb "github.com/mennanov/limiters/examples/helloworld"
)
// Example_ipGRPCLimiter starts a Greeter gRPC server whose unary interceptor rate limits requests per peer address.
// Each peer gets a token bucket of capacity 2 whose lock lives in etcd and whose state lives in Redis.
// A single client then sends three requests; the first two succeed and the third is rejected with
// codes.ResourceExhausted.
//
// The etcd endpoints are read from ETCD_ENDPOINTS (comma separated) and the Redis address from REDIS_ADDR.
func Example_ipGRPCLimiter() {
	// Set up a gRPC server.
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	defer lis.Close()

	// Connect to etcd.
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(os.Getenv("ETCD_ENDPOINTS"), ","),
		DialTimeout: time.Second,
	})
	if err != nil {
		log.Fatalf("could not connect to etcd: %v", err)
	}
	defer etcdClient.Close()

	// Connect to Redis.
	redisClient := redis.NewClient(&redis.Options{
		Addr: os.Getenv("REDIS_ADDR"),
	})
	defer redisClient.Close()

	logger := limiters.NewStdLogger()
	// Registry is needed to keep track of previously created limiters. It can remove the expired limiters to free up
	// memory.
	registry := limiters.NewRegistry()
	// The rate is used to define the token bucket refill rate and also the TTL for the limiters (both in Redis and in
	// the registry).
	rate := time.Second * 3
	clock := limiters.NewSystemClock()

	// Garbage collect the old limiters to prevent memory leaks. The collector stops when the example returns so
	// the goroutine itself does not leak.
	stopGC := make(chan struct{})
	defer close(stopGC)
	go func() {
		ticker := time.NewTicker(rate)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				registry.DeleteExpired(clock.Now())
			case <-stopGC:
				return
			}
		}
	}()

	// Add a unary interceptor middleware to rate limit requests.
	s := grpc.NewServer(grpc.UnaryInterceptor(
		func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
			// Requests without peer information all share the "unknown" bucket.
			// NOTE(review): p.Addr.String() presumably includes the client port for TCP peers, so the key is
			// per connection rather than per host - confirm whether that is intended.
			ip := "unknown"
			if p, ok := peer.FromContext(ctx); ok {
				ip = p.Addr.String()
			} else {
				log.Println("no peer info available")
			}

			// Create an IP address based rate limiter (or reuse the one already registered for this key).
			bucket := registry.GetOrCreate(ip, func() interface{} {
				return limiters.NewTokenBucket(
					2,
					rate,
					limiters.NewLockEtcd(etcdClient, fmt.Sprintf("/lock/ip/%s", ip), logger),
					limiters.NewTokenBucketRedis(
						redisClient,
						fmt.Sprintf("/ratelimiter/ip/%s", ip),
						rate, false),
					clock, logger)
			}, rate, clock.Now())

			w, err := bucket.(*limiters.TokenBucket).Limit(ctx)
			switch {
			case errors.Is(err, limiters.ErrLimitExhausted):
				// errors.Is also matches the sentinel when it has been wrapped.
				return nil, status.Errorf(codes.ResourceExhausted, "try again later in %s", w)
			case err != nil:
				// The limiter failed. This error should be logged and examined.
				log.Println(err)
				return nil, status.Error(codes.Internal, "internal error")
			}
			return handler(ctx, req)
		}))
	pb.RegisterGreeterServer(s, &server{})

	go func() {
		// Start serving. The error is kept local to this goroutine so it does not race with the
		// err variable used by the rest of the example.
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()
	defer s.GracefulStop()

	// Set up a client connection to the server.
	conn, err := grpc.NewClient(fmt.Sprintf("localhost%s", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	fmt.Println(r.GetMessage())

	r, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Bob"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	fmt.Println(r.GetMessage())

	// The bucket holds only two tokens, so the third request must be rejected.
	_, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Peter"})
	if err == nil {
		log.Fatal("error expected, but got nil")
	}
	fmt.Println(err)
	// Output: Hello Alice
	// Hello Bob
	// rpc error: code = ResourceExhausted desc = try again later in 3s
}