package s3

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/textproto"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/etcd/snapshot"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/util/errors"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// Keys for the user metadata attached to objects uploaded to S3. Each key is
// the program name followed by a fixed suffix, canonicalized in the same way
// as a MIME header key. The node name and token hash keys are also read back
// from object metadata when listing snapshots.
var (
	clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
	tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
	nodeNameKey  = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)

// defaultEtcdS3 holds the built-in default S3 settings. GetClient compares the
// configuration it is given against these defaults to decide whether any S3
// settings were explicitly provided.
var defaultEtcdS3 = &config.EtcdS3{
	Endpoint: "s3.amazonaws.com",
	Region:   "us-east-1",
	Timeout: metav1.Duration{
		Duration: 5 * time.Minute,
	},
	Retention: 5,
}

// Package-level singleton state, populated by Start.
var (
	// controller is the shared Controller returned by Start once initialization succeeds.
	controller *Controller
	// cErr is the error returned by Start if initialization failed.
	cErr       error
	// once ensures that the initialization in Start runs only a single time.
	once       sync.Once
)

// Controller maintains state for S3 functionality,
// and can be used to get clients for interacting with
// an S3 service, given specific client configuration.
// When Start runs with ClusterReset set, only nodeName and clientCache are
// populated; clusterID, tokenHash and core are left at their zero values.
type Controller struct {
	// clusterID is the UID of the system namespace, used to identify the cluster.
	clusterID   string
	// tokenHash is the server token hash, as returned by util.GetTokenHash.
	tokenHash   string
	// nodeName is read from the NODE_NAME environment variable.
	nodeName    string
	// core is the core controller interface used to look up the system namespace.
	core        core.Interface
	// clientCache holds previously created clients, keyed by their EtcdS3 configuration.
	clientCache *util.Cache[*Client]
}

// Client holds state for a given configuration - a preconfigured minio client,
// and reference to the config it was created for.
type Client struct {
	// mc is the minio client used for all requests to the S3 service.
	mc         *minio.Client
	// etcdS3 is the configuration (bucket, folder, timeout, retention, ...) this client was created for.
	etcdS3     *config.EtcdS3
	// controller is the Controller that created this client; it supplies the
	// cluster ID, node name and token hash recorded on uploaded objects.
	controller *Controller
}

// Start initializes the client cache and looks up the cluster ID and server
// token hash, returning a reference to the initialized controller. The
// initialization is guarded by a sync.Once to prevent races, so every call
// returns the same controller or error that the first call produced. When
// config.ClusterReset is set, the cluster ID and token hash lookup is skipped.
func Start(ctx context.Context, config *config.Control) (*Controller, error) {
	once.Do(func() {
		ctrl := &Controller{
			clientCache: util.NewCache[*Client](5),
			nodeName:    os.Getenv("NODE_NAME"),
		}

		if config.ClusterReset {
			logrus.Debug("Skip setting S3 snapshot cluster ID and server token hash during cluster-reset")
			controller = ctrl
			return
		}

		// populate is the poll condition: it reports "not done" until the core
		// controller factory is available, then fills in the cluster ID and
		// token hash. Any lookup failure is returned as an error, which ends
		// the poll.
		populate := func(ctx context.Context) (bool, error) {
			if config.Runtime.Core == nil {
				return false, nil
			}
			ctrl.core = config.Runtime.Core.Core()

			// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
			systemNS, err := ctrl.core.V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
			if err != nil {
				return false, errors.WithMessage(err, "failed to set S3 snapshot cluster ID")
			}
			ctrl.clusterID = string(systemNS.UID)

			hash, err := util.GetTokenHash(config)
			if err != nil {
				return false, errors.WithMessage(err, "failed to set S3 snapshot server token hash")
			}
			ctrl.tokenHash = hash

			return true, nil
		}

		logrus.Debug("Getting S3 snapshot cluster ID and server token hash")
		if err := wait.PollImmediateUntilWithContext(ctx, time.Second, populate); err != nil {
			cErr = err
			return
		}
		controller = ctrl
	})

	return controller, cErr
}

// GetClient returns a Client for the given S3 configuration, reusing a cached
// client if one was already created for an identical configuration.
//
// If etcdS3 matches the built-in defaults (ignoring ConfigSecret) and
// ConfigSecret is set, the configuration is loaded from that secret instead.
// If etcdS3 differs from the defaults, any ConfigSecret is ignored with a
// warning. An error is returned if etcdS3 is nil, if no configuration was set
// at all, if the bucket name is empty, if the TLS, proxy or credential
// settings cannot be used, or if the bucket does not exist. The bucket
// existence check is bounded by the configured timeout.
func (c *Controller) GetClient(ctx context.Context, etcdS3 *config.EtcdS3) (*Client, error) {
	if etcdS3 == nil {
		return nil, errors.New("nil s3 configuration")
	}

	// Compare against a private copy of the defaults whose ConfigSecret matches the
	// caller's, so that the comparison ignores ConfigSecret when deciding if CLI
	// configuration is present. A copy is used rather than writing to the shared
	// package-level defaults, which would race when called concurrently.
	defaults := *defaultEtcdS3
	defaults.ConfigSecret = etcdS3.ConfigSecret

	// If config is default, try to load config from secret, and fail if it cannot be retrieved or if the secret name is not set.
	// If config is not default, and secret name is set, warn that the secret is being ignored
	isDefault := reflect.DeepEqual(&defaults, etcdS3)
	if etcdS3.ConfigSecret != "" {
		if isDefault {
			e, err := c.getConfigFromSecret(etcdS3.ConfigSecret)
			if err != nil {
				return nil, errors.WithMessagef(err, "failed to get config from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
			}
			logrus.Infof("Using etcd s3 configuration from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
			etcdS3 = e
		} else {
			logrus.Warnf("Ignoring s3 configuration from etcd-s3-config-secret %q due to existing configuration from CLI or config file", etcdS3.ConfigSecret)
		}
	} else if isDefault {
		return nil, errors.New("s3 configuration was not set")
	}

	// used just for logging
	scheme := "https://"
	if etcdS3.Insecure {
		scheme = "http://"
	}

	// Try to get an existing client from cache.  The entire EtcdS3 struct
	// (including the key id and secret) is used as the cache key, but we only
	// print the endpoint and bucket name to avoid leaking creds into the logs.
	if client, ok := c.clientCache.Get(*etcdS3); ok {
		logrus.Infof("Reusing cached S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
		return client, nil
	}
	logrus.Infof("Attempting to create new S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)

	if etcdS3.Bucket == "" {
		return nil, errors.New("s3 bucket name was not set")
	}
	tr := http.DefaultTransport.(*http.Transport).Clone()

	// Set this value so that the underlying transport round-tripper
	// doesn't try to auto decode the body of objects with
	// content-encoding set to `gzip`.
	// Ref: https://github.com/minio/minio-go/pull/752
	tr.DisableCompression = true

	// You can either disable SSL verification or use a custom CA bundle,
	// it doesn't make sense to do both - if verification is disabled,
	// the CA is not checked!
	if etcdS3.SkipSSLVerify {
		tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	} else if etcdS3.EndpointCA != "" {
		tlsConfig, err := loadEndpointCAs(etcdS3.EndpointCA)
		if err != nil {
			return nil, err
		}
		tr.TLSClientConfig = tlsConfig
	}

	// Set a fixed proxy URL, if requested by the user. This replaces the default,
	// which calls ProxyFromEnvironment to read proxy settings from the environment.
	if etcdS3.Proxy != "" {
		var u *url.URL
		var err error
		// proxy address of literal "none" disables all use of a proxy by S3;
		// in that case u stays nil, and http.ProxyURL(nil) selects no proxy.
		if etcdS3.Proxy != "none" {
			u, err = url.Parse(etcdS3.Proxy)
			if err != nil {
				return nil, errors.WithMessage(err, "failed to parse etcd-s3-proxy value as URL")
			}
			if u.Scheme == "" || u.Host == "" {
				return nil, errors.New("proxy URL must include scheme and host")
			}
		}
		tr.Proxy = http.ProxyURL(u)
	}

	// Credentials are tried in order: static keys from the configuration, then
	// the AWS credentials file, then IAM.
	creds := credentials.NewChainCredentials([]credentials.Provider{
		&credentials.Static{
			Value: credentials.Value{
				AccessKeyID:     etcdS3.AccessKey,
				SecretAccessKey: etcdS3.SecretKey,
				SessionToken:    etcdS3.SessionToken,
				SignerType:      credentials.SignatureV4,
			},
		},
		&credentials.FileAWSCredentials{},
		&credentials.IAM{},
	})

	// Fail early if none of the providers can supply credentials.
	if _, err := creds.Get(); err != nil {
		return nil, errors.WithMessage(err, "failed to get credentials")
	}

	opt := minio.Options{
		Creds:        creds,
		Secure:       !etcdS3.Insecure,
		Region:       etcdS3.Region,
		Transport:    tr,
		BucketLookup: bucketLookupType(etcdS3.Endpoint, etcdS3.BucketLookup),
	}
	mc, err := minio.New(etcdS3.Endpoint, &opt)
	if err != nil {
		return nil, err
	}

	logrus.Infof("Checking if S3 bucket %s exists", etcdS3.Bucket)

	ctx, cancel := context.WithTimeout(ctx, etcdS3.Timeout.Duration)
	defer cancel()

	exists, err := mc.BucketExists(ctx, etcdS3.Bucket)
	if err != nil {
		return nil, errors.WithMessagef(err, "failed to test for existence of bucket %s", etcdS3.Bucket)
	}
	if !exists {
		return nil, fmt.Errorf("bucket %s does not exist", etcdS3.Bucket)
	}
	logrus.Infof("S3 bucket %s exists", etcdS3.Bucket)

	// Only clients that passed the bucket check are cached.
	client := &Client{
		mc:         mc,
		etcdS3:     etcdS3,
		controller: c,
	}
	logrus.Infof("Adding S3 client to cache")
	c.clientCache.Add(*etcdS3, client)
	return client, nil
}

// Upload uploads the given snapshot, and its metadata file if one exists, to
// the configured S3 compatible backend. The returned File always describes the
// attempted upload: on failure its status is set to failed and its message
// holds the base64-encoded error, and the upload error is returned alongside
// it. Failure to upload the metadata file is only logged, not returned.
func (c *Client) Upload(ctx context.Context, snapshotPath string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshot.File, error) {
	name := filepath.Base(snapshotPath)
	bucket := c.etcdS3.Bucket
	snapshotKey := path.Join(c.etcdS3.Folder, name)

	sf := &snapshot.File{
		Name:           name,
		Location:       fmt.Sprintf("s3://%s/%s", bucket, snapshotKey),
		NodeName:       "s3",
		CreatedAt:      &metav1.Time{Time: now},
		S3:             &snapshot.S3Config{EtcdS3: *c.etcdS3},
		Compressed:     strings.HasSuffix(snapshotPath, snapshot.CompressedExtension),
		MetadataSource: extraMetadata,
		NodeSource:     c.controller.nodeName,
	}

	logrus.Infof("Uploading snapshot to s3://%s/%s", bucket, snapshotKey)
	info, uploadErr := c.uploadSnapshot(ctx, snapshotKey, snapshotPath)
	if uploadErr == nil {
		sf.Status = snapshot.SuccessfulStatus
		sf.Size = info.Size
		sf.TokenHash = c.controller.tokenHash
	} else {
		sf.Status = snapshot.FailedStatus
		sf.Message = base64.StdEncoding.EncodeToString([]byte(uploadErr.Error()))
	}

	// The metadata file lives in a sibling directory of the snapshot directory,
	// and is uploaded regardless of whether the snapshot upload succeeded.
	metadataFile := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir, name)
	metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, name)
	metaInfo, metaErr := c.uploadSnapshotMetadata(ctx, metadataKey, metadataFile)
	switch {
	case metaErr != nil:
		logrus.Warnf("Failed to upload snapshot metadata to S3: %v", metaErr)
	case metaInfo.Size != 0:
		logrus.Infof("Uploaded snapshot metadata s3://%s/%s", bucket, metadataKey)
	}

	return sf, uploadErr
}

// uploadSnapshot uploads the snapshot file at path to the given key in S3 using
// the minio API, bounded by the configured timeout. The object is tagged with
// the cluster ID, node name and token hash as user metadata, and its content
// type is chosen from the key's extension.
func (c *Client) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	contentType := "application/octet-stream"
	if strings.HasSuffix(key, snapshot.CompressedExtension) {
		contentType = "application/zip"
	}

	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, minio.PutObjectOptions{
		NumThreads:  2,
		ContentType: contentType,
		UserMetadata: map[string]string{
			clusterIDKey: c.controller.clusterID,
			nodeNameKey:  c.controller.nodeName,
			tokenHashKey: c.controller.tokenHash,
		},
	})
}

// uploadSnapshotMetadata uploads the snapshot metadata file at path to the given
// key in S3 using the minio API, bounded by the configured timeout.
// The upload is silently skipped, returning an empty UploadInfo and no error,
// if the metadata file does not exist.
func (c *Client) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	_, statErr := os.Stat(path)
	if os.IsNotExist(statErr) {
		return minio.UploadInfo{}, nil
	}
	if statErr != nil {
		return minio.UploadInfo{}, statErr
	}

	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, minio.PutObjectOptions{
		NumThreads:  2,
		ContentType: "application/json",
		UserMetadata: map[string]string{
			clusterIDKey: c.controller.clusterID,
			nodeNameKey:  c.controller.nodeName,
			tokenHashKey: c.controller.tokenHash,
		},
	})
}

// Download fetches the named snapshot, followed by its metadata, from the
// configured S3 compatible backend. The snapshot is written into snapshotDir
// and the metadata into the sibling metadata directory. On success it returns
// the path the snapshot file was downloaded to; if either download fails, an
// empty path and the error are returned.
func (c *Client) Download(ctx context.Context, snapshotName, snapshotDir string) (string, error) {
	folder := c.etcdS3.Folder

	snapshotFile := filepath.Join(snapshotDir, snapshotName)
	if err := c.downloadSnapshot(ctx, path.Join(folder, snapshotName), snapshotFile); err != nil {
		return "", err
	}

	metadataFile := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, snapshotName)
	metadataKey := path.Join(folder, snapshot.MetadataDir, snapshotName)
	if err := c.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
		return "", err
	}

	return snapshotFile, nil
}

// downloadSnapshot downloads the snapshot object at key from S3 into file using
// the minio API, bounded by the configured timeout. After the download attempt
// the file's mode is set to 0600. Changing the mode is best-effort: a failure
// is logged as a warning but does not affect the returned error, which is
// always the result of the download itself.
func (c *Client) downloadSnapshot(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot from s3://%s/%s", c.etcdS3.Bucket, key)
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	err := c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})

	// The mode change is attempted even if the download failed, in case a file
	// is present at the destination. A missing file just means there is nothing
	// to protect, so only other failures are reported.
	if chmodErr := os.Chmod(file, 0600); chmodErr != nil && !os.IsNotExist(chmodErr) {
		logrus.Warnf("Failed to restrict permissions on downloaded snapshot %s: %v", file, chmodErr)
	}

	return err
}

// downloadSnapshotMetadata downloads the snapshot metadata object at key from S3
// into file using the minio API, bounded by the configured timeout.
// No error is returned if the metadata object does not exist, as it is optional.
// After the download attempt the file's mode is set to 0600. Changing the mode
// is best-effort: a failure is logged as a warning but is not returned.
func (c *Client) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, key)
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	err := c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})

	// The mode change is attempted even if the download failed, in case a file
	// is present at the destination. A missing file is expected when the
	// metadata object does not exist, so only other failures are reported.
	if chmodErr := os.Chmod(file, 0600); chmodErr != nil && !os.IsNotExist(chmodErr) {
		logrus.Warnf("Failed to restrict permissions on downloaded snapshot metadata %s: %v", file, chmodErr)
	}

	// A 404 response means there is no metadata for this snapshot.
	if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
		return nil
	}
	return err
}

// SnapshotRetention prunes snapshots matching the given prefix in the configured
// S3 compatible backend, keeping only the most recently modified ones up to the
// configured retention count. Nothing is pruned if the retention count is less
// than one. Returns the names of the pruned snapshots; if a delete fails, the
// names pruned so far are returned together with the error.
func (c *Client) SnapshotRetention(ctx context.Context, prefix string) ([]string, error) {
	retention := c.etcdS3.Retention
	if retention < 1 {
		return nil, nil
	}

	bucket := c.etcdS3.Bucket
	prefix = path.Join(c.etcdS3.Folder, prefix)
	logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", retention, bucket, prefix)

	// The timeout only bounds the listing; each delete applies its own timeout
	// on top of the caller's context.
	listCtx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	var candidates []minio.ObjectInfo
	listing := c.mc.ListObjects(listCtx, bucket, minio.ListObjectsOptions{
		Prefix:    prefix,
		Recursive: true,
	})
	for object := range listing {
		if object.Err != nil {
			return nil, object.Err
		}
		// objects in the metadata directory do not count as snapshots
		if path.Base(path.Dir(object.Key)) != snapshot.MetadataDir {
			candidates = append(candidates, object)
		}
	}

	if len(candidates) <= retention {
		return nil, nil
	}

	// order newest-first, so that everything past the retention count is stale
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].LastModified.After(candidates[j].LastModified)
	})

	pruned := []string{}
	for _, stale := range candidates[retention:] {
		logrus.Infof("Removing S3 snapshot: s3://%s/%s", bucket, stale.Key)

		name := path.Base(stale.Key)
		err := c.DeleteSnapshot(ctx, name)
		// a snapshot that is already gone still counts as pruned
		if err != nil && !snapshot.IsNotExist(err) {
			return pruned, err
		}
		pruned = append(pruned, name)
	}

	return pruned, nil
}

// DeleteSnapshot deletes the selected snapshot (and its metadata) from S3.
// Each object is only removed if it can first be found. A failure to remove
// an object that was found is returned immediately. Otherwise the result is
// the error from looking up the snapshot itself, so that callers can tell
// whether the snapshot was actually deleted by checking for a NotFound error.
func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	bucket := c.etcdS3.Bucket
	snapshotKey := path.Join(c.etcdS3.Folder, key)
	metadataKey := path.Join(path.Dir(snapshotKey), snapshot.MetadataDir, path.Base(snapshotKey))

	_, statErr := c.mc.StatObject(ctx, bucket, snapshotKey, minio.StatObjectOptions{})
	if statErr == nil {
		if rmErr := c.mc.RemoveObject(ctx, bucket, snapshotKey, minio.RemoveObjectOptions{}); rmErr != nil {
			return rmErr
		}
	}

	// The metadata is looked up and removed whether or not the snapshot was
	// found, so that leftovers from ephemeral errors are still cleaned up.
	// A failed metadata lookup is not reported; only a failed removal is.
	if _, metaStatErr := c.mc.StatObject(ctx, bucket, metadataKey, minio.StatObjectOptions{}); metaStatErr == nil {
		if rmErr := c.mc.RemoveObject(ctx, bucket, metadataKey, minio.RemoveObjectOptions{}); rmErr != nil {
			return rmErr
		}
	}

	return statErr
}

// ListSnapshots provides a list of the snapshots currently stored in S3 under
// the configured folder, along with their relevant metadata. The result is
// keyed by each snapshot's GenerateConfigMapKey value. Zero-size objects are
// skipped. A snapshot's creation time is parsed from the numeric suffix after
// the last '-' in its name, falling back to the object's last-modified time.
// Metadata objects are attached, base64-encoded, to the snapshot with the same
// name; failures to load metadata are logged and do not fail the listing.
// The whole operation is bounded by the configured timeout.
func (c *Client) ListSnapshots(ctx context.Context) (map[string]snapshot.File, error) {
	snapshots := map[string]snapshot.File{}
	metadatas := []string{}
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	opts := minio.ListObjectsOptions{
		Prefix:    c.etcdS3.Folder,
		Recursive: true,
	}

	objects := c.mc.ListObjects(ctx, c.etcdS3.Bucket, opts)

	for obj := range objects {
		if obj.Err != nil {
			return nil, obj.Err
		}
		if obj.Size == 0 {
			continue
		}

		// Prefer the result of a stat call over the listing entry, as the user
		// metadata read below is taken from it; fall back to the listing entry
		// if the stat fails.
		if o, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, obj.Key, minio.StatObjectOptions{}); err != nil {
			logrus.Warnf("Failed to get object metadata: %v", err)
		} else {
			obj = o
		}

		// Objects in the metadata directory are collected for the second pass
		// below, once all snapshots are known.
		filename := path.Base(obj.Key)
		if path.Base(path.Dir(obj.Key)) == snapshot.MetadataDir {
			metadatas = append(metadatas, obj.Key)
			continue
		}

		basename, compressed := strings.CutSuffix(filename, snapshot.CompressedExtension)
		ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
		if err != nil {
			ts = obj.LastModified.Unix()
		}

		sf := snapshot.File{
			Name:     filename,
			Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, obj.Key),
			NodeName: "s3",
			CreatedAt: &metav1.Time{
				Time: time.Unix(ts, 0),
			},
			Size:       obj.Size,
			S3:         &snapshot.S3Config{EtcdS3: *c.etcdS3},
			Status:     snapshot.SuccessfulStatus,
			Compressed: compressed,
			NodeSource: obj.UserMetadata[nodeNameKey],
			TokenHash:  obj.UserMetadata[tokenHashKey],
		}
		sfKey := sf.GenerateConfigMapKey()
		snapshots[sfKey] = sf
	}

	for _, metadataKey := range metadatas {
		filename := path.Base(metadataKey)
		dsf := &snapshot.File{Name: filename, NodeName: "s3"}
		sfKey := dsf.GenerateConfigMapKey()
		sf, ok := snapshots[sfKey]
		if !ok {
			// metadata without a matching snapshot is ignored
			continue
		}

		logrus.Debugf("Loading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, metadataKey)
		obj, err := c.mc.GetObject(ctx, c.etcdS3.Bucket, metadataKey, minio.GetObjectOptions{})
		if err != nil {
			if snapshot.IsNotExist(err) {
				logrus.Debugf("Failed to get snapshot metadata: %v", err)
			} else {
				logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
			}
			continue
		}

		m, err := ioutil.ReadAll(obj)
		// Close the object as soon as it has been read, rather than deferring
		// until the function returns, so that objects are not held open for
		// the remainder of the loop.
		if cerr := obj.Close(); cerr != nil {
			logrus.Debugf("Failed to close snapshot metadata object for %s: %v", filename, cerr)
		}
		if err != nil {
			if snapshot.IsNotExist(err) {
				logrus.Debugf("Failed to read snapshot metadata: %v", err)
			} else {
				logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
			}
			continue
		}

		sf.Metadata = base64.StdEncoding.EncodeToString(m)
		snapshots[sfKey] = sf
	}

	return snapshots, nil
}

// loadEndpointCAs builds a TLS configuration whose root CA pool holds the
// certificates named by etcdS3EndpointCA, a space-separated list of entries.
// Each entry is either base64-encoded PEM data, or the path to a PEM file on
// disk. An error is returned if an entry is neither valid base64 nor a
// readable file, or if no certificate could be loaded from any entry.
func loadEndpointCAs(etcdS3EndpointCA string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	found := 0

	for _, entry := range strings.Split(etcdS3EndpointCA, " ") {
		// First try the entry as base64-encoded data - that is, a base64 string
		// wrapping multiline, ascii-armored certificate data, as would be produced
		// by `base64 --wrap=0 /path/to/cert.pem`. If it does not decode, treat the
		// entry as the path to a file on disk instead. This is backwards
		// compatible with RKE1.
		pemData, decodeErr := base64.StdEncoding.DecodeString(entry)
		if decodeErr != nil {
			var readErr error
			if pemData, readErr = os.ReadFile(entry); readErr != nil {
				return nil, readErr
			}
		}
		if pool.AppendCertsFromPEM(pemData) {
			found++
		}
	}

	if found == 0 {
		return nil, errors.New("no certificates loaded from etcd-s3-endpoint-ca")
	}
	return &tls.Config{RootCAs: pool}, nil
}

// bucketLookupType selects the minio bucket lookup style. An explicit
// lookupType of "dns" or "path" (case-insensitive) always wins. Otherwise DNS
// lookup is used for endpoints containing "aliyun", and automatic detection
// is used for everything else.
func bucketLookupType(endpoint, lookupType string) minio.BucketLookupType {
	requested := strings.ToLower(lookupType)
	switch {
	case requested == "path":
		return minio.BucketLookupPath
	case requested == "dns":
		return minio.BucketLookupDNS
	case strings.Contains(endpoint, "aliyun"):
		// backwards compatible with RKE1
		return minio.BucketLookupDNS
	default:
		return minio.BucketLookupAuto
	}
}
