// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
	"bytes"
	"context"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"strconv"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config/configgrpc"
	"go.opentelemetry.io/collector/config/confighttp"
	"go.opentelemetry.io/collector/config/confignet"
	"go.opentelemetry.io/collector/config/configoptional"
	"go.opentelemetry.io/collector/config/configtls"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exportertest"
	"go.opentelemetry.io/collector/exporter/otlpexporter"
	"go.opentelemetry.io/collector/exporter/otlphttpexporter"
	"go.opentelemetry.io/collector/internal/testutil"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/plog/plogotlp"
	"go.opentelemetry.io/collector/pdata/testdata"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/otlpreceiver"
)

var _ plogotlp.GRPCServer = &logsServer{}

type logsServer struct {
	plogotlp.UnimplementedGRPCServer

	exportError error
}

func (r *logsServer) Export(_ context.Context, _ plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
	return plogotlp.NewExportResponse(), r.exportError
}

func TestGRPCToGRPC(t *testing.T) {
	// gRPC supports 17 different status codes.
	// Source: https://github.com/grpc/grpc/blob/41788c90bc66caf29f28ef808d066db806389792/doc/statuscodes.md
	for i := range uint32(16) {
		s := status.New(codes.Code(i), "Testing error")
		t.Run("Code "+s.Code().String(), func(t *testing.T) {
			e := createGRPCExporter(t, s)
			assertOnGRPCCode(t, e, s)
		})
	}
}

func TestHTTPToGRPC(t *testing.T) {
	testCases := []struct {
		grpc codes.Code
		http int
	}{
		{codes.OK, http.StatusOK},
		{codes.Canceled, http.StatusServiceUnavailable},
		{codes.DeadlineExceeded, http.StatusServiceUnavailable},
		{codes.Aborted, http.StatusServiceUnavailable},
		{codes.OutOfRange, http.StatusServiceUnavailable},
		{codes.Unavailable, http.StatusServiceUnavailable},
		{codes.DataLoss, http.StatusServiceUnavailable},
		{codes.ResourceExhausted, http.StatusTooManyRequests},
		{codes.InvalidArgument, http.StatusBadRequest},
		{codes.Unauthenticated, http.StatusUnauthorized},
		{codes.PermissionDenied, http.StatusForbidden},
		{codes.Unimplemented, http.StatusNotFound},
	}

	for _, tt := range testCases {
		s := status.New(tt.grpc, "Testing error")
		t.Run("Code "+s.Code().String(), func(t *testing.T) {
			e := createGRPCExporter(t, s)
			assertOnHTTPCode(t, e, tt.http)
		})
	}
}

func TestGRPCToHTTP(t *testing.T) {
	testCases := []struct {
		http int
		grpc codes.Code
	}{
		{http.StatusOK, codes.OK},
		{http.StatusBadRequest, codes.InvalidArgument},
		{http.StatusUnauthorized, codes.Unauthenticated},
		{http.StatusForbidden, codes.PermissionDenied},
		{http.StatusNotFound, codes.Unimplemented},
		{http.StatusTooManyRequests, codes.ResourceExhausted},
		{http.StatusBadGateway, codes.Unavailable},
		{http.StatusServiceUnavailable, codes.Unavailable},
		{http.StatusGatewayTimeout, codes.Unavailable},
	}

	for _, tt := range testCases {
		s := status.New(tt.grpc, "Testing error")
		t.Run("Code "+s.Code().String(), func(t *testing.T) {
			e := createHTTPExporter(t, tt.http)
			assertOnGRPCCode(t, e, s)
		})
	}
}

func TestHTTPToHTTP(t *testing.T) {
	testCases := []struct {
		code    int
		mapping int
	}{
		{code: http.StatusOK},
		{code: http.StatusServiceUnavailable},
		{code: http.StatusTooManyRequests},
		{code: http.StatusBadRequest},
		{code: http.StatusUnauthorized},
		{code: http.StatusForbidden},
		{code: http.StatusNotFound},
		{code: http.StatusInternalServerError},
		// Mappings won't be necessary once the OTLP/HTTP Exporter returns consumererror.Error types.
		{code: http.StatusBadGateway, mapping: http.StatusServiceUnavailable},
		{code: http.StatusGatewayTimeout, mapping: http.StatusServiceUnavailable},
		{code: http.StatusTeapot, mapping: http.StatusInternalServerError},
		{code: http.StatusConflict, mapping: http.StatusInternalServerError},
	}

	for _, tt := range testCases {
		t.Run("Code "+strconv.Itoa(tt.code), func(t *testing.T) {
			e := createHTTPExporter(t, tt.code)
			code := tt.code
			if tt.mapping != 0 {
				code = tt.mapping
			}
			assertOnHTTPCode(t, e, code)
		})
	}
}

func createGRPCExporter(t *testing.T, s *status.Status) consumer.Logs {
	t.Helper()

	ln, err := net.Listen("tcp", "localhost:")
	require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)

	srv := grpc.NewServer()
	rcv := &logsServer{
		exportError: s.Err(),
	}

	plogotlp.RegisterGRPCServer(srv, rcv)
	go func() {
		assert.NoError(t, srv.Serve(ln))
	}()
	t.Cleanup(func() {
		srv.Stop()
	})

	f := otlpexporter.NewFactory()
	cfg := f.CreateDefaultConfig().(*otlpexporter.Config)
	cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]()
	cfg.RetryConfig.Enabled = false
	cfg.ClientConfig = configgrpc.ClientConfig{
		Endpoint: ln.Addr().String(),
		TLS: configtls.ClientConfig{
			Insecure: true,
		},
	}
	e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlp")), cfg)
	require.NoError(t, err)
	err = e.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, e.Shutdown(context.Background()))
	})

	return e
}

func createHTTPExporter(t *testing.T, code int) consumer.Logs {
	t.Helper()

	mux := http.NewServeMux()
	mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) {
		writer.WriteHeader(code)
	})

	srv := httptest.NewServer(mux)
	t.Cleanup(func() {
		srv.Close()
	})

	f := otlphttpexporter.NewFactory()
	cfg := f.CreateDefaultConfig().(*otlphttpexporter.Config)
	cfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]()
	cfg.RetryConfig.Enabled = false
	cfg.Encoding = otlphttpexporter.EncodingProto
	cfg.LogsEndpoint = srv.URL + "/v1/logs"
	e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlp_http")), cfg)
	require.NoError(t, err)
	err = e.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, e.Shutdown(context.Background()))
	})

	return e
}

func assertOnGRPCCode(t *testing.T, l consumer.Logs, s *status.Status) {
	t.Helper()

	rf := otlpreceiver.NewFactory()
	rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
	rcfg.Protocols.GRPC = configoptional.Some(
		configgrpc.ServerConfig{
			NetAddr: confignet.AddrConfig{
				Endpoint:  testutil.GetAvailableLocalAddress(t),
				Transport: confignet.TransportTypeTCP,
			},
		},
	)
	r, err := rf.CreateLogs(context.Background(), receiver.Settings{
		ID:                component.MustNewID("otlp"),
		TelemetrySettings: componenttest.NewNopTelemetrySettings(),
	}, rcfg, l)
	require.NoError(t, err)
	err = r.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, r.Shutdown(context.Background()))
	})

	conn, err := grpc.NewClient(rcfg.Protocols.GRPC.Get().NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, conn.Close())
	})

	ld := testdata.GenerateLogs(2)

	acc := plogotlp.NewGRPCClient(conn)
	req := plogotlp.NewExportRequestFromLogs(ld)
	res, err := acc.Export(context.Background(), req)

	if s.Code() == codes.OK {
		require.NoError(t, err)
	} else {
		got := status.Convert(err).Code()
		require.Equal(t, s.Code(), got, "Expected code %s but got %s", s.Code().String(), got.String())
	}
	require.NotNil(t, res)
}

func assertOnHTTPCode(t *testing.T, l consumer.Logs, code int) {
	t.Helper()

	ld := testdata.GenerateLogs(2)
	protoMarshaler := &plog.ProtoMarshaler{}
	logProto, err := protoMarshaler.MarshalLogs(ld)
	require.NoError(t, err)

	addr := testutil.GetAvailableLocalAddress(t)
	rf := otlpreceiver.NewFactory()
	rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
	rcfg.Protocols.HTTP = configoptional.Some(
		otlpreceiver.HTTPConfig{
			ServerConfig: confighttp.ServerConfig{
				NetAddr: confignet.AddrConfig{
					Endpoint:  addr,
					Transport: confignet.TransportTypeTCP,
				},
			},
			LogsURLPath: "/v1/logs",
		},
	)
	r, err := rf.CreateLogs(context.Background(), receiver.Settings{
		ID:                component.MustNewID("otlp"),
		TelemetrySettings: componenttest.NewNopTelemetrySettings(),
	}, rcfg, l)
	require.NoError(t, err)
	err = r.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, r.Shutdown(context.Background()))
	})

	doHTTPRequest(t, addr+"/v1/logs", logProto, code)
}

func doHTTPRequest(
	t *testing.T,
	url string,
	data []byte,
	expectStatusCode int,
) []byte {
	req := createHTTPRequest(t, url, data)
	resp, err := http.DefaultClient.Do(req)
	require.NoError(t, err)

	respBytes, err := io.ReadAll(resp.Body)
	require.NoError(t, err)

	require.NoError(t, resp.Body.Close())

	if expectStatusCode == 0 {
		require.Equal(t, http.StatusOK, resp.StatusCode)
	} else {
		require.Equal(t, expectStatusCode, resp.StatusCode)
	}

	return respBytes
}

func createHTTPRequest(
	t *testing.T,
	url string,
	data []byte,
) *http.Request {
	buf := bytes.NewBuffer(data)

	req, err := http.NewRequest(http.MethodPost, "http://"+url, buf)
	require.NoError(t, err)
	req.Header.Set("Content-Type", "application/x-protobuf")

	return req
}
