Skip to content

Commit

Permalink
add more logs
Browse files Browse the repository at this point in the history
Signed-off-by: youhangwang <[email protected]>
  • Loading branch information
youhangwang committed Jun 17, 2024
1 parent d2f6f35 commit 9456ca5
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 50 deletions.
50 changes: 45 additions & 5 deletions controllers/cephfscg/cghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,14 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupDestination(
replicationGroupDestinationName, replicationGroupDestinationNamespace string,
rdSpecs []ramendrv1alpha1.VolSyncReplicationDestinationSpec,
) (*ramendrv1alpha1.ReplicationGroupDestination, error) {
log := c.logger.WithName("CreateOrUpdateReplicationGroupDestination").
WithValues("ReplicationGroupDestinationName", replicationGroupDestinationName,
"ReplicationGroupDestinationNamespace", replicationGroupDestinationNamespace)

if err := util.DeleteReplicationGroupSource(c.ctx, c.Client,
replicationGroupDestinationName, replicationGroupDestinationNamespace); err != nil {
log.Error(err, "Failed to delete ReplicationGroupSource before creating ReplicationGroupDestination")

return nil, err
}

Expand All @@ -110,6 +116,8 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupDestination(
return nil
})
if err != nil {
log.Error(err, "Failed to create or update ReplicationGroupDestination")

return nil, err
}

Expand All @@ -121,11 +129,18 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
replicationGroupSourceName, replicationGroupSourceNamespace string,
runFinalSync bool,
) (*ramendrv1alpha1.ReplicationGroupSource, bool, error) {
log := c.logger.WithName("CreateOrUpdateReplicationGroupSource").
WithValues("ReplicationGroupSourceName", replicationGroupSourceName,
"ReplicationGroupSourceNamespace", replicationGroupSourceNamespace)

log.Info("Get RDs which are owned by RGD", "RGD", c.instance.Name)
// Get the RDs, if they exist, when changing from secondary to primary
rdList := &volsyncv1alpha1.ReplicationDestinationList{}
if err := c.ListByOwner(rdList,
map[string]string{util.RGDOwnerLabel: c.instance.Name}, c.instance.Namespace,
); err != nil {
log.Error(err, "Failed to get RDs which are owned by RGD", "RGD", c.instance.Name)

return nil, false, err
}

Expand All @@ -134,8 +149,12 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
if c.VSHandler.IsCopyMethodDirect() {
// Before creating a new RGS, make sure any LocalReplicationDestination for this PVC is cleaned up first
// DeleteRD only deletes the LRD and LRS here, as only the LRD and LRS have a VRG owner and also belong to a CG
log.Info("Delete local RD and RS if they exists for RD", "RD", rd)

err := c.DeleteLocalRDAndRS(&rd)
if err != nil {
log.Error(err, "Failed to delete local RD and RS for RD", "RD", rd)

return nil, false, err
}
}
Expand All @@ -144,6 +163,8 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
if err := util.DeleteReplicationGroupDestination(
c.ctx, c.Client,
replicationGroupSourceName, replicationGroupSourceNamespace); err != nil {
log.Error(err, "Failed to delete ReplicationGroupDestination before creating ReplicationGroupSource")

return nil, false, err
}

Expand All @@ -157,6 +178,8 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
c.instance.Spec.CephFSConsistencyGroupSelector, c.instance.Namespace, c.logger,
)
if err != nil {
log.Error(err, "Failed to get volume group snapshot class name")

return nil, false, err
}

Expand Down Expand Up @@ -204,6 +227,8 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
return nil
})
if err != nil {
log.Error(err, "Failed to create or update ReplicationGroupSource")

return nil, false, err
}

Expand All @@ -212,6 +237,8 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
// and also run cleanup (removes PVC we just ran the final sync from)
//
if runFinalSync && isFinalSyncComplete(rgs) {
log.Info("ReplicationGroupSource complete final sync")

return rgs, true, nil
}

Expand All @@ -221,22 +248,27 @@ func (c *cgHandler) CreateOrUpdateReplicationGroupSource(
func (c *cgHandler) GetLatestImageFromRGD(
ctx context.Context, pvcName string,
) (*corev1.TypedLocalObjectReference, error) {
log := c.logger.WithName("GetLatestImageFromRGD").WithValues("PVCName", pvcName)

log.Info("List ReplicationGroupDestination with VRGOwner label")

rgdList := &ramendrv1alpha1.ReplicationGroupDestinationList{}

if err := c.ListByOwner(rgdList, map[string]string{
volsync.VRGOwnerNameLabel: c.instance.GetName(),
volsync.VRGOwnerNamespaceLabel: c.instance.GetNamespace(),
}, c.instance.Namespace); err != nil {
log.Error(err, "Failed to list ReplicationGroupDestination with VRGOwner label")

return nil, err
}

c.logger.Info("Get rgdList", "rgdList", rgdList)
c.logger.Info("Get latest image from ReplicationGroupDestination list", "rgdList", rgdList)

var latestImage *corev1.TypedLocalObjectReference

for _, rgd := range rgdList.Items {
if util.GetPVCLatestImageRGD(pvcName, rgd) != nil {
c.logger.Info("Get latest image from RDG for PVC", "PVC", pvcName)
latestImage = util.GetPVCLatestImageRGD(pvcName, rgd)
}
}
Expand All @@ -258,8 +290,12 @@ func (c *cgHandler) GetLatestImageFromRGD(
func (c *cgHandler) EnsurePVCfromRGD(
rdSpec ramendrv1alpha1.VolSyncReplicationDestinationSpec, failoverAction bool,
) error {
log := c.logger.WithName("EnsurePVCfromRGD")

latestImage, err := c.GetLatestImageFromRGD(c.ctx, rdSpec.ProtectedPVC.Name)
if err != nil {
log.Error(err, "Failed to get latest image from RGD")

return err
}

Expand All @@ -282,7 +318,7 @@ func (c *cgHandler) DeleteLocalRDAndRS(rd *volsyncv1alpha1.ReplicationDestinatio
return err
}

c.logger.Info("Clean up local resources. Latest Image for main RD", "name", latestRDImage.Name)
c.logger.Info("Clean up local resources. Latest Image for RD", "name", latestRDImage.Name)

lrs := &volsyncv1alpha1.ReplicationSource{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -303,6 +339,8 @@ func (c *cgHandler) DeleteLocalRDAndRS(rd *volsyncv1alpha1.ReplicationDestinatio
)
}

c.logger.Error(err, "Failed to get local ReplicationSource")

return err
}

Expand All @@ -311,13 +349,15 @@ func (c *cgHandler) DeleteLocalRDAndRS(rd *volsyncv1alpha1.ReplicationDestinatio
if lrs.Spec.Trigger != nil && lrs.Spec.Trigger.Manual == latestRDImage.Name {
// When local final sync is complete, we cleanup all locally created resources except the app PVC
if lrs.Status != nil && lrs.Status.LastManualSync == lrs.Spec.Trigger.Manual {
c.logger.Info("Clean up local resources for RD", "name", rd.Name)

err = c.VSHandler.CleanupLocalResources(lrs)
if err != nil {
c.logger.Info("Failed to cleaned up local resources for RD", "name", rd.Name)

return err
}

c.logger.Info("Cleaned up local resources for RD", "name", rd.Name)

return nil
}
}
Expand Down
64 changes: 25 additions & 39 deletions controllers/cephfscg/replicationgroupdestination.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (
"github.com/backube/volsync/controllers/mover"
"github.com/backube/volsync/controllers/statemachine"
"github.com/go-logr/logr"
vsv1 "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
ramendrv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
"github.com/ramendr/ramen/controllers/util"
"github.com/ramendr/ramen/controllers/volsync"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -85,6 +83,8 @@ func (m *rgdMachine) Conditions() *[]metav1.Condition {

//nolint:cyclop,funlen
func (m *rgdMachine) Synchronize(ctx context.Context) (mover.Result, error) {
m.Logger.Info("Start Synchronize")

createdRDs := []*volsyncv1alpha1.ReplicationDestination{}
rds := []*corev1.ObjectReference{}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (m *rgdMachine) Synchronize(ctx context.Context) (mover.Result, error) {
latestImages := make(map[string]*corev1.TypedLocalObjectReference)

for _, rd := range createdRDs {
m.Logger.Info("Check replication destination is completed", "ReplicationDestinationName", rd.Name)
m.Logger.Info("Check if replication destination is completed", "ReplicationDestinationName", rd.Name)

if rd.Spec.Trigger.Manual != rd.Status.LastManualSync {
m.Logger.Info("replication destination is not completed", "ReplicationDestinationName", rd.Name)
Expand All @@ -123,17 +123,25 @@ func (m *rgdMachine) Synchronize(ctx context.Context) (mover.Result, error) {
}

if rd.Status.LatestImage != nil && rd.Spec.RsyncTLS != nil && rd.Spec.RsyncTLS.DestinationPVC != nil {
m.Logger.Info("Append latest image in the list",
"ReplicationDestinationName", rd.Name, "LatestImage", rd.Status.LatestImage)

latestImages[*rd.Spec.RsyncTLS.DestinationPVC] = rd.Status.LatestImage
}

m.Logger.Info("Set DoNotDeleteLabel to the image",
"ReplicationDestinationName", rd.Name, "LatestImage", rd.Status.LatestImage)

if err := util.DeferDeleteImage(
ctx, m.Client, rd.Status.LatestImage.Name, rd.Namespace, rd.Spec.Trigger.Manual, m.ReplicationGroupDestination.Name,
); err != nil {
return mover.InProgress(), err
}
}

readytoUse, err := m.CheckImagesReadyToUse(ctx, latestImages, m.ReplicationGroupDestination.Namespace)
readytoUse, err := util.CheckImagesReadyToUse(
ctx, m.Client, latestImages, m.ReplicationGroupDestination.Namespace, m.Logger,
)
if err != nil {
m.Logger.Error(err, "Failed to check if images are ready to use")

Expand Down Expand Up @@ -167,40 +175,6 @@ func (m *rgdMachine) SetOutOfSync(bool) {}
func (m *rgdMachine) IncMissedIntervals() {}
func (m *rgdMachine) ObserveSyncDuration(dur time.Duration) {}

// CheckImagesReadyToUse reports whether every image in latestImages refers to
// a VolumeSnapshot in namespace that is marked ready to use.
//
// latestImages is keyed by PVC name. The method returns:
//   - (false, nil) as soon as it meets a nil image reference or a snapshot
//     that is not ready yet (including one whose status is not populated),
//   - (false, err) when a snapshot cannot be fetched through m.Client,
//   - (true, nil) when all snapshots are ready; an empty map is trivially ready.
func (m *rgdMachine) CheckImagesReadyToUse(
	ctx context.Context,
	latestImages map[string]*corev1.TypedLocalObjectReference,
	namespace string,
) (bool, error) {
	for pvcName, latestImage := range latestImages {
		if latestImage == nil {
			m.Logger.Info("Image is nil to check", "PVCName", pvcName)

			return false, nil
		}

		volumeSnapshot := &vsv1.VolumeSnapshot{}
		if err := m.Client.Get(ctx,
			types.NamespacedName{Name: latestImage.Name, Namespace: namespace},
			volumeSnapshot,
		); err != nil {
			m.Logger.Error(err, "Failed to get volume snapshot",
				"VolumeSnapshot", latestImage.Name, "PVCName", pvcName)

			return false, err
		}

		// Status is a pointer field on VolumeSnapshot and may still be nil for
		// a snapshot that has not been reported on yet; treat that as "not
		// ready" instead of dereferencing it.
		status := volumeSnapshot.Status
		if status == nil || status.ReadyToUse == nil || !*status.ReadyToUse {
			m.Logger.Info("Volume snapshot is not ready to use", "VolumeSnapshot", volumeSnapshot.Name)

			return false, nil
		}
	}

	return true, nil
}

//nolint:cyclop
func (m *rgdMachine) ReconcileRD(
rdSpec ramendrv1alpha1.VolSyncReplicationDestinationSpec, manual string,
Expand All @@ -210,16 +184,26 @@ func (m *rgdMachine) ReconcileRD(
return nil, fmt.Errorf("protectedPVC %s is not VolSync Enabled", rdSpec.ProtectedPVC.Name)
}

log := m.Logger.WithValues("ProtectedPVCName", rdSpec.ProtectedPVC.Name)

// Pre-allocated shared secret - DRPC will generate and propagate this secret from hub to clusters
pskSecretName := volsync.GetVolSyncPSKSecretNameFromVRGName(m.ReplicationGroupDestination.Name)
// Need to confirm this secret exists on the cluster before proceeding, otherwise volsync will generate it
secretExists, err := m.VSHandler.ValidateSecretAndAddVRGOwnerRef(pskSecretName)
if err != nil || !secretExists {
if err != nil {
log.Error(err, "Failed to ValidateSecretAndAddVRGOwnerRef", "PSKSecretName", pskSecretName)

return nil, err
}

if !secretExists {
return nil, fmt.Errorf("psk secret: %s is not found", pskSecretName)
}

dstPVC, err := m.VSHandler.PrecreateDestPVCIfEnabled(rdSpec)
if err != nil {
log.Error(err, "Failed to PrecreateDestPVCIfEnabled", "PSKSecretName", pskSecretName)

return nil, err
}

Expand All @@ -232,6 +216,8 @@ func (m *rgdMachine) ReconcileRD(

err = m.VSHandler.ReconcileServiceExportForRD(rd)
if err != nil {
log.Error(err, "Failed to ReconcileServiceExportForRD", "RD", rd)

return nil, err
}

Expand Down
2 changes: 2 additions & 0 deletions controllers/cephfscg/replicationgroupsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func (m *replicationGroupSourceMachine) Synchronize(ctx context.Context) (mover.
// Need to confirm this secret exists on the cluster before proceeding, otherwise volsync will generate it
secretExists, err := m.VSHandler.ValidateSecretAndAddVRGOwnerRef(pskSecretName)
if err != nil || !secretExists {
m.Logger.Error(err, "Failed to validate secret and add VRGOwnerRef")

return mover.InProgress(), err
}

Expand Down
5 changes: 3 additions & 2 deletions controllers/replicationgroupsource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ In this design:
1. ReplicationGroupSource Name = ReplicationGroupDestination Name = VRG Name = Application Name
ReplicationGroupSource creates a VolumeGroupSnapshot, Restored PVC and ReplicationSource in each sync.
At the end of each sync, VolumeGroupSnapshot, Restored PVC and ReplicationSource will be deleted by ramen.
At the end of each sync, the VolumeGroupSnapshot and Restored PVC will be deleted by ramen;
the ReplicationSource will not be deleted.
2. VolumeGroupSnapshot Name = cephfscg-<ReplicationGroupSource Name>
3. Restored PVC Name = cephfscg-<Application PVC Name>
4. ReplicationSource Name = <Application PVC Name>
4. ReplicationSource Name = ReplicationDestination Name = <Application PVC Name>
5. ReplicationDestinationServiceName = volsync-rsync-tls-dst-<Application PVC Name>.<RD Namespace>.svc.clusterset.local
6. Volsync Secret Name = <VRG Name>-vs-secret
Expand Down
41 changes: 37 additions & 4 deletions controllers/util/cephfs_cg.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ const (
// do not delete the vs in a vgs, only for testing
SkipDeleteAnnotaion = "rgd.ramendr.openshift.io/do-not-delete"

RGDOwnerLabel = "ramendr.openshift.io/rgd"
CleanupLabelKey = "volsync.backube/cleanup"
RGSOwnerLabel string = "ramendr.openshift.io/rgs"
RGDOwnerLabel = "ramendr.openshift.io/rgd"
RGSOwnerLabel string = "ramendr.openshift.io/rgs"
)

func IsFSCGSupport(mgr manager.Manager) (bool, error) {
Expand Down Expand Up @@ -267,7 +266,6 @@ func DeferDeleteImage(ctx context.Context,
labels = make(map[string]string)
}

delete(labels, CleanupLabelKey)
labels[ramenutils.DoNotDeleteLabelKey] = "true"

labels[RGDOwnerLabel] = rgdName
Expand Down Expand Up @@ -316,3 +314,38 @@ func vsInRGD(vs vsv1.VolumeSnapshot, rgd *ramendrv1alpha1.ReplicationGroupDestin

return false
}

// CheckImagesReadyToUse reports whether every image in latestImages refers to
// a VolumeSnapshot in namespace that is marked ready to use.
//
// latestImages is keyed by PVC name. The function returns:
//   - (false, nil) as soon as it meets a nil image reference or a snapshot
//     that is not ready yet (including one whose status is not populated),
//   - (false, err) when a snapshot cannot be fetched through k8sClient,
//   - (true, nil) when all snapshots are ready; an empty map is trivially ready.
func CheckImagesReadyToUse(
	ctx context.Context, k8sClient client.Client,
	latestImages map[string]*corev1.TypedLocalObjectReference,
	namespace string,
	log logr.Logger,
) (bool, error) {
	for pvcName, latestImage := range latestImages {
		if latestImage == nil {
			log.Info("Image is nil to check", "PVCName", pvcName)

			return false, nil
		}

		volumeSnapshot := &vsv1.VolumeSnapshot{}
		if err := k8sClient.Get(ctx,
			types.NamespacedName{Name: latestImage.Name, Namespace: namespace},
			volumeSnapshot,
		); err != nil {
			log.Error(err, "Failed to get volume snapshot",
				"VolumeSnapshot", latestImage.Name, "PVCName", pvcName)

			return false, err
		}

		// Status is a pointer field on VolumeSnapshot and may still be nil for
		// a snapshot that has not been reported on yet; treat that as "not
		// ready" instead of dereferencing it.
		status := volumeSnapshot.Status
		if status == nil || status.ReadyToUse == nil || !*status.ReadyToUse {
			log.Info("Volume snapshot is not ready to use", "VolumeSnapshot", volumeSnapshot.Name)

			return false, nil
		}
	}

	return true, nil
}

0 comments on commit 9456ca5

Please sign in to comment.