Skip to content

Commit 26aa05d

Browse files
committed
feat: postgres database resource
Adds a new `PostgresDatabaseResource` that shifts the Postgres database creation logic to happen later in the database creation process. Now, the `InstanceResource` is only responsible for creating the database users, and the `PostgresDatabase` resource is responsible for creating the Postgres database, granting role privileges to the new database, and initializing the Spock node. This change is necessary because we need to use `pg_service.conf` for SystemD support, and we need that file to exist when we create the Spock node. This change shifts the Spock node initialization until after all of the instances are created and all nodes are ready, so we're able to construct the DSNs for all nodes before we initialize Spock. This change has an added benefit that it opens up the possibility of support for multiple Postgres databases per Control Plane database since we're able to make multiple databases/spock nodes per `NodeResource`/Patroni cluster. I've added the `DatabaseName` field to most, if not all, of the resources that would need to change to accommodate this. However, I did stop short of incorporating the database name into these resource's identities, which will be necessary to complete that multi-database support. PLAT-417
1 parent bf3dd8b commit 26aa05d

64 files changed

Lines changed: 1542 additions & 630 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
{
22
"profiling_enabled": true,
3-
"client_addresses": ["127.0.0.1"]
3+
"client_addresses": ["127.0.0.1"],
4+
"logging": {
5+
"component_levels": {
6+
"api_server": "error"
7+
}
8+
}
49
}

server/internal/database/instance_resource.go

Lines changed: 5 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package database
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"fmt"
87
"slices"
@@ -16,7 +15,6 @@ import (
1615
"github.com/pgEdge/control-plane/server/internal/patroni"
1716
"github.com/pgEdge/control-plane/server/internal/postgres"
1817
"github.com/pgEdge/control-plane/server/internal/resource"
19-
"github.com/pgEdge/control-plane/server/internal/utils"
2018
)
2119

2220
var _ resource.Resource = (*InstanceResource)(nil)
@@ -122,6 +120,10 @@ func (r *InstanceResource) Delete(ctx context.Context, rc *resource.Context) err
122120
}
123121

124122
func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, dbName string) (*pgx.Conn, error) {
123+
if rc.HostID != r.Spec.HostID {
124+
return nil, fmt.Errorf("cannot connect to an instance running on a different host. executing host = '%s', instance host = '%s'", rc.HostID, r.Spec.HostID)
125+
}
126+
125127
certs, err := do.Invoke[*certificates.Service](rc.Injector)
126128
if err != nil {
127129
return nil, err
@@ -178,77 +180,17 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
178180
return fmt.Errorf("failed to get TLS config: %w", err)
179181
}
180182

181-
firstTimeSetup, err := r.isFirstTimeSetup(rc)
182-
if err != nil {
183-
return err
184-
}
185-
186-
var spockSets []postgres.ReplicationSet
187-
var spockTables []postgres.ReplicationSetTable
188-
if r.Spec.RestoreConfig != nil && firstTimeSetup {
189-
err = r.renameDB(ctx, tlsCfg)
190-
if err != nil {
191-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
192-
}
193-
194-
spockSets, spockTables, err = r.backupReplicationSets(ctx, tlsCfg)
195-
if err != nil {
196-
return err
197-
}
198-
199-
err = r.dropSpock(ctx, tlsCfg)
200-
if err != nil {
201-
return fmt.Errorf("failed to drop spock: %w", err)
202-
}
203-
}
204-
205-
err = r.createDB(ctx, tlsCfg)
206-
if err != nil {
207-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
208-
}
209-
210183
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
211-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
184+
DSN: r.ConnectionInfo.AdminDSN("postgres"),
212185
TLS: tlsCfg,
213186
})
214187
if err != nil {
215188
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
216189
}
217190
defer conn.Close(ctx)
218191

219-
tx, err := conn.Begin(ctx)
220-
if err != nil {
221-
return fmt.Errorf("failed to begin transaction: %w", err)
222-
}
223-
defer tx.Rollback(ctx)
224-
225-
enabled, err := postgres.IsSpockEnabled().Scalar(ctx, tx)
226-
if err != nil {
227-
return fmt.Errorf("failed to check if spock is enabled: %w", err)
228-
}
229-
230-
if enabled {
231-
err = postgres.EnableRepairMode().Exec(ctx, tx)
232-
if err != nil {
233-
return fmt.Errorf("failed to enable repair mode: %w", err)
234-
}
235-
}
236-
237-
err = postgres.InitializePgEdgeExtensions(
238-
r.Spec.NodeName,
239-
r.ConnectionInfo.PeerDSN(r.Spec.DatabaseName),
240-
).Exec(ctx, conn)
241-
if err != nil {
242-
return fmt.Errorf("failed to initialize pgedge extensions: %w", err)
243-
}
244-
if len(spockSets) > 0 || len(spockTables) > 0 {
245-
if err := postgres.RestoreReplicationSets(spockSets, spockTables).Exec(ctx, conn); err != nil {
246-
return fmt.Errorf("failed to restore spock metadata: %w", err)
247-
}
248-
}
249192
roleStatements, err := postgres.CreateBuiltInRoles(postgres.BuiltinRoleOptions{
250193
PGVersion: r.Spec.PgEdgeVersion.PostgresVersion.String(),
251-
DBName: r.Spec.DatabaseName,
252194
})
253195
if err != nil {
254196
return fmt.Errorf("failed to generate built-in role statements: %w", err)
@@ -261,8 +203,6 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
261203
statement, err := postgres.CreateUserRole(postgres.UserRoleOptions{
262204
Name: user.Username,
263205
Password: user.Password,
264-
DBName: r.Spec.DatabaseName,
265-
DBOwner: user.DBOwner,
266206
Attributes: user.Attributes,
267207
Roles: user.Roles,
268208
})
@@ -274,10 +214,6 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
274214
}
275215
}
276216

277-
if err := tx.Commit(ctx); err != nil {
278-
return fmt.Errorf("failed to commit transaction: %w", err)
279-
}
280-
281217
err = r.updateInstanceState(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable})
282218
if err != nil {
283219
return r.recordError(ctx, rc, err)
@@ -337,109 +273,3 @@ func (r *InstanceResource) updateConnectionInfo(ctx context.Context, rc *resourc
337273
func (r *InstanceResource) patroniClient() *patroni.Client {
338274
return patroni.NewClient(r.ConnectionInfo.PatroniURL(), nil)
339275
}
340-
341-
func (r *InstanceResource) createDB(ctx context.Context, tlsCfg *tls.Config) error {
342-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
343-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
344-
TLS: tlsCfg,
345-
})
346-
if err != nil {
347-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
348-
}
349-
defer createDBConn.Close(ctx)
350-
351-
err = postgres.CreateDatabase(r.Spec.DatabaseName).Exec(ctx, createDBConn)
352-
if err != nil {
353-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
354-
}
355-
356-
return nil
357-
}
358-
359-
func (r *InstanceResource) renameDB(ctx context.Context, tlsCfg *tls.Config) error {
360-
// Short circuit if the restore config doesn't include a dbname or if the
361-
// database name is the same.
362-
if r.Spec.RestoreConfig.SourceDatabaseName == "" || r.Spec.RestoreConfig.SourceDatabaseName == r.Spec.DatabaseName {
363-
return nil
364-
}
365-
366-
// This operation can be flaky because of other processes connected to the
367-
// database. We retry it a few times to avoid failing the entire create
368-
// operation.
369-
err := utils.Retry(3, 500*time.Millisecond, func() error {
370-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
371-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
372-
TLS: tlsCfg,
373-
})
374-
if err != nil {
375-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
376-
}
377-
defer createDBConn.Close(ctx)
378-
379-
return postgres.
380-
RenameDB(r.Spec.RestoreConfig.SourceDatabaseName, r.Spec.DatabaseName).
381-
Exec(ctx, createDBConn)
382-
})
383-
if err != nil {
384-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
385-
}
386-
387-
return nil
388-
}
389-
390-
func (r *InstanceResource) dropSpock(ctx context.Context, tlsCfg *tls.Config) error {
391-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
392-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
393-
TLS: tlsCfg,
394-
})
395-
if err != nil {
396-
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
397-
}
398-
defer conn.Close(ctx)
399-
400-
err = postgres.DropSpockAndCleanupSlots(r.Spec.DatabaseName).Exec(ctx, conn)
401-
if err != nil {
402-
return fmt.Errorf("failed to drop spock: %w", err)
403-
}
404-
405-
return nil
406-
}
407-
408-
func (r *InstanceResource) isFirstTimeSetup(rc *resource.Context) (bool, error) {
409-
// This instance will already exist in the state if it's been successfully
410-
// created before.
411-
_, err := resource.FromContext[*InstanceResource](rc, r.Identifier())
412-
if errors.Is(err, resource.ErrNotFound) {
413-
return true, nil
414-
} else if err != nil {
415-
return false, fmt.Errorf("failed to check state for previous version of this instance: %w", err)
416-
}
417-
418-
return false, nil
419-
}
420-
421-
func (r *InstanceResource) backupReplicationSets(
422-
ctx context.Context,
423-
tlsCfg *tls.Config,
424-
) ([]postgres.ReplicationSet, []postgres.ReplicationSetTable, error) {
425-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
426-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
427-
TLS: tlsCfg,
428-
})
429-
if err != nil {
430-
return nil, nil, fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
431-
}
432-
defer conn.Close(ctx)
433-
434-
sets, err := postgres.GetReplicationSets().Structs(ctx, conn)
435-
if err != nil {
436-
return nil, nil, fmt.Errorf("spock backup failed to get replication sets: %w", err)
437-
}
438-
439-
tabs, err := postgres.GetReplicationSetTables().Structs(ctx, conn)
440-
if err != nil {
441-
return nil, nil, fmt.Errorf("spock backup failed to get replication set tables: %w", err)
442-
}
443-
444-
return sets, tabs, nil
445-
}

server/internal/database/lag_tracker_commit_ts_resource.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type LagTrackerCommitTimestampResource struct {
2626
// Planner fields
2727
OriginNode string `json:"origin_node"`
2828
ReceiverNode string `json:"receiver_node"`
29+
DatabaseName string `json:"database_name"`
2930

3031
// Dependency wiring
3132
ExtraDependencies []resource.Identifier `json:"dependent_resources,omitempty"`
@@ -56,7 +57,8 @@ func (r *LagTrackerCommitTimestampResource) Identifier() resource.Identifier {
5657

5758
func (r *LagTrackerCommitTimestampResource) Dependencies() []resource.Identifier {
5859
deps := []resource.Identifier{
59-
NodeResourceIdentifier(r.ReceiverNode),
60+
PostgresDatabaseResourceIdentifier(r.ReceiverNode, r.DatabaseName),
61+
PostgresDatabaseResourceIdentifier(r.OriginNode, r.DatabaseName),
6062
}
6163
deps = append(deps, r.ExtraDependencies...)
6264
return deps

server/internal/database/node_resource.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package database
33
import (
44
"context"
55
"fmt"
6+
"slices"
7+
"strings"
68

9+
"github.com/pgEdge/control-plane/server/internal/postgres"
710
"github.com/pgEdge/control-plane/server/internal/resource"
811
)
912

@@ -83,3 +86,62 @@ func (n *NodeResource) Update(ctx context.Context, rc *resource.Context) error {
8386
func (n *NodeResource) Delete(ctx context.Context, rc *resource.Context) error {
8487
return nil
8588
}
89+
90+
func (n *NodeResource) DSN(
91+
ctx context.Context,
92+
rc *resource.Context,
93+
fromInstance *InstanceResource,
94+
dbName string,
95+
) (*postgres.DSN, error) {
96+
// Sort the instances so that our final DSN is deterministic
97+
instanceIDs := slices.Clone(n.InstanceIDs)
98+
slices.SortFunc(instanceIDs, func(a, b string) int {
99+
// Always sort the primary instance to the beginning of the list since
100+
// these DSNs are used for subscriptions, so we'll want to try
101+
// connecting to the known primary instance first.
102+
switch {
103+
case a == n.PrimaryInstanceID:
104+
return -1
105+
case b == n.PrimaryInstanceID:
106+
return 1
107+
default:
108+
return strings.Compare(a, b)
109+
}
110+
})
111+
112+
hosts := make([]string, len(instanceIDs))
113+
ports := make([]int, len(instanceIDs))
114+
for i, instanceID := range instanceIDs {
115+
instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(instanceID))
116+
if err != nil {
117+
return nil, fmt.Errorf("failed to get instance '%s': %w", instanceID, err)
118+
}
119+
hosts[i] = instance.ConnectionInfo.PeerHost
120+
ports[i] = instance.ConnectionInfo.PeerPort
121+
}
122+
123+
return &postgres.DSN{
124+
Hosts: hosts,
125+
Ports: ports,
126+
DBName: dbName,
127+
User: "pgedge",
128+
SSLCert: fromInstance.ConnectionInfo.PeerSSLCert,
129+
SSLKey: fromInstance.ConnectionInfo.PeerSSLKey,
130+
SSLRootCert: fromInstance.ConnectionInfo.PeerSSLRootCert,
131+
Extra: map[string]string{
132+
"target_session_attrs": "primary",
133+
},
134+
}, nil
135+
}
136+
137+
func (n *NodeResource) PrimaryInstance(ctx context.Context, rc *resource.Context) (*InstanceResource, error) {
138+
if n.PrimaryInstanceID == "" {
139+
return nil, fmt.Errorf("%w: primary instance id not set", resource.ErrNotFound)
140+
}
141+
instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(n.PrimaryInstanceID))
142+
if err != nil {
143+
return nil, fmt.Errorf("failed to get primary instance '%s': %w", n.PrimaryInstanceID, err)
144+
}
145+
146+
return instance, nil
147+
}

server/internal/database/operations/add_nodes.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package operations
33
import (
44
"fmt"
55

6-
"github.com/pgEdge/control-plane/server/internal/database"
76
"github.com/pgEdge/control-plane/server/internal/resource"
87
)
98

@@ -17,14 +16,12 @@ func AddNode(node *NodeResources) ([]*resource.State, error) {
1716
return nil, fmt.Errorf("got empty instances for node %s", node.NodeName)
1817
}
1918

20-
instanceIDs := make([]string, 0, len(node.InstanceResources))
2119
states := make([]*resource.State, 0, 2)
2220

2321
primary, err := instanceState(node.InstanceResources[0])
2422
if err != nil {
2523
return nil, err
2624
}
27-
instanceIDs = append(instanceIDs, node.InstanceResources[0].InstanceID())
2825
states = append(states, primary)
2926

3027
var replicas *resource.State
@@ -38,20 +35,17 @@ func AddNode(node *NodeResources) ([]*resource.State, error) {
3835
} else {
3936
replicas.Merge(replica)
4037
}
41-
instanceIDs = append(instanceIDs, inst.InstanceID())
4238
}
4339

4440
if replicas != nil {
4541
states = append(states, replicas)
4642
}
4743

48-
err = addNodeResource(states, &database.NodeResource{
49-
Name: node.NodeName,
50-
InstanceIDs: instanceIDs,
51-
})
44+
nodeState, err := node.nodeResourceState()
5245
if err != nil {
5346
return nil, err
5447
}
48+
states[len(states)-1].Merge(nodeState)
5549

5650
return states, nil
5751
}

0 commit comments

Comments
 (0)