// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"path/filepath"
	"slices"
	"sync"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/confmap"
	"go.opentelemetry.io/collector/confmap/provider/fileprovider"
	"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exportertest"
	"go.opentelemetry.io/collector/otelcol"
	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/otlpreceiver"
	"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
)

// TestInternalTelemetry_ServiceInstanceID verifies that the service.instance.id
// attribute is generated by default (unless overridden), and is is consistent
// across all internal telemetry providers.
func TestInternalTelemetry_ServiceInstanceID(t *testing.T) {
	type testcase struct {
		extraYamlConfig        string
		checkServiceInstanceID func(t *testing.T, serviceInstanceID string)
	}

	for name, tt := range map[string]testcase{
		"default": {
			checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
				// By default, service.instance.id should be a generated UUIDv4
				_, err := uuid.Parse(serviceInstanceID)
				require.NoError(t, err)
			},
		},
		"service.instance.id set in config": {
			extraYamlConfig: `
service:
  telemetry:
    resource:
      service.instance.id: "my-custom-instance-id"`,
			checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
				assert.Equal(t, "my-custom-instance-id", serviceInstanceID)
			},
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Set up HTTP server to capture traces from collector's internal telemetry
			traceSink := new(consumertest.TracesSink)
			traceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				body, err := io.ReadAll(r.Body)
				if err != nil {
					http.Error(w, err.Error(), http.StatusBadRequest)
					return
				}
				otlpReq := ptraceotlp.NewExportRequest()
				if err := otlpReq.UnmarshalProto(body); err != nil {
					http.Error(w, err.Error(), http.StatusBadRequest)
					return
				}
				_ = traceSink.ConsumeTraces(r.Context(), otlpReq.Traces())
			}))
			defer traceServer.Close()

			logSink := registerTestLogSink(t)

			// Create temporary directory for the config file
			tempdir := t.TempDir()
			configFile := filepath.Join(tempdir, "config.yaml")

			// Create YAML config
			otlphttpPort := getFreePort(t)
			metricsPort := getFreePort(t)
			require.NoError(t, os.WriteFile(configFile, []byte(fmt.Sprintf(`
receivers:
  otlp:
    protocols:
      http:
        endpoint: localhost:%s

exporters:
  nop:

service:
  telemetry:
    logs:
      level: info
      encoding: json
      output_paths: [%q]
    metrics:
      level: normal
      readers:
        - pull:
            exporter:
              prometheus:
                host: localhost
                port: %s
    traces:
      level: normal
      processors:
        - simple:
            exporter:
              otlp:
                protocol: http/protobuf
                endpoint: %s
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [nop]
`, otlphttpPort, logSink.url, metricsPort, traceServer.URL)[1:]), 0o600))

			// Create collector
			configURIs := []string{configFile}
			if tt.extraYamlConfig != "" {
				configURIs = append(configURIs, "yaml:"+tt.extraYamlConfig)
			}
			collector, err := otelcol.NewCollector(otelcol.CollectorSettings{
				BuildInfo: component.NewDefaultBuildInfo(),
				Factories: func() (otelcol.Factories, error) {
					otlpreceiverFactory := otlpreceiver.NewFactory()
					return otelcol.Factories{
						Receivers: map[component.Type]receiver.Factory{
							otlpreceiverFactory.Type(): otlpreceiverFactory,
						},
						Exporters: map[component.Type]exporter.Factory{
							nopType: exportertest.NewNopFactory(),
						},
						Telemetry: otelconftelemetry.NewFactory(),
					}, nil
				},
				ConfigProviderSettings: otelcol.ConfigProviderSettings{
					ResolverSettings: confmap.ResolverSettings{
						URIs: configURIs,
						ProviderFactories: []confmap.ProviderFactory{
							fileprovider.NewFactory(),
							yamlprovider.NewFactory(),
						},
					},
				},
			})
			require.NoError(t, err)

			// Start collector
			go func() {
				assert.NoError(t, collector.Run(t.Context()))
			}()
			waitMetricsReady(t, metricsPort)

			// Send some data through the pipeline to trigger internal telemetry
			err = sendTestTraces(otlphttpPort)
			require.NoError(t, err)

			// Capture service.instance.id from the Prometheus endpoint
			var metricInstanceID string
			parsed := readMetrics(t, metricsPort)
			targetInfo := parsed["target_info"]
			require.NotNil(t, targetInfo, "target_info metric not found")
			require.Len(t, targetInfo.Metric, 1)
			for _, label := range targetInfo.Metric[0].Label {
				if label.GetName() == "service_instance_id" {
					metricInstanceID = label.GetValue()
					break
				}
			}
			tt.checkServiceInstanceID(t, metricInstanceID)

			// Wait for traces, verify service.instance.id matches the one from metrics
			require.EventuallyWithT(t, func(t *assert.CollectT) {
				allTraces := traceSink.AllTraces()
				require.NotEmpty(t, allTraces)

				// Find service.instance.id in resource attributes
				for _, td := range allTraces {
					for i := 0; i < td.ResourceSpans().Len(); i++ {
						rs := td.ResourceSpans().At(i)
						if attr, ok := rs.Resource().Attributes().Get("service.instance.id"); ok {
							traceInstanceID := attr.AsString()
							require.Equal(t, metricInstanceID, traceInstanceID)
						}
					}
				}
			}, 10*time.Second, 500*time.Millisecond)

			// Check service.instance.id in logs matches the one from metrics
			var logsCount int
			logContent := logSink.Bytes()
			for line := range bytes.Lines(bytes.TrimSpace(logContent)) {
				var logEntry map[string]any
				if err := json.Unmarshal(line, &logEntry); err != nil {
					continue
				}
				// Check for resource field with service.instance.id
				// Resource attributes are nested under "resource" key as a dictionary
				resource, ok := logEntry["resource"].(map[string]any)
				require.True(t, ok, "log entry should have resource field")
				logInstanceID, ok := resource["service.instance.id"].(string)
				require.True(t, ok, "resource should have service.instance.id")
				require.Equal(t, metricInstanceID, logInstanceID)
				logsCount++
			}
			assert.NotZero(t, logsCount)
		})
	}
}

// Test-specific zap sink to capture logs as close as possible to logs being written to file.
// The reason we don't actually write to files is because Zap provides no way of closing file
// sinks created by zap.Config.Build.

var (
	testSinksMu sync.Mutex
	testSinks   = make(map[string]*testSink)
)

type testSink struct {
	url string

	mu  sync.RWMutex
	buf bytes.Buffer
}

func (s *testSink) Write(p []byte) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.buf.Write(p)
}

func (s *testSink) Bytes() []byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return slices.Clone(s.buf.Bytes())
}

func (*testSink) Sync() error {
	return nil
}

func (*testSink) Close() error {
	return nil
}

func registerTestLogSink(tb testing.TB) *testSink {
	sink := &testSink{}
	sink.url = fmt.Sprintf("test://%s.%p", tb.Name(), sink)

	testSinksMu.Lock()
	defer testSinksMu.Unlock()

	testSinks[sink.url] = sink
	tb.Cleanup(func() {
		testSinksMu.Lock()
		defer testSinksMu.Unlock()
		delete(testSinks, sink.url)
	})
	return sink
}

func init() {
	if err := zap.RegisterSink("test", func(u *url.URL) (zap.Sink, error) {
		testSinksMu.Lock()
		defer testSinksMu.Unlock()
		sink, ok := testSinks[u.String()]
		if !ok {
			return nil, fmt.Errorf("no test sink registered for URL %q", u.String())
		}
		return sink, nil
	}); err != nil {
		log.Fatal(err)
	}
}
