-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnections.go
82 lines (75 loc) · 2.2 KB
/
connections.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package pgperms
import (
"context"
"fmt"
"github.com/jackc/pgx/v4"
)
// Connections is a set of connections to the same cluster, but connected to different databases.
//
// Connections to databases other than the primary's are opened on demand by
// Get and cached per database name, together with a count of how many callers
// currently hold each one.
type Connections struct {
	// ctx is passed to pgx when opening and closing the per-database connections.
	ctx context.Context
	// primary is the connection supplied to NewConnections. Get returns it for
	// the empty database name, and its config is the basis for new connections.
	primary *pgx.Conn
	// perDatabase caches one connection per database name. NewConnections
	// seeds it with the primary under the primary's own database name.
	perDatabase map[string]*pgx.Conn
	// refcounts tracks, per database name, how many connections handed out by
	// Get have not been released yet.
	refcounts map[string]int
}
// NewConnections builds a connection set around an already established
// connection, which becomes the primary connection of the set.
//
// The primary is registered in the cache under the database name from its own
// config, so asking for that database later reuses it. Connections to other
// databases are derived from the primary's config.
func NewConnections(ctx context.Context, primary *pgx.Conn) *Connections {
	cache := make(map[string]*pgx.Conn)
	cache[primary.Config().Database] = primary

	return &Connections{
		ctx:         ctx,
		primary:     primary,
		perDatabase: cache,
		refcounts:   make(map[string]int),
	}
}
// Get returns a connection to the given database, creating and caching it on
// first use.
//
// An empty database name yields the primary connection together with a no-op
// release function. For any other name the connection is reference counted:
// the caller must invoke the returned function when done with the connection.
// Calling the release function more than once is harmless; only the first
// call has an effect.
//
// If connecting fails, the returned connection and release function are nil
// and the error wraps the underlying connect error.
func (c *Connections) Get(database string) (*pgx.Conn, func(), error) {
	if database == "" {
		return c.primary, func() {}, nil
	}

	released := false
	release := func() {
		// Guard against a double release, which would otherwise push the
		// refcount below its true value and could drop a connection that
		// another holder is still using.
		if released {
			return
		}
		released = true

		c.refcounts[database]--
		if c.refcounts[database] != 0 {
			return
		}
		if database != "template0" && database != "template1" {
			return
		}
		// The primary connection was supplied by the caller of
		// NewConnections and keeps being handed out by Get(""), so it must
		// never be closed from here, even when it points at a template database.
		if c.perDatabase[database] == c.primary {
			return
		}
		// We want to drop connections to the template databases as soon as
		// possible, because it blocks new databases from being created based
		// on them.
		c.DropCachedConnection(database)
	}

	conn, cached := c.perDatabase[database]
	if !cached {
		// Reuse the primary's connection settings, only switching the database.
		cfg := c.primary.Config()
		cfg.Database = database

		var err error
		conn, err = pgx.ConnectConfig(c.ctx, cfg)
		if err != nil {
			return nil, nil, fmt.Errorf("connecting to database %q: %w", database, err)
		}
		c.perDatabase[database] = conn
	}

	c.refcounts[database]++
	return conn, release, nil
}
// DropCachedConnection disconnects from the given database name if needed.
//
// It is a no-op when no connection to that database is cached. The primary
// connection is never closed or evicted by this method: it was supplied by the
// caller of NewConnections and is still returned by Get for the empty database
// name, so it is left untouched, matching what Close does.
//
// It panics if the connection is still in use, i.e. a connection obtained
// through Get has not been released yet.
func (c *Connections) DropCachedConnection(database string) {
	conn, ok := c.perDatabase[database]
	if !ok {
		return
	}
	if c.refcounts[database] > 0 {
		panic(fmt.Errorf("Connection to database %q is still in use", database))
	}
	if conn == c.primary {
		// Closing the primary here would leave the set handing out a dead
		// connection from Get("").
		return
	}
	// Best-effort close: the connection is being discarded either way, so a
	// failure to shut it down cleanly is deliberately ignored.
	_ = conn.Close(c.ctx)
	delete(c.perDatabase, database)
	delete(c.refcounts, database)
}
// Close drops every cached connection apart from the primary one, which is
// left open.
//
// Each connection is dropped through DropCachedConnection, so this panics if
// any of them is still in use.
func (c *Connections) Close() {
	// Deleting map entries while ranging over the map is safe in Go.
	for database, cached := range c.perDatabase {
		if cached != c.primary {
			c.DropCachedConnection(database)
		}
	}
}