//go:build !no_embedded_executor

package embed

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"os/exec"
	"path/filepath"
	"runtime/debug"
	"strconv"
	"sync"
	"time"

	"github.com/k3s-io/k3s/pkg/agent/containerd"
	"github.com/k3s-io/k3s/pkg/agent/cri"
	"github.com/k3s-io/k3s/pkg/agent/cridockerd"
	"github.com/k3s-io/k3s/pkg/agent/flannel"
	"github.com/k3s-io/k3s/pkg/agent/netpol"
	"github.com/k3s-io/k3s/pkg/cli/cmds"
	daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/daemons/executor"
	"github.com/k3s-io/k3s/pkg/executor/embed/etcd"
	"github.com/k3s-io/k3s/pkg/signals"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/util/errors"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/k3s-io/k3s/pkg/vpn"
	"github.com/sirupsen/logrus"
	"k8s.io/apiserver/pkg/authentication/authenticator"
	cloudprovider "k8s.io/cloud-provider"
	ccmapp "k8s.io/cloud-provider/app"
	cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
	"k8s.io/cloud-provider/names"
	ccmopt "k8s.io/cloud-provider/options"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/klog/v2"
	apiapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
	cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
	proxy "k8s.io/kubernetes/cmd/kube-proxy/app"
	sapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
	kubelet "k8s.io/kubernetes/cmd/kubelet/app"
	utilsnet "k8s.io/utils/net"

	// registering k3s cloud provider
	_ "github.com/k3s-io/k3s/pkg/cloudprovider"
)

// once guards the klog verbosity reset loop launched from Bootstrap so that the
// loop is started at most one time per process, no matter how often Bootstrap
// is called.
var once sync.Once

// init hands a new Embedded executor to executor.Set when this package is
// imported.
func init() {
	executor.Set(new(Embedded))
}

// Compile-time check that *Embedded satisfies executor.Executor.
var _ executor.Executor = (*Embedded)(nil)

// Embedded is the executor.Executor implementation that this package registers
// from its init function. Its methods build the Kubernetes component commands
// and run them in goroutines of the current process.
//
// All fields are populated by Bootstrap:
//   - apiServerReady is the channel obtained from util.APIServerReadyChan; the
//     kubelet, kube-proxy, scheduler and controller-manager goroutines block on
//     it before executing their commands.
//   - etcdReady is closed by the goroutine started in ETCD once the supplied
//     test function succeeds; the apiserver goroutine blocks on it.
//   - criReady is handed to executor.CloseIfNilErr by Containerd, Docker and CRI
//     (presumably closed by that helper when the error is nil — confirm there).
//   - nodeConfig is the node configuration passed to Bootstrap; KubeProxy reads
//     it to compute platform-specific kube-proxy arguments.
type Embedded struct {
	apiServerReady <-chan struct{}
	etcdReady      chan struct{}
	criReady       chan struct{}
	nodeConfig     *daemonconfig.Node
}

// Bootstrap prepares the embedded executor for use. It creates the readiness
// channels, records nodeConfig for later use by KubeProxy, starts (at most once
// per process) a background loop that keeps the klog verbosity pinned to the
// configured level during startup and, unless the flannel backend is disabled,
// fills in the flannel and CNI settings on nodeConfig: the flannel interface,
// VPN-provided node IPs, the flannel config file and the CNI bin/conf dirs.
//
// It returns an error when a configured flannel or VPN interface cannot be
// found, when VPN information cannot be retrieved, when VPN is enabled but the
// node has no IPs or no VPN address matches the address family of the first
// node IP, or when the host-local CNI plugin cannot be located.
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
	e.apiServerReady = util.APIServerReadyChan(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout)
	e.etcdReady = make(chan struct{})
	e.criReady = make(chan struct{})
	e.nodeConfig = nodeConfig

	go once.Do(func() {
		// Ensure that the log verbosity remains set to the configured level by resetting it at 1-second intervals
		// for the first 2 minutes that K3s is starting up. This is necessary because each of the Kubernetes
		// components will initialize klog and reset the verbosity flag when they are starting.
		logCtx, cancel := context.WithTimeout(ctx, time.Second*120)
		defer cancel()

		klog.InitFlags(nil)
		for {
			// Best-effort: the result of setting the flag is not acted upon.
			flag.Set("v", strconv.Itoa(cmds.LogConfig.VLevel))

			select {
			case <-time.After(time.Second):
			case <-logCtx.Done():
				return
			}
		}
	})

	if nodeConfig.Flannel.Backend != flannel.BackendNone {
		var err error

		if len(cfg.FlannelIface) > 0 {
			nodeConfig.Flannel.Iface, err = net.InterfaceByName(cfg.FlannelIface)
			if err != nil {
				return errors.WithMessagef(err, "unable to find interface %s", cfg.FlannelIface)
			}
		}

		// If there is a VPN, we must overwrite NodeIP and flannel interface
		var vpnInfo *vpn.Info
		if cfg.VPNAuth != "" {
			vpnInfo, err = vpn.GetInfo(cfg.VPNAuth)
			if err != nil {
				return err
			}

			// The address family of the first node IP decides which VPN address
			// comes first, so an empty list must be rejected before indexing it.
			nodeIPs := nodeConfig.AgentConfig.NodeIPs
			if len(nodeIPs) == 0 {
				return fmt.Errorf("unable to assign VPN addresses to node: no node IPs configured (VPN ipv4=%v ipv6=%v)", vpnInfo.IPv4Address, vpnInfo.IPv6Address)
			}

			// Pass ipv4, ipv6 or both depending on nodeIPs mode
			var vpnIPs []net.IP
			switch {
			case utilsnet.IsIPv4(nodeIPs[0]) && vpnInfo.IPv4Address != nil:
				vpnIPs = append(vpnIPs, vpnInfo.IPv4Address)
				if vpnInfo.IPv6Address != nil {
					vpnIPs = append(vpnIPs, vpnInfo.IPv6Address)
				}
			case utilsnet.IsIPv6(nodeIPs[0]) && vpnInfo.IPv6Address != nil:
				vpnIPs = append(vpnIPs, vpnInfo.IPv6Address)
				if vpnInfo.IPv4Address != nil {
					vpnIPs = append(vpnIPs, vpnInfo.IPv4Address)
				}
			default:
				return fmt.Errorf("address family mismatch when assigning VPN addresses to node: node=%v, VPN ipv4=%v ipv6=%v", nodeIPs, vpnInfo.IPv4Address, vpnInfo.IPv6Address)
			}

			// Overwrite nodeip and flannel interface and throw a warning if user explicitly set those parameters.
			// vpnIPs holds at least one address here: every case above that does not return appends one.
			logrus.Infof("Node-ip changed to %v due to VPN", vpnIPs)
			if len(cfg.NodeIP.Value()) != 0 {
				logrus.Warn("VPN provider overrides configured node-ip parameter")
			}
			if len(cfg.NodeExternalIP.Value()) != 0 {
				logrus.Warn("VPN provider overrides node-external-ip parameter")
			}
			nodeConfig.AgentConfig.NodeIPs = vpnIPs
			nodeConfig.AgentConfig.NodeIP = vpnIPs[0].String()
			nodeConfig.Flannel.Iface, err = net.InterfaceByName(vpnInfo.Interface)
			if err != nil {
				return errors.WithMessagef(err, "unable to find vpn interface: %s", vpnInfo.Interface)
			}
		}

		// set paths for embedded flannel if enabled
		hostLocal, err := exec.LookPath("host-local")
		if err != nil {
			return errors.WithMessagef(err, "failed to find host-local")
		}

		if cfg.FlannelConf == "" {
			nodeConfig.Flannel.ConfFile = filepath.Join(cfg.DataDir, "agent", "etc", "flannel", "net-conf.json")
		} else {
			nodeConfig.Flannel.ConfFile = cfg.FlannelConf
			nodeConfig.Flannel.ConfOverride = true
		}
		// The CNI bin dir is wherever the host-local plugin was found.
		nodeConfig.AgentConfig.CNIBinDir = filepath.Dir(hostLocal)
		nodeConfig.AgentConfig.CNIConfDir = filepath.Join(cfg.DataDir, "agent", "etc", "cni", "net.d")

		// It does not make sense to use VPN without its flannel backend
		if cfg.VPNAuth != "" {
			nodeConfig.Flannel.Backend = vpnInfo.ProviderName
		}
	}

	return nil
}

// Kubelet builds the kubelet command with the given args and executes it in a
// background goroutine after the apiserver ready channel unblocks. The
// goroutine requests shutdown when the command returns, passing a wrapped error
// first if the command failed with anything other than context.Canceled. A
// panic raised while the command runs is logged together with a stack trace
// through logrus Fatalf. Kubelet itself always returns nil.
func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
	cmd := kubelet.NewKubeletCommand(context.Background())
	cmd.SetArgs(args)

	go func() {
		// Wait for the apiserver before installing the panic handler and running.
		<-e.APIServerReadyChan()
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "kubelet exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// KubeProxy builds the kube-proxy command, combining the platform-specific
// arguments derived from the stored node config with the given args via
// util.GetArgs, and executes it in a background goroutine after the apiserver
// ready channel unblocks. The goroutine requests shutdown when the command
// returns, passing a wrapped error first if the command failed with anything
// other than context.Canceled. A panic raised while the command runs is logged
// together with a stack trace through logrus Fatalf. KubeProxy itself always
// returns nil.
func (e *Embedded) KubeProxy(ctx context.Context, args []string) error {
	cmd := proxy.NewProxyCommand()
	proxyArgs := util.GetArgs(platformKubeProxyArgs(e.nodeConfig), args)
	cmd.SetArgs(proxyArgs)

	go func() {
		// Wait for the apiserver before installing the panic handler and running.
		<-e.APIServerReadyChan()
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "kube-proxy exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// APIServerHandlers blocks until a value can be received from
// apiapp.StartupConfig and returns that value's Authenticator and Handler. The
// ctx argument is not consulted, and the returned error is always nil.
func (*Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
	sc := <-apiapp.StartupConfig
	auth, handler := sc.Authenticator, sc.Handler
	return auth, handler, nil
}

// APIServer builds the kube-apiserver command with the given args and executes
// it in a background goroutine after the etcd ready channel unblocks. The
// goroutine requests shutdown when the command returns, passing a wrapped error
// first if the command failed with anything other than context.Canceled. A
// panic raised while the command runs is logged together with a stack trace
// through logrus Fatalf. APIServer itself always returns nil.
func (e *Embedded) APIServer(ctx context.Context, args []string) error {
	cmd := apiapp.NewAPIServerCommand(ctx.Done())
	cmd.SetArgs(args)

	go func() {
		// Wait for etcd before installing the panic handler and running.
		<-e.ETCDReadyChan()
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("apiserver panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "apiserver exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// Scheduler builds the kube-scheduler command with the given args and executes
// it in a background goroutine after both the apiserver ready channel and
// nodeReady unblock, in that order. The goroutine requests shutdown when the
// command returns, passing a wrapped error first if the command failed with
// anything other than context.Canceled. A panic raised while the command runs
// is logged together with a stack trace through logrus Fatalf. Scheduler itself
// always returns nil.
func (e *Embedded) Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
	cmd := sapp.NewSchedulerCommand(ctx.Done())
	cmd.SetArgs(args)

	go func() {
		// Wait for the apiserver, then for the node, before running.
		<-e.APIServerReadyChan()
		<-nodeReady
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "scheduler exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// ControllerManager builds the kube-controller-manager command with the given
// args and executes it in a background goroutine after the apiserver ready
// channel unblocks. The goroutine requests shutdown when the command returns,
// passing a wrapped error first if the command failed with anything other than
// context.Canceled. A panic raised while the command runs is logged together
// with a stack trace through logrus Fatalf. ControllerManager itself always
// returns nil.
func (e *Embedded) ControllerManager(ctx context.Context, args []string) error {
	cmd := cmapp.NewControllerManagerCommand()
	cmd.SetArgs(args)

	go func() {
		// Wait for the apiserver before installing the panic handler and running.
		<-e.APIServerReadyChan()
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "controller-manager exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// CloudControllerManager builds the cloud-controller-manager command with the
// given args and executes it in a background goroutine after ccmRBACReady
// unblocks. The cloud provider is initialized under the version.Program name
// using the cloud config file from the completed configuration.
//
// Failure to create the command options, failure to initialize the cloud
// provider, or a nil cloud provider is reported through logrus Fatalf rather
// than the returned error. The goroutine requests shutdown when the command
// returns, passing a wrapped error first if the command failed with anything
// other than context.Canceled. A panic raised while the command runs is logged
// together with a stack trace through logrus Fatalf. The returned error is
// always nil.
func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
	opts, err := ccmopt.NewCloudControllerManagerOptions()
	if err != nil {
		logrus.Fatalf("unable to initialize command options: %v", err)
	}

	// initCloud is invoked by the command to obtain the cloud provider.
	initCloud := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface {
		configFile := config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile
		provider, err := cloudprovider.InitCloudProvider(version.Program, configFile)
		if err != nil {
			logrus.Fatalf("Cloud provider could not be initialized: %v", err)
		}
		if provider == nil {
			logrus.Fatalf("Cloud provider is nil")
		}
		return provider
	}

	cmd := ccmapp.NewCloudControllerManagerCommand(
		opts,
		initCloud,
		ccmapp.DefaultInitFuncConstructors,
		names.CCMControllerAliases(),
		cliflag.NamedFlagSets{},
		ctx.Done(),
	)
	cmd.SetArgs(args)

	go func() {
		// Wait for the CCM RBAC before installing the panic handler and running.
		<-ccmRBACReady
		defer func() {
			if r := recover(); r != nil {
				logrus.WithField("stack", string(debug.Stack())).Fatalf("cloud-controller-manager panic: %v", r)
			}
		}()
		if err := cmd.ExecuteContext(ctx); err != nil && !errors.Is(err, context.Canceled) {
			signals.RequestShutdown(errors.WithMessage(err, "cloud-controller-manager exited"))
		}
		signals.RequestShutdown(nil)
	}()

	return nil
}

// CurrentETCDOptions returns an empty executor.InitialOptions value and a nil
// error.
func (e *Embedded) CurrentETCDOptions() (executor.InitialOptions, error) {
	opts := executor.InitialOptions{}
	return opts, nil
}

// ETCD starts etcd by passing ctx, wg, args and extraArgs to etcd.StartETCD and
// returns its result.
//
// If the etcd ready channel has been created, a goroutine is started first that
// calls the provided test function until it succeeds. The test function is
// responsible for ensuring that the etcd server is up and ready to accept
// client requests. A failed attempt is logged and retried after 5 seconds; on
// the first success the etcd ready channel is closed. The goroutine also stops
// when ctx is done.
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error {
	if e.etcdReady != nil {
		go func() {
			for {
				err := test(ctx, true)
				if err == nil {
					logrus.Info("Connection to etcd is ready")
					close(e.etcdReady)
					return
				}
				logrus.Infof("Failed to test etcd connection: %v", err)

				// Pause before the next attempt unless the context ends first.
				select {
				case <-ctx.Done():
					return
				case <-time.After(5 * time.Second):
				}
			}
		}()
	}
	return etcd.StartETCD(ctx, wg, args, extraArgs)
}

// Containerd calls containerd.Run with ctx and cfg and passes the resulting
// error, together with the CRI ready channel, to executor.CloseIfNilErr,
// returning whatever that helper returns.
func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error {
	err := containerd.Run(ctx, cfg)
	return executor.CloseIfNilErr(err, e.criReady)
}

// Docker calls cridockerd.Run with ctx and cfg and passes the resulting error,
// together with the CRI ready channel, to executor.CloseIfNilErr, returning
// whatever that helper returns.
func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
	err := cridockerd.Run(ctx, cfg)
	return executor.CloseIfNilErr(err, e.criReady)
}

// CRI waits for the service at cfg.ContainerRuntimeEndpoint via
// cri.WaitForService and passes the result, together with the CRI ready
// channel, to executor.CloseIfNilErr. When the endpoint is "/dev/null" no wait
// is performed and a nil error is passed instead.
func (e *Embedded) CRI(ctx context.Context, cfg *daemonconfig.Node) error {
	// agentless sets cri socket path to /dev/null to indicate no CRI is needed
	if cfg.ContainerRuntimeEndpoint == "/dev/null" {
		return executor.CloseIfNilErr(nil, e.criReady)
	}
	waitErr := cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI")
	return executor.CloseIfNilErr(waitErr, e.criReady)
}

// CNI starts the embedded networking components. Unless the flannel backend is
// disabled, it calls flannel.Prepare followed by flannel.Run, after logging a
// warning when flannel external IPs are enabled but this node has no external
// IP, or when they are enabled with a backend other than wireguard-native. It
// then calls netpol.Run unless the network policy controller is disabled. The
// first error from any of these calls is returned.
func (e *Embedded) CNI(ctx context.Context, wg *sync.WaitGroup, cfg *daemonconfig.Node) error {
	if cfg.Flannel.Backend != flannel.BackendNone {
		if cfg.Flannel.ExternalIP {
			switch {
			case len(cfg.AgentConfig.NodeExternalIPs) == 0:
				logrus.Warnf("Server has flannel-external-ip flag set but this node does not set node-external-ip. Flannel will use internal address when connecting to this node.")
			case cfg.Flannel.Backend != flannel.BackendWireguardNative:
				logrus.Warnf("Flannel is using external addresses with an insecure backend: %v. Please consider using an encrypting flannel backend.", cfg.Flannel.Backend)
			}
		}

		if err := flannel.Prepare(ctx, cfg); err != nil {
			return err
		}
		if err := flannel.Run(ctx, wg, cfg); err != nil {
			return err
		}
	}

	if cfg.AgentConfig.DisableNPC {
		return nil
	}
	if err := netpol.Run(ctx, wg, cfg); err != nil {
		return err
	}
	return nil
}

// APIServerReadyChan returns the apiserver ready channel set by Bootstrap. It
// panics if the channel is nil, i.e. the executor has not been bootstrapped.
func (e *Embedded) APIServerReadyChan() <-chan struct{} {
	if ready := e.apiServerReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}

// ETCDReadyChan returns the etcd ready channel set by Bootstrap. It panics if
// the channel is nil, i.e. the executor has not been bootstrapped.
func (e *Embedded) ETCDReadyChan() <-chan struct{} {
	if ready := e.etcdReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}

// CRIReadyChan returns the CRI ready channel set by Bootstrap. It panics if the
// channel is nil, i.e. the executor has not been bootstrapped.
func (e *Embedded) CRIReadyChan() <-chan struct{} {
	if ready := e.criReady; ready != nil {
		return ready
	}
	panic("executor not bootstrapped")
}

// IsSelfHosted always reports false for the embedded executor.
func (Embedded) IsSelfHosted() bool {
	const selfHosted = false
	return selfHosted
}
