From 8f53a99d279b09aae6ae1975b7338b99afb0826b Mon Sep 17 00:00:00 2001 From: Ingo Date: Mon, 27 Jan 2025 18:28:48 +0100 Subject: [PATCH 1/3] Create a unique cluster file for each AdminClient instance --- fdbclient/admin_client.go | 3 +-- fdbclient/common.go | 30 +++++++++++++++--------------- fdbclient/common_test.go | 30 +----------------------------- 3 files changed, 17 insertions(+), 46 deletions(-) diff --git a/fdbclient/admin_client.go b/fdbclient/admin_client.go index dd51c3016..4177cdc71 100644 --- a/fdbclient/admin_client.go +++ b/fdbclient/admin_client.go @@ -687,8 +687,7 @@ func (client *cliAdminClient) GetRestoreStatus() (string, error) { // Close cleans up any pending resources. func (client *cliAdminClient) Close() error { - // Allow to reuse the same file. - return nil + return os.Remove(client.clusterFilePath) } // GetCoordinatorSet gets the current coordinators from the status diff --git a/fdbclient/common.go b/fdbclient/common.go index aa1dc03a3..015e7c0e7 100644 --- a/fdbclient/common.go +++ b/fdbclient/common.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "io/fs" + "math/rand" "os" "path" "time" @@ -110,23 +111,22 @@ func createClusterFile(cluster *fdbv1beta2.FoundationDBCluster) (string, error) // ensureClusterFileIsPresent will ensure that the cluster file with the specified connection string is present. func ensureClusterFileIsPresent(dir string, uid string, connectionString string) (string, error) { - clusterFileName := path.Join(dir, uid) + for { + clusterFileName := path.Join(dir, fmt.Sprintf("%s-%d", uid, rand.Uint64())) - // Try to read the file to check if the file already exists and if so, if the content matches - content, err := os.ReadFile(clusterFileName) - - // If the file doesn't exist we have to create it - if errors.Is(err, fs.ErrNotExist) { - return clusterFileName, os.WriteFile(clusterFileName, []byte(connectionString), 0777) - } - - // The content of the cluster file is already correct. 
- if string(content) == connectionString { - return clusterFileName, nil + f, err := os.OpenFile(clusterFileName, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0777) + if err != nil { + if errors.Is(err, fs.ErrExist) { + continue + } + return "", err + } + _, err = f.Write([]byte(connectionString)) + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + return clusterFileName, err } - - // The content doesn't match, so we have to write the new content to the cluster file. - return clusterFileName, os.WriteFile(clusterFileName, []byte(connectionString), 0777) } // getConnectionStringFromDB gets the database's connection string directly from the system key diff --git a/fdbclient/common_test.go b/fdbclient/common_test.go index 9a4d8f112..c3961c160 100644 --- a/fdbclient/common_test.go +++ b/fdbclient/common_test.go @@ -44,35 +44,7 @@ var _ = Describe("common_test", func() { When("the cluster file doesn't exist", func() { It("should create the cluster file with the correct content", func() { - Expect(clusterFile).To(Equal(path.Join(tmpDir, uid))) - content, err := os.ReadFile(clusterFile) - Expect(err).NotTo(HaveOccurred()) - Expect(string(content)).To(Equal(connectionString)) - }) - }) - - When("the cluster file exist with the wrong content", func() { - BeforeEach(func() { - err := os.WriteFile(path.Join(os.TempDir(), uid), []byte("wrong"), 0777) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should update the cluster file with the correct content", func() { - Expect(clusterFile).To(Equal(path.Join(tmpDir, uid))) - content, err := os.ReadFile(clusterFile) - Expect(err).NotTo(HaveOccurred()) - Expect(string(content)).To(Equal(connectionString)) - }) - }) - - When("the cluster file exist with the correct content", func() { - BeforeEach(func() { - err := os.WriteFile(path.Join(os.TempDir(), uid), []byte(connectionString), 0777) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should keep the cluster file with the correct content", func() { - 
Expect(clusterFile).To(Equal(path.Join(tmpDir, uid))) + Expect(clusterFile).To(HavePrefix(path.Join(tmpDir, uid+"-"))) content, err := os.ReadFile(clusterFile) Expect(err).NotTo(HaveOccurred()) Expect(string(content)).To(Equal(connectionString)) From b532445243d4317b4fcbf25983855a490ce7181a Mon Sep 17 00:00:00 2001 From: Ingo Date: Wed, 29 Jan 2025 12:15:28 +0100 Subject: [PATCH 2/3] Make the FDB database a singleton. This lets us not open many connections, each using its own cluster file. If each instance has its private cluster file, they can run out of sync when the cluster changes. If they share a cluster file, there can be races as multiple threads write to the same file. --- fdbclient/common.go | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/fdbclient/common.go b/fdbclient/common.go index 015e7c0e7..ac63981c6 100644 --- a/fdbclient/common.go +++ b/fdbclient/common.go @@ -25,9 +25,11 @@ import ( "errors" "fmt" "io/fs" + "k8s.io/apimachinery/pkg/types" "math/rand" "os" "path" + "sync" "time" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" @@ -43,6 +45,10 @@ var DefaultCLITimeout = 10 * time.Second // MaxCliTimeout is the maximum CLI timeout that will be used for requests that might be slower to respond. var MaxCliTimeout = 40 * time.Second +// databaseSingleton keeps one open database per cluster. The key is the cluster UID. It is initialized to an empty map because writing to a nil map panics. Guarded by the mutex below for read and write access. +var databaseSingleton = map[types.UID]*fdb.Database{} +var databaseSingletonMutex = &sync.Mutex{} + const ( defaultTransactionTimeout = 5 * time.Second ) @@ -84,24 +90,29 @@ func parseMachineReadableStatus(logger logr.Logger, contents []byte, checkForPro return status, nil } -// getFDBDatabase opens an FDB database. +// getFDBDatabase returns the singleton FDB database. May return an error if initializing the singleton failed. 
func getFDBDatabase(cluster *fdbv1beta2.FoundationDBCluster) (fdb.Database, error) { - clusterFile, err := createClusterFile(cluster) - if err != nil { - return fdb.Database{}, err - } + databaseSingletonMutex.Lock() + defer databaseSingletonMutex.Unlock() + if databaseSingleton[cluster.UID] == nil { + clusterFile, err := createClusterFile(cluster) + if err != nil { + return fdb.Database{}, err + } - database, err := fdb.OpenDatabase(clusterFile) - if err != nil { - return fdb.Database{}, err - } + database, err := fdb.OpenDatabase(clusterFile) + if err != nil { + return fdb.Database{}, err + } - err = database.Options().SetTransactionTimeout(defaultTransactionTimeout.Milliseconds()) - if err != nil { - return fdb.Database{}, err + err = database.Options().SetTransactionTimeout(defaultTransactionTimeout.Milliseconds()) + if err != nil { + return fdb.Database{}, err + } + databaseSingleton[cluster.UID] = &database } - - return database, nil + // This is a copy, but fdb.Database is just a pointer-to-implementation and cheap to copy. + return *databaseSingleton[cluster.UID], nil } // createClusterFile will create or update the cluster file for the specified cluster. From cf82c5816a5ee7233df2efbe28ac10ebc1244bcf Mon Sep 17 00:00:00 2001 From: Ingo Walther Date: Fri, 31 Jan 2025 13:49:21 +0100 Subject: [PATCH 3/3] Get updated connection string from database Instead of reading the updated connection string from fdbcli's cluster file, ask the cluster. This is meant to make it less likely to have races in case the file is unexpectedly being written to. 
--- fdbclient/admin_client.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/fdbclient/admin_client.go b/fdbclient/admin_client.go index 4177cdc71..6a6abad4f 100644 --- a/fdbclient/admin_client.go +++ b/fdbclient/admin_client.go @@ -490,17 +490,11 @@ func (client *cliAdminClient) ChangeCoordinators(addresses []fdbv1beta2.ProcessA if err != nil { return "", err } - - connectionStringBytes, err := os.ReadFile(client.clusterFilePath) - if err != nil { - return "", err - } - - connectionString, err := fdbv1beta2.ParseConnectionString(string(connectionStringBytes)) + connectionString, err := client.GetConnectionString() if err != nil { return "", err } - return connectionString.String(), nil + return connectionString, nil } // cleanConnectionStringOutput is a helper method to remove unrelated output from the get command in the connection string