package etcd

import (
	"archive/zip"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"math/rand"
	"os"
	"path/filepath"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"time"

	k3s "github.com/k3s-io/api/k3s.cattle.io/v1"
	"github.com/k3s-io/k3s/pkg/cluster/managed"
	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/etcd/s3"
	"github.com/k3s-io/k3s/pkg/etcd/snapshot"
	"github.com/k3s-io/k3s/pkg/etcd/snapshotmetrics"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/util/errors"
	"github.com/k3s-io/k3s/pkg/util/metrics"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"
	snapshotv3 "go.etcd.io/etcd/client/v3/snapshot"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	k8sruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/pager"
	"k8s.io/client-go/util/retry"
)

const (
	// errorTTL is how long an ETCDSnapshotFile that records a failed snapshot is kept,
	// measured from its error time, before reconciliation is allowed to delete it.
	errorTTL             = 24 * time.Hour
	// s3ReconcileTTL is the grace period, measured from the resource creation timestamp,
	// during which reconciliation will not delete an S3 ETCDSnapshotFile that is
	// missing from the S3 listing.
	s3ReconcileTTL       = time.Minute
	// snapshotListPageSize is the page size used when listing ETCDSnapshotFile resources.
	snapshotListPageSize = 20
)

var (
	// annotationLocalReconciled is the Node annotation key that is patched with the
	// timestamp of the most recent reconciliation of local snapshots.
	annotationLocalReconciled = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp"
	// annotationS3Reconciled is the Node annotation key that is patched with the
	// reconciliation timestamp when S3 is configured.
	annotationS3Reconciled    = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp"

	// snapshotDataBackoff will retry at increasing steps for up to ~30 seconds.
	// It is used by addSnapshotData when creating or updating ETCDSnapshotFile resources.
	// If the update fails, the list won't be reconciled again until the next
	// reconciliation runs, so we should be fairly persistent in retrying.
	snapshotDataBackoff = wait.Backoff{
		Steps:    9,
		Duration: 10 * time.Millisecond,
		Factor:   3.0,
		Jitter:   0.1,
	}

	// cronLogger wraps logrus's Printf output as cron-compatible logger
	cronLogger = cron.VerbosePrintfLogger(logrus.StandardLogger())
)

// snapshotDir resolves the directory in which local snapshots are stored and verifies
// that it exists and is a directory.
//
// The configured EtcdSnapshotDir is used when set; otherwise the directory defaults to
// <DataDir>/db/snapshots. When create is true and the resolved directory is the default
// one, a missing directory is created with mode 0700. A user-specified non-default
// directory is never created and must already exist.
func snapshotDir(config *config.Control, create bool) (string, error) {
	fallback := filepath.Join(config.DataDir, "db", "snapshots")

	dir := config.EtcdSnapshotDir
	if dir == "" {
		dir = fallback
	}

	// Only the default location may be created on demand.
	mayCreate := create && dir == fallback

	info, statErr := os.Stat(dir)
	switch {
	case statErr == nil:
		if !info.IsDir() {
			return "", fmt.Errorf("%s is not a directory", dir)
		}
		return dir, nil
	case mayCreate && os.IsNotExist(statErr):
		if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
			return "", mkErr
		}
		return dir, nil
	default:
		return "", statErr
	}
}

// compressSnapshot compresses the snapshot named snapshotName, located in snapshotDir,
// into a zip archive next to it and returns the path to the archive.
//
// The archive contains a single deflate-compressed entry named snapshotName whose
// modification time is set to now. The snapshotPath parameter is currently unused; the
// source file is always read from snapshotDir/snapshotName.
//
// On any failure, including a failure to finalize the archive, the partially written
// zip file is removed and an empty path is returned together with the error.
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string, now time.Time) (_ string, rerr error) {
	logrus.Info("Compressing etcd snapshot file: " + snapshotName)

	zippedSnapshotName := snapshotName + snapshot.CompressedExtension
	zipPath := filepath.Join(snapshotDir, zippedSnapshotName)

	zf, err := os.Create(zipPath)
	if err != nil {
		return "", err
	}
	// Never leave a partial or corrupt archive behind. On success the file has
	// already been closed explicitly below, so nothing is done here.
	defer func() {
		if rerr != nil {
			zf.Close()
			os.Remove(zipPath)
		}
	}()

	uncompressedPath := filepath.Join(snapshotDir, snapshotName)
	fileToZip, err := os.Open(uncompressedPath)
	if err != nil {
		return "", err
	}
	defer fileToZip.Close()

	info, err := fileToZip.Stat()
	if err != nil {
		return "", err
	}

	header, err := zip.FileInfoHeader(info)
	if err != nil {
		return "", err
	}

	header.Name = snapshotName
	header.Method = zip.Deflate
	header.Modified = now

	zipWriter := zip.NewWriter(zf)
	writer, err := zipWriter.CreateHeader(header)
	if err != nil {
		return "", err
	}
	if _, err := io.Copy(writer, fileToZip); err != nil {
		return "", err
	}

	// Closing the zip writer writes the central directory; the archive is not valid
	// until this succeeds, so the error must not be discarded.
	if err := zipWriter.Close(); err != nil {
		return "", err
	}
	// Closing the file may surface a delayed write error.
	if err := zf.Close(); err != nil {
		return "", err
	}

	return zipPath, nil
}

// decompressSnapshot decompresses the zip archive at snapshotFile and returns the path
// of the last file extracted from it.
//
// Each archive entry is written to a file named after the entry, with the compressed
// extension stripped from the name, using the mode recorded in the archive. An archive
// that contains no entries is reported as an error. If extracting an entry fails, the
// partially written output file for that entry is removed.
//
// NOTE(review): snapshotDir is not used; the output path is derived solely from the
// entry name stored in the archive, so it is resolved relative to the process working
// directory and is not sanitized against path traversal. Confirm whether extraction
// should be confined to snapshotDir.
func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, error) {
	logrus.Info("Decompressing etcd snapshot file: " + snapshotFile)

	r, err := zip.OpenReader(snapshotFile)
	if err != nil {
		return "", err
	}
	defer r.Close()

	// Without this guard there would be no extracted file to report below.
	if len(r.File) == 0 {
		return "", fmt.Errorf("compressed snapshot %s contains no files", snapshotFile)
	}

	// extract writes a single archive entry to dest. It is a separate function so that
	// the files opened for each entry are closed as soon as that entry is done, rather
	// than when decompressSnapshot returns.
	extract := func(sf *zip.File, dest string) (rerr error) {
		out, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode())
		if err != nil {
			return err
		}
		defer func() {
			// A failed Close may indicate that buffered data was not written.
			if cerr := out.Close(); rerr == nil {
				rerr = cerr
			}
			if rerr != nil {
				os.Remove(dest)
			}
		}()

		in, err := sf.Open()
		if err != nil {
			return err
		}
		defer in.Close()

		_, err = io.Copy(out, in)
		return err
	}

	var decompressedPath string
	for _, sf := range r.File {
		dest := strings.ReplaceAll(sf.Name, snapshot.CompressedExtension, "")
		if err := extract(sf, dest); err != nil {
			return "", err
		}
		decompressedPath = dest
	}

	return decompressedPath, nil
}

// Snapshot saves a new snapshot and then reconciles the ETCDSnapshotFile resources.
// Saving a snapshot also applies the retention policy, so a single request may create
// and prune several snapshots when S3 is enabled.
// Note that the prune step is generally disabled when snapshotting from the CLI, as there is a separate
// subcommand for prune that can be run manually if the user wants to remove old snapshots.
// Returns metadata about the new and pruned snapshots. If saving fails, reconciliation
// is skipped and the save error is returned.
func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
	result, err := e.snapshot(ctx)
	if err == nil {
		err = e.reconcileSnapshotData(ctx, result)
	}
	return result, err
}

// snapshot is the actual snapshot save/upload implementation.
// This is not inline in the Snapshot function so that the save and reconcile operation
// metrics do not overlap.
//
// Only one save may run at a time; a concurrent call returns an error immediately.
// If the etcd member is a learner, no snapshot is taken and (nil, nil) is returned.
// If saving the snapshot itself fails, a failed ETCDSnapshotFile record is stored and an
// empty result is returned with a nil error. On success the snapshot is optionally
// compressed, recorded, subjected to the local retention policy, and, when S3 is
// configured, uploaded and subjected to the S3 retention policy.
//
// The returned result lists the names of created and deleted snapshots.
func (e *ETCD) snapshot(ctx context.Context) (_ *managed.SnapshotResult, rerr error) {
	snapshotStart := time.Now()
	// The observation is wrapped in a closure so that rerr is read when the function
	// returns. Arguments of a deferred call are evaluated at the defer statement, which
	// would always record a nil error here.
	defer func() {
		metrics.ObserveWithStatus(snapshotmetrics.SaveCount, snapshotStart, rerr)
	}()

	if !e.snapshotMu.TryLock() {
		return nil, errors.New("snapshot save already in progress")
	}
	defer e.snapshotMu.Unlock()
	// make sure the core.Factory is initialized before attempting to add snapshot metadata
	var extraMetadata *v1.ConfigMap
	if e.config.Runtime.Core == nil {
		logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshot.ExtraMetadataConfigMapName)
	} else {
		logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshot.ExtraMetadataConfigMapName)
		if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshot.ExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
			logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshot.ExtraMetadataConfigMapName, err)
		} else {
			logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshot.ExtraMetadataConfigMapName)
			extraMetadata = snapshotExtraMetadataConfigMap
		}
	}

	endpoints := getEndpoints(e.config)
	status, err := e.client.Status(ctx, endpoints[0])
	if err != nil {
		return nil, errors.WithMessage(err, "failed to check etcd status for snapshot")
	}

	if status.IsLearner {
		logrus.Warnf("Unable to take snapshot: not supported for learner")
		return nil, nil
	}

	snapshotDir, err := snapshotDir(e.config, true)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get etcd-snapshot-dir")
	}

	cfg, err := getClientConfig(ctx, e.config)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get config for etcd snapshot")
	}

	tokenHash, err := util.GetTokenHash(e.config)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get server token hash for etcd snapshot")
	}

	// The snapshot name embeds the node name and the creation time in Unix seconds;
	// the timestamp suffix is parsed back out when listing and pruning local snapshots.
	nodeName := os.Getenv("NODE_NAME")
	now := time.Now().Round(time.Second)
	snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix())
	snapshotPath := filepath.Join(snapshotDir, snapshotName)
	logrus.Infof("Saving etcd snapshot to %s", snapshotPath)

	var sf *snapshot.File

	saveStart := time.Now()
	_, err = snapshotv3.SaveWithVersion(ctx, e.client.GetLogger(), *cfg, snapshotPath)
	metrics.ObserveWithStatus(snapshotmetrics.SaveLocalCount, saveStart, err)

	if err != nil {
		// Record the failure so that it is visible as an ETCDSnapshotFile resource.
		sf = &snapshot.File{
			Name:     snapshotName,
			Location: "",
			NodeName: nodeName,
			CreatedAt: &metav1.Time{
				Time: now,
			},
			Status:         snapshot.FailedStatus,
			Message:        base64.StdEncoding.EncodeToString([]byte(err.Error())),
			Size:           0,
			MetadataSource: extraMetadata,
		}
		logrus.Errorf("Failed to take etcd snapshot: %v", err)
		if err := e.addSnapshotData(*sf); err != nil {
			return nil, errors.WithMessage(err, "failed to sync ETCDSnapshotFile")
		}
	}

	res := &managed.SnapshotResult{}
	// If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message.
	if sf == nil {
		if e.config.EtcdSnapshotCompress {
			zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now)

			// ensure that the uncompressed snapshot is cleaned up even if compression fails
			if err := os.Remove(snapshotPath); err != nil && !os.IsNotExist(err) {
				logrus.Warnf("Failed to remove uncompress snapshot file: %v", err)
			}

			if err != nil {
				return nil, errors.WithMessage(err, "failed to compress snapshot")
			}
			snapshotPath = zipPath
			logrus.Info("Compressed snapshot: " + snapshotPath)
		}

		f, err := os.Stat(snapshotPath)
		if err != nil {
			return nil, errors.WithMessage(err, "unable to retrieve snapshot information from local snapshot")
		}

		sf = &snapshot.File{
			Name:     f.Name(),
			Location: "file://" + snapshotPath,
			NodeName: nodeName,
			CreatedAt: &metav1.Time{
				Time: now,
			},
			Status:         snapshot.SuccessfulStatus,
			Size:           f.Size(),
			Compressed:     e.config.EtcdSnapshotCompress,
			MetadataSource: extraMetadata,
			TokenHash:      tokenHash,
		}
		res.Created = append(res.Created, sf.Name)

		// Failing to save snapshot metadata is not fatal, the snapshot can still be used without it.
		if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil {
			logrus.Warnf("Failed to save local snapshot metadata: %v", err)
		}

		// If this fails, just log an error - the snapshot file will remain on disk
		// and will be recorded next time the snapshot list is reconciled.
		if err := e.addSnapshotData(*sf); err != nil {
			logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err)
		}

		// Snapshot retention may prune some files before returning an error. Failing to prune is not fatal.
		deleted, err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir)
		if err != nil {
			logrus.Warnf("Failed to apply local snapshot retention policy: %v", err)
		}
		res.Deleted = append(res.Deleted, deleted...)

		if e.config.EtcdS3 != nil {
			s3Start := time.Now()
			if s3client, err := e.getS3Client(ctx); err != nil {
				logrus.Warnf("Unable to initialize S3 client: %v", err)
				// A missing config secret is only warned about; any other
				// initialization failure is recorded as a failed S3 snapshot.
				if !errors.Is(err, s3.ErrNoConfigSecret) {
					metrics.ObserveWithStatus(snapshotmetrics.SaveS3Count, s3Start, err)
					err = errors.WithMessage(err, "failed to initialize S3 client")
					sf = &snapshot.File{
						Name:     f.Name(),
						NodeName: "s3",
						CreatedAt: &metav1.Time{
							Time: now,
						},
						Message:        base64.StdEncoding.EncodeToString([]byte(err.Error())),
						Size:           0,
						Status:         snapshot.FailedStatus,
						S3:             &snapshot.S3Config{EtcdS3: *e.config.EtcdS3},
						MetadataSource: extraMetadata,
					}
				}
			} else {
				logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
				// upload will return a snapshot.File even on error - if there was an
				// error, it will be reflected in the status and message.
				sf, err = s3client.Upload(ctx, snapshotPath, extraMetadata, now)
				metrics.ObserveWithStatus(snapshotmetrics.SaveS3Count, s3Start, err)
				if err != nil {
					logrus.Errorf("Error received during snapshot upload to S3: %s", err)
				} else {
					res.Created = append(res.Created, sf.Name)
					logrus.Infof("S3 upload complete for %s", snapshotName)
				}
				// Attempt to apply retention even if the upload failed; failure may be due to bucket
				// being full or some other condition that retention policy would resolve.
				// Snapshot retention may prune some files before returning an error. Failing to prune is not fatal.
				deleted, err := s3client.SnapshotRetention(ctx, e.config.EtcdSnapshotName)
				res.Deleted = append(res.Deleted, deleted...)
				if err != nil {
					logrus.Warnf("Failed to apply s3 snapshot retention policy: %v", err)
				}
			}
			// sf is either s3 snapshot metadata, or s3 init/upload failure record.
			// If this fails, just log an error - the snapshot file will remain on s3
			// and will be recorded next time the snapshot list is reconciled.
			if err := e.addSnapshotData(*sf); err != nil {
				logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err)
			}
		}
	}

	return res, nil
}

// listLocalSnapshots walks the local snapshot directory and returns one snapshot.File
// per regular file found, keyed by the file's ConfigMap key.
//
// The creation time is taken from the numeric suffix after the last '-' in the file name
// (with the compressed extension stripped), falling back to the file's modification time
// if the suffix is not an integer. Metadata saved alongside the snapshot is loaded
// when present and stored base64-encoded.
func (e *ETCD) listLocalSnapshots() (map[string]snapshot.File, error) {
	found := make(map[string]snapshot.File)
	nodeName := os.Getenv("NODE_NAME")

	dir, err := snapshotDir(e.config, true)
	if err != nil {
		return found, errors.WithMessage(err, "failed to get etcd-snapshot-dir")
	}

	visit := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() {
			return nil
		}

		basename, compressed := strings.CutSuffix(info.Name(), snapshot.CompressedExtension)
		suffix := basename[strings.LastIndexByte(basename, '-')+1:]
		ts, parseErr := strconv.ParseInt(suffix, 10, 64)
		if parseErr != nil {
			ts = info.ModTime().Unix()
		}

		// try to read metadata from disk; don't warn if it is missing as it will not exist
		// for snapshot files from old releases or if there was no metadata provided.
		var metadata string
		metadataFile := filepath.Join(filepath.Dir(path), "..", snapshot.MetadataDir, info.Name())
		if raw, readErr := os.ReadFile(metadataFile); readErr == nil {
			logrus.Debugf("Loading snapshot metadata from %s", metadataFile)
			metadata = base64.StdEncoding.EncodeToString(raw)
		}

		entry := snapshot.File{
			Name:       info.Name(),
			Location:   "file://" + filepath.Join(dir, info.Name()),
			NodeName:   nodeName,
			Metadata:   metadata,
			CreatedAt:  &metav1.Time{Time: time.Unix(ts, 0)},
			Size:       info.Size(),
			Status:     snapshot.SuccessfulStatus,
			Compressed: compressed,
		}
		found[entry.GenerateConfigMapKey()] = entry
		return nil
	}

	if err := filepath.Walk(dir, visit); err != nil {
		return nil, err
	}
	return found, nil
}

// getS3Client returns an S3 client for the current S3 configuration, starting the
// S3 controller first if it has not been started yet.
// An error is returned if the controller cannot be started or if it cannot provide a
// client for the current configuration.
// The context passed here is only used to validate the configuration,
// it does not need to continue to remain uncancelled after the call returns.
func (e *ETCD) getS3Client(ctx context.Context) (*s3.Client, error) {
	if e.s3 != nil {
		return e.s3.GetClient(ctx, e.config.EtcdS3)
	}

	controller, err := s3.Start(ctx, e.config)
	if err != nil {
		return nil, err
	}
	e.s3 = controller

	return e.s3.GetClient(ctx, e.config.EtcdS3)
}

// PruneSnapshots deletes old snapshots in excess of the configured retention count,
// locally and, when S3 is configured, on S3, and then reconciles the ETCDSnapshotFile
// resources. Retention failures are logged rather than returned.
// Returns a list of deleted snapshots. Note that snapshots may be deleted
// with a non-nil error return.
func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, error) {
	dir, err := snapshotDir(e.config, false)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get etcd-snapshot-dir")
	}

	// Retention may report files it deleted together with an error, if some
	// snapshots were removed before the error was encountered.
	pruned, localErr := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, dir)
	if localErr != nil {
		logrus.Errorf("Error applying snapshot retention policy: %v", localErr)
	}
	res := &managed.SnapshotResult{Deleted: pruned}

	if e.config.EtcdS3 != nil {
		s3client, clientErr := e.getS3Client(ctx)
		if clientErr != nil {
			logrus.Warnf("Unable to initialize S3 client: %v", clientErr)
		} else {
			removed, s3Err := s3client.SnapshotRetention(ctx, e.config.EtcdSnapshotName)
			if s3Err != nil {
				logrus.Errorf("Error applying S3 snapshot retention policy: %v", s3Err)
			}
			res.Deleted = append(res.Deleted, removed...)
		}
	}

	return res, e.reconcileSnapshotData(ctx, res)
}

// ListSnapshots returns the S3 snapshots (when S3 is enabled) followed by the local
// snapshots, converted to ETCDSnapshotFile items.
// Snapshots are listed from the storage itself, not from the apiserver, so results
// reflect what is on disk and in the bucket at the time of the call.
// A missing S3 config secret is only warned about; any other S3 initialization or
// listing error is returned.
func (e *ETCD) ListSnapshots(ctx context.Context) (*k3s.ETCDSnapshotFileList, error) {
	list := &k3s.ETCDSnapshotFileList{
		TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "List"},
	}

	// collect converts each snapshot file and appends it to the list.
	collect := func(files map[string]snapshot.File) {
		for key, sf := range files {
			item := k3s.NewETCDSnapshotFile("", key, k3s.ETCDSnapshotFile{})
			sf.ToETCDSnapshotFile(item)
			list.Items = append(list.Items, *item)
		}
	}

	if e.config.EtcdS3 != nil {
		s3client, err := e.getS3Client(ctx)
		if err == nil {
			remote, listErr := s3client.ListSnapshots(ctx)
			if listErr != nil {
				return nil, listErr
			}
			collect(remote)
		} else {
			logrus.Warnf("Unable to initialize S3 client: %v", err)
			if !errors.Is(err, s3.ErrNoConfigSecret) {
				return nil, errors.WithMessage(err, "failed to initialize S3 client")
			}
		}
	}

	local, err := e.listLocalSnapshots()
	if err != nil {
		return nil, err
	}
	collect(local)

	return list, nil
}

// DeleteSnapshots removes the named snapshots from local storage and, when an S3 client
// is available, from S3, then reconciles the ETCDSnapshotFile resources.
// Per-snapshot failures are logged and do not stop processing of the remaining names.
// Returns a list of deleted snapshots; a name appears once for each storage it was
// deleted from. Note that snapshots may be deleted with a non-nil error return.
func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*managed.SnapshotResult, error) {
	dir, err := snapshotDir(e.config, false)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get etcd-snapshot-dir")
	}

	var s3client *s3.Client
	if e.config.EtcdS3 != nil {
		if s3client, err = e.getS3Client(ctx); err != nil {
			logrus.Warnf("Unable to initialize S3 client: %v", err)
			if !errors.Is(err, s3.ErrNoConfigSecret) {
				return nil, errors.WithMessage(err, "failed to initialize S3 client")
			}
		}
	}

	res := &managed.SnapshotResult{}
	for _, name := range snapshots {
		switch localErr := e.deleteSnapshot(filepath.Join(dir, name)); {
		case localErr == nil:
			res.Deleted = append(res.Deleted, name)
			logrus.Infof("Snapshot %s deleted locally", name)
		case snapshot.IsNotExist(localErr):
			logrus.Infof("Snapshot %s not found locally", name)
		default:
			logrus.Errorf("Failed to delete local snapshot %s: %v", name, localErr)
		}

		if s3client == nil {
			continue
		}

		switch remoteErr := s3client.DeleteSnapshot(ctx, name); {
		case remoteErr == nil:
			res.Deleted = append(res.Deleted, name)
			logrus.Infof("Snapshot %s deleted from S3", name)
		case snapshot.IsNotExist(remoteErr):
			logrus.Infof("Snapshot %s not found in S3", name)
		default:
			logrus.Errorf("Failed to delete S3 snapshot %s: %v", name, remoteErr)
		}
	}

	return res, e.reconcileSnapshotData(ctx, res)
}

// deleteSnapshot removes the snapshot file at snapshotPath along with the metadata file
// of the same name stored in the sibling metadata directory.
//
// The metadata file is removed only if the snapshot file was removed or did not exist.
// A missing metadata file is not an error; any other failure to remove it is returned
// in place of the snapshot removal result. Otherwise the result of removing the snapshot
// file is returned, which is a not-exist error if the snapshot was not present.
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
	dir := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir)
	filename := filepath.Base(snapshotPath)
	metadataPath := filepath.Join(dir, filename)

	err := os.Remove(snapshotPath)
	if err == nil || os.IsNotExist(err) {
		// Check the metadata removal error itself; metadata is optional, so it is
		// only reported when removal failed for a reason other than the file being absent.
		if merr := os.Remove(metadataPath); merr != nil && !snapshot.IsNotExist(merr) {
			err = merr
		}
	}

	return err
}

// addSnapshotData syncs an internal snapshot.File representation to the ETCDSnapshotFile
// resource of the same generated name. The resource is created if it does not exist, and
// updated if it exists and differs; an unchanged resource is left alone.
// Conflict and already-exists errors are retried according to snapshotDataBackoff.
// An event is emitted only when the resource is newly created.
func (e *ETCD) addSnapshotData(sf snapshot.File) error {
	// make sure the K3s factory is initialized.
	for e.config.Runtime.K3s == nil {
		runtime.Gosched()
	}

	client := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
	name := sf.GenerateName()

	retriable := func(err error) bool {
		return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
	}

	apply := func() error {
		// Fetch the current object, or start from an empty one if it does not exist yet.
		current, err := client.Get(name, metav1.GetOptions{})
		if err != nil {
			if !apierrors.IsNotFound(err) {
				return err
			}
			current = &k3s.ETCDSnapshotFile{
				ObjectMeta: metav1.ObjectMeta{Name: name},
			}
		}

		// Keep a copy of the unmodified object so that no-op updates can be skipped.
		before := current.DeepCopyObject()
		sf.ToETCDSnapshotFile(current)

		// A zero creation timestamp means the object has not been persisted yet.
		if current.CreationTimestamp.IsZero() {
			created, err := client.Create(current)
			if err != nil {
				return err
			}
			// Only emit an event for the snapshot when creating the resource
			e.emitEvent(created)
			return nil
		}

		if equality.Semantic.DeepEqual(before, current) {
			return nil
		}
		_, err = client.Update(current)
		return err
	}

	return retry.OnError(snapshotDataBackoff, retriable, apply)
}

// generateETCDSnapshotFileConfigMapKey returns the key under which the snapshot file
// corresponding to esf would be stored in the legacy configmap: the snapshot name with
// invalid key characters replaced by '_', prefixed with "s3-" for S3 snapshots and
// "local-" for all others.
func generateETCDSnapshotFileConfigMapKey(esf k3s.ETCDSnapshotFile) string {
	prefix := "local-"
	if esf.Spec.S3 != nil {
		prefix = "s3-"
	}
	return prefix + snapshot.InvalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_")
}

// emitEvent records a Kubernetes event describing the state of the given
// ETCDSnapshotFile: a normal "ETCDSnapshotDeleted" event if it is being deleted, a
// warning "ETCDSnapshotFailed" event if it records an error, and a normal
// "ETCDSnapshotCreated" event otherwise. Nothing is emitted if no event recorder
// is configured.
func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
	recorder := e.config.Runtime.Event
	if recorder == nil {
		return
	}

	if !esf.DeletionTimestamp.IsZero() {
		recorder.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotDeleted", "Snapshot %s deleted", esf.Spec.SnapshotName)
		return
	}

	if esf.Status.Error == nil {
		recorder.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotCreated", "Snapshot %s saved on %s", esf.Spec.SnapshotName, esf.Spec.NodeName)
		return
	}

	// Failed snapshot: include the recorded error message when there is one.
	message := fmt.Sprintf("Failed to save snapshot %s on %s", esf.Spec.SnapshotName, esf.Spec.NodeName)
	if detail := esf.Status.Error.Message; detail != nil {
		message += ": " + *detail
	}
	recorder.Event(esf, v1.EventTypeWarning, "ETCDSnapshotFailed", message)
}

// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3.
// It is the exported entry point for reconcileSnapshotData, called without a
// SnapshotResult, so no snapshots are treated as intentionally deleted.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
	return e.reconcileSnapshotData(ctx, nil)
}

// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
// the provided SnapshotResult are deleted, even if they are within a retention window.
//
// The steps are:
//  1. List snapshots from disk and, if available, from S3.
//  2. Fill in missing metadata from the legacy snapshot ConfigMap.
//  3. Delete ETCDSnapshotFile resources for this node (and S3) whose snapshot no longer exists,
//     and create resources for snapshots that have none.
//  4. Unless running agentless, delete resources for snapshots stored on nodes that are no longer
//     etcd nodes, and annotate our Node with the reconcile timestamp.
//
// res may be nil. Failures to delete or create individual resources are logged and do not
// abort reconciliation.
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) (rerr error) {
	reconcileStart := time.Now()
	// The observation is wrapped in a closure so that rerr is read when the function
	// returns. Arguments of a deferred call are evaluated at the defer statement, which
	// would always record a nil error here.
	defer func() {
		metrics.ObserveWithStatus(snapshotmetrics.ReconcileCount, reconcileStart, rerr)
	}()

	// make sure the core.Factory is initialized. There can
	// be a race between this and core code startup.
	for e.config.Runtime.Core == nil {
		runtime.Gosched()
	}

	logrus.Infof("Reconciling ETCDSnapshotFile resources")
	defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete")

	// Get snapshots from local filesystem
	localStart := time.Now()
	snapshotFiles, err := e.listLocalSnapshots()
	metrics.ObserveWithStatus(snapshotmetrics.ReconcileLocalCount, localStart, err)
	if err != nil {
		return err
	}

	// nodeNames lists the storage locations whose resources are reconciled in this pass;
	// the first entry is always this node.
	nodeNames := []string{os.Getenv("NODE_NAME")}

	// Get snapshots from S3
	if e.config.EtcdS3 != nil {
		s3Start := time.Now()
		if s3client, err := e.getS3Client(ctx); err != nil {
			logrus.Warnf("Unable to initialize S3 client: %v", err)
			if !errors.Is(err, s3.ErrNoConfigSecret) {
				metrics.ObserveWithStatus(snapshotmetrics.ReconcileS3Count, s3Start, err)
				return errors.WithMessage(err, "failed to initialize S3 client")
			}
		} else {
			s3Snapshots, err := s3client.ListSnapshots(ctx)
			metrics.ObserveWithStatus(snapshotmetrics.ReconcileS3Count, s3Start, err)
			if err != nil {
				logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
			} else {
				for k, v := range s3Snapshots {
					snapshotFiles[k] = v
				}
				// Only reconcile S3 resources if the S3 listing succeeded; otherwise
				// every S3 resource would look like it is missing from storage.
				nodeNames = append(nodeNames, "s3")
			}
		}
	}

	// Try to load metadata from the legacy configmap, in case any local or s3 snapshots
	// were created by an old release that does not write the metadata alongside the snapshot file.
	snapshotConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}

	if snapshotConfigMap != nil {
		for sfKey, sf := range snapshotFiles {
			logrus.Debugf("Found snapshotFile for %s with key %s", sf.Name, sfKey)
			// if the configmap has data for this snapshot, and local metadata is empty,
			// deserialize the value from the configmap and attempt to load it.
			if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.MetadataSource == nil {
				sfTemp := &snapshot.File{}
				if err := json.Unmarshal([]byte(cmSnapshotValue), sfTemp); err != nil {
					logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err)
					continue
				}
				sf.Metadata = sfTemp.Metadata
				snapshotFiles[sfKey] = sf
			}
		}
	}

	labelSelector := &metav1.LabelSelector{
		MatchExpressions: []metav1.LabelSelectorRequirement{{
			Key:      snapshot.LabelStorageNode,
			Operator: metav1.LabelSelectorOpIn,
			Values:   nodeNames,
		}},
	}

	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
	if err != nil {
		return err
	}

	snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
	snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
	snapshotPager.PageSize = snapshotListPageSize
	now := time.Now().Round(time.Second)

	// List all snapshots matching the selector
	// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
	// If a snapshot from Kubernetes was not found on disk/s3, it is gone and can be removed from Kubernetes.
	// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
	if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
		esf, ok := obj.(*k3s.ETCDSnapshotFile)
		if !ok {
			return errors.New("failed to convert object to ETCDSnapshotFile")
		}
		sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
		logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
		if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
			// exists in both and names match, don't need to sync
			delete(snapshotFiles, sfKey)
		} else {
			// doesn't exist on disk/s3
			if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) {
				// snapshot has been intentionally deleted, skip checking for expiration
			} else if esf.Status.Error != nil && esf.Status.Error.Time != nil {
				expires := esf.Status.Error.Time.Add(errorTTL)
				if now.Before(expires) {
					// it's an error that hasn't expired yet, leave it
					return nil
				}
			} else if esf.Spec.S3 != nil {
				expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL)
				if now.Before(expires) {
					// it's an s3 snapshot that's only just been created, leave it to prevent a race condition
					// when multiple nodes are uploading snapshots at the same time.
					return nil
				}
			}
			if ok {
				logrus.Debugf("Name of ETCDSnapshotFile for snapshotFile with key %s does not match: %s vs %s", sfKey, sf.GenerateName(), esf.Name)
			} else {
				logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
			}
			// otherwise remove it
			logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
			if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
				logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
			}
		}
		return nil
	}); err != nil {
		return err
	}

	// Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created
	for _, sf := range snapshotFiles {
		logrus.Infof("Creating ETCDSnapshotFile for %s", sf.Name)
		if err := e.addSnapshotData(sf); err != nil {
			logrus.Errorf("Failed to create ETCDSnapshotFile: %v", err)
		}
	}

	// Agentless servers do not have a node. If we are running agentless, return early to avoid pruning
	// snapshots for nonexistent nodes and trying to patch the reconcile annotations on our node.
	if e.config.DisableAgent {
		return nil
	}

	// List all snapshots in Kubernetes not stored on S3 or a current etcd node.
	// These snapshots are local to a node that no longer runs etcd and cannot be restored.
	// If the node rejoins later and has local snapshots, it will reconcile them itself.
	labelSelector.MatchExpressions[0].Operator = metav1.LabelSelectorOpNotIn
	labelSelector.MatchExpressions[0].Values = []string{"s3"}

	// Get a list of all etcd nodes currently in the cluster and add them to the selector
	nodes := e.config.Runtime.Core.Core().V1().Node()
	etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"}
	nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()})
	if err != nil {
		return err
	}

	for _, node := range nodeList.Items {
		labelSelector.MatchExpressions[0].Values = append(labelSelector.MatchExpressions[0].Values, node.Name)
	}

	selector, err = metav1.LabelSelectorAsSelector(labelSelector)
	if err != nil {
		return err
	}

	// List and remove all snapshots stored on nodes that do not match the selector
	if err := snapshotPager.EachListItem(ctx, metav1.ListOptions{LabelSelector: selector.String()}, func(obj k8sruntime.Object) error {
		esf, ok := obj.(*k3s.ETCDSnapshotFile)
		if !ok {
			return errors.New("failed to convert object to ETCDSnapshotFile")
		}

		if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
			logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err)
		}
		return nil
	}); err != nil {
		return err
	}

	// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
	patch := util.NewPatchList().Add(now.Format(time.RFC3339), "metadata", "annotations", annotationLocalReconciled)
	patcher := util.NewPatcher[*v1.Node](nodes)
	if e.config.EtcdS3 != nil {
		patch.Add(now.Format(time.RFC3339), "metadata", "annotations", annotationS3Reconciled)
	}
	_, err = patcher.Patch(ctx, patch, nodeNames[0])
	return err
}

// setSnapshotFunction schedules snapshots according to the configured cron spec.
// A scheduled run is skipped if the previous run is still in progress. If the job
// cannot be added to the scheduler, the error is logged and no snapshots are scheduled.
// The provided context is passed to each scheduled Snapshot call.
func (e *ETCD) setSnapshotFunction(ctx context.Context) {
	skipJob := cron.SkipIfStillRunning(cronLogger)
	job := skipJob(cron.FuncJob(func() {
		// Add a small amount of jitter to the actual snapshot execution. On clusters with multiple servers,
		// having all the nodes take a snapshot at the exact same time can lead to excessive retry thrashing
		// when updating the snapshot list configmap.
		time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax)))
		if _, err := e.Snapshot(ctx); err != nil {
			logrus.Errorf("Failed to take scheduled snapshot: %v", err)
		}
	}))

	// Without this check a rejected job would silently disable scheduled snapshots.
	if _, err := e.cron.AddJob(e.config.EtcdSnapshotCron, job); err != nil {
		logrus.Errorf("Failed to schedule etcd snapshots with cron spec %q: %v", e.config.EtcdSnapshotCron, err)
	}
}

// snapshotRetention removes the oldest local snapshots whose file name starts with
// snapshotPrefix, leaving at most retention snapshots in snapshotDir.
//
// A retention of less than 1 disables pruning. Snapshot age is determined from the
// numeric suffix after the last '-' in the file name (with the compressed extension
// stripped), falling back to the file's modification time if the suffix is not an
// integer. The metadata file stored alongside each pruned snapshot is removed as well;
// a missing metadata file is not an error.
//
// Returns the names of the pruned snapshots. If removal fails part-way through, the
// snapshots pruned so far are returned together with the error.
func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) ([]string, error) {
	if retention < 1 {
		return nil, nil
	}

	logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir)

	var snapshotFiles []snapshot.File
	if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
		// Check err before touching info: filepath.Walk passes a nil FileInfo
		// when it fails to stat the root path.
		if err != nil || info.IsDir() {
			return err
		}
		if strings.HasPrefix(info.Name(), snapshotPrefix) {
			basename, compressed := strings.CutSuffix(info.Name(), snapshot.CompressedExtension)
			ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
			if err != nil {
				ts = info.ModTime().Unix()
			}
			snapshotFiles = append(snapshotFiles, snapshot.File{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
		}
		return nil
	}); err != nil {
		return nil, err
	}
	if len(snapshotFiles) <= retention {
		return nil, nil
	}

	// sort newest-first so we can prune entries past the retention count
	sort.Slice(snapshotFiles, func(i, j int) bool {
		return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
	})

	deleted := []string{}
	for _, df := range snapshotFiles[retention:] {
		snapshotPath := filepath.Join(snapshotDir, df.Name)
		metadataPath := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, df.Name)
		logrus.Infof("Removing local snapshot %s", snapshotPath)
		if err := os.Remove(snapshotPath); err != nil {
			return deleted, err
		}
		if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) {
			return deleted, err
		}
		deleted = append(deleted, df.Name)
	}

	return deleted, nil
}

// saveSnapshotMetadata writes the Data of the extra metadata ConfigMap, serialized as
// JSON, to a file named after the snapshot in the metadata directory that sits next to
// the snapshot directory. The metadata directory is created if necessary.
// Nothing is written if no extra metadata is provided.
func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error {
	if extraMetadata == nil || len(extraMetadata.Data) == 0 {
		return nil
	}

	metadataDir := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir)
	metadataPath := filepath.Join(metadataDir, filepath.Base(snapshotPath))
	logrus.Infof("Saving snapshot metadata to %s", metadataPath)

	payload, err := json.Marshal(extraMetadata.Data)
	if err != nil {
		return err
	}

	if err = os.MkdirAll(metadataDir, 0700); err != nil {
		return err
	}
	return os.WriteFile(metadataPath, payload, 0700)
}
