From 5b4569d3de0cc33ab1ea83716b89c8ae4142c10a Mon Sep 17 00:00:00 2001 From: kpango Date: Mon, 11 Nov 2024 16:43:43 +0900 Subject: [PATCH] Add QUIC support Signed-off-by: kpango --- .gitfiles | 2 + .../vald-helm-operator/crds/valdrelease.yaml | 120 ++++++++++ charts/vald/README.md | 26 ++- charts/vald/values.schema.json | 220 +++++++++++++----- charts/vald/values.yaml | 33 ++- example/client/go.mod | 2 +- example/client/go.mod.default | 2 +- go.mod | 8 +- go.sum | 9 +- hack/go.mod.default | 2 +- .../client/v1/client/discoverer/discover.go | 134 ++++++----- internal/config/grpc.go | 6 +- internal/config/net.go | 2 + internal/errors/net.go | 6 + internal/net/dialer.go | 66 ++++-- internal/net/dialer_test.go | 6 +- internal/net/grpc/option.go | 16 +- internal/net/net.go | 31 +++ internal/net/net_test.go | 2 +- internal/net/quic/conn.go | 170 ++++++++++++++ internal/net/quic/listener.go | 82 +++++++ internal/servers/server/server.go | 68 ++++-- internal/servers/servers.go | 15 +- k8s/discoverer/configmap.yaml | 1 + k8s/discoverer/deployment.yaml | 2 +- k8s/gateway/gateway/lb/configmap.yaml | 2 + k8s/gateway/gateway/lb/deployment.yaml | 2 +- k8s/gateway/gateway/mirror/configmap.yaml | 2 +- k8s/gateway/gateway/mirror/deployment.yaml | 2 +- k8s/index/job/correction/configmap.yaml | 2 + k8s/index/job/creation/configmap.yaml | 1 + k8s/index/job/save/configmap.yaml | 1 + k8s/manager/index/configmap.yaml | 1 + k8s/manager/index/deployment.yaml | 2 +- k8s/operator/helm/crds/valdrelease.yaml | 120 ++++++++++ pkg/discoverer/k8s/service/discover.go | 78 ++++--- pkg/index/job/correction/usecase/corrector.go | 5 - pkg/manager/index/service/indexer.go | 6 +- pkg/manager/index/service/indexer_test.go | 18 +- tests/e2e/crud/crud_test.go | 38 ++- tests/e2e/operation/operation.go | 12 + 41 files changed, 1067 insertions(+), 256 deletions(-) create mode 100644 internal/net/quic/conn.go create mode 100644 internal/net/quic/listener.go diff --git a/.gitfiles b/.gitfiles index 
d24c17dd843..6ca9d915d79 100644 --- a/.gitfiles +++ b/.gitfiles @@ -1251,6 +1251,8 @@ internal/net/net.go internal/net/net_test.go internal/net/option.go internal/net/option_test.go +internal/net/quic/conn.go +internal/net/quic/listener.go internal/observability/attribute/attribute.go internal/observability/attribute/attribute_test.go internal/observability/exporter/exporter.go diff --git a/charts/vald-helm-operator/crds/valdrelease.yaml b/charts/vald-helm-operator/crds/valdrelease.yaml index 3469a37af50..024f1dc95d0 100644 --- a/charts/vald-helm-operator/crds/valdrelease.yaml +++ b/charts/vald-helm-operator/crds/valdrelease.yaml @@ -1273,6 +1273,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -2295,6 +2301,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -3171,6 +3183,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -4278,6 +4296,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -4477,6 +4501,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -4671,6 +4701,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -5783,6 +5819,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -5974,6 +6016,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: 
object properties: @@ -6167,6 +6215,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -7278,6 +7332,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -7376,6 +7436,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8444,6 +8510,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8635,6 +8707,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8835,6 +8913,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -9747,6 +9831,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -9938,6 +10028,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -10888,6 +10984,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -11079,6 +11181,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -13011,6 +13119,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -13202,6 +13316,12 @@ spec: type: string refresh_duration: type: string + 
network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: diff --git a/charts/vald/README.md b/charts/vald/README.md index ba5352746d1..34b45ab451c 100644 --- a/charts/vald/README.md +++ b/charts/vald/README.md @@ -201,9 +201,10 @@ Run the following command to install the chart, | agent.sidecar.config.client.net.dialer.dual_stack_enabled | bool | `false` | HTTP client TCP dialer dual stack enabled | | agent.sidecar.config.client.net.dialer.keepalive | string | `"5m"` | HTTP client TCP dialer keep alive | | agent.sidecar.config.client.net.dialer.timeout | string | `"5s"` | HTTP client TCP dialer connect timeout | -| agent.sidecar.config.client.net.dns.cache_enabled | bool | `true` | HTTP client TCP DNS cache enabled | +| agent.sidecar.config.client.net.dns.cache_enabled | bool | `true` | HTTP client DNS cache enabled | | agent.sidecar.config.client.net.dns.cache_expiration | string | `"24h"` | | -| agent.sidecar.config.client.net.dns.refresh_duration | string | `"1h"` | HTTP client TCP DNS cache expiration | +| agent.sidecar.config.client.net.dns.refresh_duration | string | `"1h"` | HTTP client DNS cache expiration | +| agent.sidecar.config.client.net.network | string | `"tcp"` | gRPC client dialer network type | | agent.sidecar.config.client.net.socket_option.ip_recover_destination_addr | bool | `false` | server listen socket option for ip_recover_destination_addr functionality | | agent.sidecar.config.client.net.socket_option.ip_transparent | bool | `false` | server listen socket option for ip_transparent functionality | | agent.sidecar.config.client.net.socket_option.reuse_addr | bool | `true` | server listen socket option for reuse_addr functionality | @@ -320,9 +321,10 @@ Run the following command to install the chart, | defaults.grpc.client.dial_option.net.dialer.dual_stack_enabled | bool | `true` | gRPC client TCP dialer dual stack enabled | | defaults.grpc.client.dial_option.net.dialer.keepalive | string | `""` | gRPC 
client TCP dialer keep alive | | defaults.grpc.client.dial_option.net.dialer.timeout | string | `""` | gRPC client TCP dialer timeout | -| defaults.grpc.client.dial_option.net.dns.cache_enabled | bool | `true` | gRPC client TCP DNS cache enabled | -| defaults.grpc.client.dial_option.net.dns.cache_expiration | string | `"1h"` | gRPC client TCP DNS cache expiration | -| defaults.grpc.client.dial_option.net.dns.refresh_duration | string | `"30m"` | gRPC client TCP DNS cache refresh duration | +| defaults.grpc.client.dial_option.net.dns.cache_enabled | bool | `true` | gRPC client DNS cache enabled | +| defaults.grpc.client.dial_option.net.dns.cache_expiration | string | `"1h"` | gRPC client DNS cache expiration | +| defaults.grpc.client.dial_option.net.dns.refresh_duration | string | `"30m"` | gRPC client DNS cache refresh duration | +| defaults.grpc.client.dial_option.net.network | string | `"tcp"` | gRPC client dialer network type | | defaults.grpc.client.dial_option.net.socket_option.ip_recover_destination_addr | bool | `false` | server listen socket option for ip_recover_destination_addr functionality | | defaults.grpc.client.dial_option.net.socket_option.ip_transparent | bool | `false` | server listen socket option for ip_transparent functionality | | defaults.grpc.client.dial_option.net.socket_option.reuse_addr | bool | `true` | server listen socket option for reuse_addr functionality | @@ -600,9 +602,10 @@ Run the following command to install the chart, | discoverer.discoverer.net.dialer.dual_stack_enabled | bool | `false` | TCP dialer dual stack enabled | | discoverer.discoverer.net.dialer.keepalive | string | `"10m"` | TCP dialer keep alive | | discoverer.discoverer.net.dialer.timeout | string | `"30s"` | TCP dialer timeout | -| discoverer.discoverer.net.dns.cache_enabled | bool | `true` | TCP DNS cache enabled | -| discoverer.discoverer.net.dns.cache_expiration | string | `"24h"` | TCP DNS cache expiration | -| discoverer.discoverer.net.dns.refresh_duration | 
string | `"5m"` | TCP DNS cache refresh duration | +| discoverer.discoverer.net.dns.cache_enabled | bool | `true` | DNS cache enabled | +| discoverer.discoverer.net.dns.cache_expiration | string | `"24h"` | DNS cache expiration | +| discoverer.discoverer.net.dns.refresh_duration | string | `"5m"` | DNS cache refresh duration | +| discoverer.discoverer.net.network | string | `"tcp"` | gRPC client dialer network type | | discoverer.discoverer.net.socket_option.ip_recover_destination_addr | bool | `false` | server listen socket option for ip_recover_destination_addr functionality | | discoverer.discoverer.net.socket_option.ip_transparent | bool | `false` | server listen socket option for ip_transparent functionality | | discoverer.discoverer.net.socket_option.reuse_addr | bool | `true` | server listen socket option for reuse_addr functionality | @@ -826,9 +829,10 @@ Run the following command to install the chart, | gateway.mirror.gateway_config.net.dialer.dual_stack_enabled | bool | `false` | TCP dialer dual stack enabled | | gateway.mirror.gateway_config.net.dialer.keepalive | string | `"10m"` | TCP dialer keep alive | | gateway.mirror.gateway_config.net.dialer.timeout | string | `"30s"` | TCP dialer timeout | -| gateway.mirror.gateway_config.net.dns.cache_enabled | bool | `true` | TCP DNS cache enabled | -| gateway.mirror.gateway_config.net.dns.cache_expiration | string | `"24h"` | TCP DNS cache expiration | -| gateway.mirror.gateway_config.net.dns.refresh_duration | string | `"5m"` | TCP DNS cache refresh duration | +| gateway.mirror.gateway_config.net.dns.cache_enabled | bool | `true` | DNS cache enabled | +| gateway.mirror.gateway_config.net.dns.cache_expiration | string | `"24h"` | DNS cache expiration | +| gateway.mirror.gateway_config.net.dns.refresh_duration | string | `"5m"` | DNS cache refresh duration | +| gateway.mirror.gateway_config.net.network | string | `"tcp"` | gRPC client dialer network type | | gateway.mirror.gateway_config.net.socket_option.ip_recover_destination_addr | bool | 
`false` | server listen socket option for ip_recover_destination_addr functionality | | gateway.mirror.gateway_config.net.socket_option.ip_transparent | bool | `false` | server listen socket option for ip_transparent functionality | | gateway.mirror.gateway_config.net.socket_option.reuse_addr | bool | `true` | server listen socket option for reuse_addr functionality | diff --git a/charts/vald/values.schema.json b/charts/vald/values.schema.json index 0df5814bd51..ef4245c7b00 100644 --- a/charts/vald/values.schema.json +++ b/charts/vald/values.schema.json @@ -1997,18 +1997,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -3768,18 +3773,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -5259,18 +5269,23 @@ "properties": { "cache_enabled": { "type": "boolean", - 
"description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -7089,18 +7104,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -7444,18 +7464,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -7791,18 +7816,23 @@ "properties": { 
"cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -9658,18 +9688,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -10001,18 +10036,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { 
@@ -10345,18 +10385,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -12218,18 +12263,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -12400,18 +12450,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, 
"socket_option": { "type": "object", "properties": { @@ -14204,18 +14259,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -14547,18 +14607,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -14905,18 +14970,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network 
type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -16531,18 +16601,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -16874,18 +16949,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -18558,18 +18638,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + 
"description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -18901,18 +18986,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -22218,18 +22308,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" } } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { @@ -22561,18 +22656,23 @@ "properties": { "cache_enabled": { "type": "boolean", - "description": "gRPC client TCP DNS cache enabled" + "description": "gRPC client DNS cache enabled" }, "cache_expiration": { "type": "string", - "description": "gRPC client TCP DNS cache expiration" + "description": "gRPC client DNS cache expiration" }, "refresh_duration": { "type": "string", - "description": "gRPC client TCP DNS cache refresh duration" + "description": "gRPC client DNS cache refresh duration" 
} } }, + "network": { + "type": "string", + "description": "gRPC client dialer network type", + "enum": ["tcp", "udp", "unix"] + }, "socket_option": { "type": "object", "properties": { diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 39347bb890d..faf5a1634b1 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -823,16 +823,19 @@ defaults: interceptors: [] # @schema {"name": "defaults.grpc.client.dial_option.net", "type": "object", "anchor": "net"} net: + # @schema {"name": "defaults.grpc.client.dial_option.net.network", "type": "string", "enum": ["tcp", "udp", "unix"]} + # defaults.grpc.client.dial_option.net.network -- gRPC client dialer network type + network: tcp # @schema {"name": "defaults.grpc.client.dial_option.net.dns", "type": "object"} dns: # @schema {"name": "defaults.grpc.client.dial_option.net.dns.cache_enabled", "type": "boolean"} - # defaults.grpc.client.dial_option.net.dns.cache_enabled -- gRPC client TCP DNS cache enabled + # defaults.grpc.client.dial_option.net.dns.cache_enabled -- gRPC client DNS cache enabled cache_enabled: true # @schema {"name": "defaults.grpc.client.dial_option.net.dns.refresh_duration", "type": "string"} - # defaults.grpc.client.dial_option.net.dns.refresh_duration -- gRPC client TCP DNS cache refresh duration + # defaults.grpc.client.dial_option.net.dns.refresh_duration -- gRPC client DNS cache refresh duration refresh_duration: 30m # @schema {"name": "defaults.grpc.client.dial_option.net.dns.cache_expiration", "type": "string"} - # defaults.grpc.client.dial_option.net.dns.cache_expiration -- gRPC client TCP DNS cache expiration + # defaults.grpc.client.dial_option.net.dns.cache_expiration -- gRPC client DNS cache expiration cache_expiration: 1h # @schema {"name": "defaults.grpc.client.dial_option.net.dialer", "type": "object"} dialer: @@ -1815,12 +1818,14 @@ gateway: gateway_config: # @schema {"name": "gateway.mirror.gateway_config.net", "alias": "net"} net: + # 
gateway.mirror.gateway_config.net.network -- gRPC client dialer network type + network: tcp dns: - # gateway.mirror.gateway_config.net.dns.cache_enabled -- TCP DNS cache enabled + # gateway.mirror.gateway_config.net.dns.cache_enabled -- DNS cache enabled cache_enabled: true - # gateway.mirror.gateway_config.net.dns.refresh_duration -- TCP DNS cache refresh duration + # gateway.mirror.gateway_config.net.dns.refresh_duration -- DNS cache refresh duration refresh_duration: 5m - # gateway.mirror.gateway_config.net.dns.cache_expiration -- TCP DNS cache expiration + # gateway.mirror.gateway_config.net.dns.cache_expiration -- DNS cache expiration cache_expiration: 24h dialer: # gateway.mirror.gateway_config.net.dialer.timeout -- TCP dialer timeout @@ -2630,12 +2635,14 @@ agent: client: # @schema {"name": "agent.sidecar.config.client.net", "alias": "net"} net: + # agent.sidecar.config.client.net.network -- gRPC client dialer network type + network: tcp dns: - # agent.sidecar.config.client.net.dns.cache_enabled -- HTTP client TCP DNS cache enabled + # agent.sidecar.config.client.net.dns.cache_enabled -- HTTP client DNS cache enabled cache_enabled: true - # agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client TCP DNS cache refresh duration + # agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client DNS cache refresh duration refresh_duration: 1h - # agent.sidecar.config.client.net.dns.refresh_duration -- HTTP client TCP DNS cache expiration + # agent.sidecar.config.client.net.dns.cache_expiration -- HTTP client DNS cache expiration cache_expiration: 24h dialer: # agent.sidecar.config.client.net.dialer.timeout -- HTTP client TCP dialer connect timeout @@ -3005,12 +3012,14 @@ discoverer: fields: {} # @schema {"name": "discoverer.discoverer.net", "alias": "net"} net: + # discoverer.discoverer.net.network -- gRPC client dialer network type + network: tcp dns: - # discoverer.discoverer.net.dns.cache_enabled -- TCP DNS cache enabled + # 
discoverer.discoverer.net.dns.cache_enabled -- DNS cache enabled cache_enabled: true - # discoverer.discoverer.net.dns.refresh_duration -- TCP DNS cache refresh duration + # discoverer.discoverer.net.dns.refresh_duration -- DNS cache refresh duration refresh_duration: 5m - # discoverer.discoverer.net.dns.cache_expiration -- TCP DNS cache expiration + # discoverer.discoverer.net.dns.cache_expiration -- DNS cache expiration cache_expiration: 24h dialer: # discoverer.discoverer.net.dialer.timeout -- TCP dialer timeout diff --git a/example/client/go.mod b/example/client/go.mod index 5bd06cb1a9c..61a9297d94a 100644 --- a/example/client/go.mod +++ b/example/client/go.mod @@ -1,6 +1,6 @@ module github.com/vdaas/vald/example/client -go 1.23.2 +go 1.23.3 replace ( github.com/envoyproxy/protoc-gen-validate => github.com/envoyproxy/protoc-gen-validate v1.1.0 diff --git a/example/client/go.mod.default b/example/client/go.mod.default index 205d365b8ec..b7cb0ee2a2f 100644 --- a/example/client/go.mod.default +++ b/example/client/go.mod.default @@ -1,6 +1,6 @@ module github.com/vdaas/vald/example/client -go 1.23.2 +go 1.23.3 replace ( github.com/envoyproxy/protoc-gen-validate => github.com/envoyproxy/protoc-gen-validate latest diff --git a/go.mod b/go.mod index 7f10394c441..bdbc797ae0f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/vdaas/vald -go 1.23.2 +go 1.23.3 replace ( cloud.google.com/go => cloud.google.com/go v0.116.0 @@ -196,7 +196,7 @@ replace ( github.com/klauspost/cpuid/v2 => github.com/klauspost/cpuid/v2 v2.2.9 github.com/kpango/fastime => github.com/kpango/fastime v1.1.9 github.com/kpango/fuid => github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1 - github.com/kpango/gache/v2 => github.com/kpango/gache/v2 v2.1.0 + github.com/kpango/gache/v2 => github.com/kpango/gache/v2 v2.1.1 github.com/kpango/glg => github.com/kpango/glg v1.6.15 github.com/kr/fs => github.com/kr/fs v0.1.0 github.com/kr/pretty => github.com/kr/pretty v0.3.1 @@ -378,6 +378,7 
@@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 github.com/quasilyte/go-ruleguard v0.0.0-00010101000000-000000000000 github.com/quasilyte/go-ruleguard/dsl v0.3.22 + github.com/quic-go/quic-go v0.48.1 github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.9.0 github.com/unum-cloud/usearch/golang v0.0.0-20241104182314-dccdd8e4152d @@ -461,6 +462,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-pdf/fpdf v0.9.0 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/go-toolsmith/astcopy v1.0.2 // indirect github.com/go-toolsmith/astequal v1.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -497,6 +499,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + github.com/onsi/ginkgo/v2 v2.21.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -517,6 +520,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect + go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.29.0 // indirect golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect diff --git a/go.sum b/go.sum index 8f92d9b291b..4ede379c087 100644 --- a/go.sum +++ b/go.sum @@ -382,7 +382,6 @@ github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= 
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= -github.com/go-task/slim-sprig v2.20.0+incompatible h1:4Xh3bDzO29j4TWNOI+24ubc0vbVFMg2PMnXKxK54/CA= github.com/go-task/slim-sprig v2.20.0+incompatible/go.mod h1:N/mhXZITr/EQAOErEHciKvO1bFei2Lld2Ym6h96pdy0= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= @@ -524,8 +523,8 @@ github.com/kpango/fastime v1.1.9 h1:xVQHcqyPt5M69DyFH7g1EPRns1YQNap9d5eLhl/Jy84= github.com/kpango/fastime v1.1.9/go.mod h1:vyD7FnUn08zxY4b/QFBZVG+9EWMYsNl+QF0uE46urD4= github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1 h1:rxyM+7uaZQ35P9fbixdnld/h4AgEhODoubuy6A4nDdk= github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1/go.mod h1:CAYeq6us9NfnRkSz67/xKVIR6/vaY5ZQZRe6IVcaIKg= -github.com/kpango/gache/v2 v2.1.0 h1:QghBsCQFFSGc3cLuD1rpHRy73Pd095hhv3eyxjf7+40= -github.com/kpango/gache/v2 v2.1.0/go.mod h1:0YTbg//JH4Zylm/8LNHRaSbiXphcFStdVhiftgVpPds= +github.com/kpango/gache/v2 v2.1.1 h1:nOuVy7saIbs+tMtOyvPIf71Be2lUL88ymV7SQoICOkw= +github.com/kpango/gache/v2 v2.1.1/go.mod h1:c5WoO35SM5xq4x8K+QkTwh5xsjokfL3yKsLUUtDll+c= github.com/kpango/glg v1.6.15 h1:nw0xSxpSyrDIWHeb3dvnE08PW+SCbK+aYFETT75IeLA= github.com/kpango/glg v1.6.15/go.mod h1:cmsc7Yeu8AS3wHLmN7bhwENXOpxfq+QoqxCIk2FneRk= github.com/kpango/go-hostpool v0.0.0-20210303030322-aab80263dcd0 h1:orIEVdc68woWO1ZyYWEVOl5Kl33eDjP+kbxgbdpMwi4= @@ -622,6 +621,8 @@ github.com/quasilyte/gogrep v0.5.0 h1:eTKODPXbI8ffJMN+W2aE0+oL0z/nh8/5eNdiO34SOA github.com/quasilyte/gogrep v0.5.0/go.mod h1:Cm9lpz9NZjEoL1tgZ2OgeUKPIxL1meE7eo60Z6Sk+Ng= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 h1:M8mH9eK4OUR4lu7Gd+PU1fV2/qnDNfzT635KRSObncs= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567/go.mod 
h1:DWNGW8A4Y+GyBgPuaQJuWiy0XYftx4Xm/y5Jqk9I6VQ= +github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA= +github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -716,6 +717,8 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= diff --git a/hack/go.mod.default b/hack/go.mod.default index 328d025592f..436c759ab63 100644 --- a/hack/go.mod.default +++ b/hack/go.mod.default @@ -1,6 +1,6 @@ module github.com/vdaas/vald -go 1.23.2 +go 1.23.3 replace ( cloud.google.com/go => cloud.google.com/go upgrade diff --git a/internal/client/v1/client/discoverer/discover.go b/internal/client/v1/client/discoverer/discover.go index 83c98c46db2..c8afd0916d6 100644 --- a/internal/client/v1/client/discoverer/discover.go +++ b/internal/client/v1/client/discoverer/discover.go @@ -18,8 +18,10 @@ package discoverer import ( + "cmp" "context" "reflect" + "slices" "sync/atomic" "time" @@ -96,16 +98,16 @@ func (c *client) Start(ctx context.Context) 
(<-chan error, error) { } } - ech := make(chan error, 100) - addrs, err := c.dnsDiscovery(ctx, ech) - if err != nil { - close(ech) - return nil, err + addrs, err := c.updateDiscoveryInfo(ctx) + if err != nil || len(addrs) == 0 { + addrs, err = c.dnsDiscovery(ctx) + if err != nil { + return nil, err + } } - c.addrs.Store(&addrs) var aech <-chan error - if c.autoconn { + if c.client == nil { c.client = grpc.New( append( c.opts, @@ -113,23 +115,38 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { grpc.WithErrGroup(c.eg), )..., ) - if c.client != nil { - aech, err = c.client.StartConnectionMonitor(ctx) + aech, err = c.client.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } + for _, addr := range addrs { + if c.onConnect != nil { + err = c.onConnect(ctx, c, addr) + if err != nil { + return nil, err + } + } + } + } else { + for _, addr := range addrs { + err = c.connect(ctx, addr) if err != nil { - close(ech) return nil, err } } + aech, err = c.client.StartConnectionMonitor(ctx) } + if err != nil { + return nil, err + } + c.addrs.Store(&addrs) - err = c.discover(ctx, ech) + err = c.discover(ctx) if err != nil { - close(ech) return nil, errors.Join(c.dscClient.Close(ctx), err) } - + ech := make(chan error, 100) c.eg.Go(safety.RecoverFunc(func() (err error) { - defer close(ech) dt := time.NewTicker(c.dscDur) defer dt.Stop() finalize := func() (err error) { @@ -158,7 +175,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { case err = <-aech: case err = <-rrech: case <-dt.C: - err = c.discover(ctx, ech) + err = c.discover(ctx) } if err != nil { log.Error(err) @@ -177,14 +194,11 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { func (c *client) GetAddrs(ctx context.Context) (addrs []string) { a := c.addrs.Load() if a == nil { - ips, err := net.DefaultResolver.LookupIPAddr(ctx, c.dns) + var err error + addrs, err = c.dnsDiscovery(ctx) if err != nil { return nil } - addrs = make([]string, 0, 
len(ips)) - for _, ip := range ips { - addrs = append(addrs, ip.String()) - } } else { addrs = *a } @@ -217,7 +231,7 @@ func (c *client) GetReadClient() grpc.Client { func (c *client) connect(ctx context.Context, addr string) (err error) { if c.autoconn && c.client != nil { - _, err = c.client.Connect(ctx, addr) + _, err = c.client.Connect(ctx, addr, c.client.GetDialOption()...) if err != nil { return err } @@ -238,7 +252,7 @@ func (c *client) disconnect(ctx context.Context, addr string) (err error) { return } -func (c *client) dnsDiscovery(ctx context.Context, ech chan<- error) (addrs []string, err error) { +func (c *client) dnsDiscovery(ctx context.Context) (addrs []string, err error) { ips, err := net.DefaultResolver.LookupIPAddr(ctx, c.dns) if err != nil || len(ips) == 0 { return nil, errors.ErrAddrCouldNotDiscover(err, c.dns) @@ -249,7 +263,6 @@ func (c *client) dnsDiscovery(ctx context.Context, ech chan<- error) (addrs []st addr := net.JoinHostPort(ip.String(), uint16(c.port)) if err = c.connect(ctx, addr); err != nil { log.Debugf("dns discovery connect for addr = %s from dns = %s failed %v", addr, c.dns, err) - ech <- err } else { log.Debugf("dns discovery connect for addr = %s from dns = %s succeeded", addr, c.dns) addrs = append(addrs, addr) @@ -264,7 +277,7 @@ func (c *client) dnsDiscovery(ctx context.Context, ech chan<- error) (addrs []st return addrs, nil } -func (c *client) discover(ctx context.Context, ech chan<- error) (err error) { +func (c *client) discover(ctx context.Context) (err error) { if c.dscClient == nil || (c.autoconn && c.client == nil) { return errors.ErrGRPCClientNotFound } @@ -272,7 +285,7 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) { var connected []string if bo := c.client.GetBackoff(); bo != nil { _, err = bo.Do(ctx, func(ctx context.Context) (any, bool, error) { - connected, err = c.updateDiscoveryInfo(ctx, ech) + connected, err = c.updateDiscoveryInfo(ctx) if err != nil { if !errors.Is(err, 
errors.ErrGRPCClientNotFound) && !errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { @@ -283,11 +296,11 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) { return nil, false, nil }) } else { - connected, err = c.updateDiscoveryInfo(ctx, ech) + connected, err = c.updateDiscoveryInfo(ctx) } if err != nil { log.Warnf("failed to discover addrs from discoverer API, error: %v,\ttrying to dns discovery from %s...", err, c.dns) - connected, err = c.dnsDiscovery(ctx, ech) + connected, err = c.dnsDiscovery(ctx) if err != nil { return err } @@ -295,12 +308,10 @@ func (c *client) discover(ctx context.Context, ech chan<- error) (err error) { oldAddrs := c.GetAddrs(ctx) c.addrs.Store(&connected) - return c.disconnectOldAddrs(ctx, oldAddrs, connected, ech) + return c.disconnectOldAddrs(ctx, oldAddrs, connected) } -func (c *client) updateDiscoveryInfo( - ctx context.Context, ech chan<- error, -) (connected []string, err error) { +func (c *client) updateDiscoveryInfo(ctx context.Context) (connected []string, err error) { nodes, err := c.discoverNodes(ctx) if err != nil { log.Warnf("error detected when discovering nodes,\terrors: %v", err) @@ -310,7 +321,7 @@ func (c *client) updateDiscoveryInfo( log.Warn("no nodes found") return nil, errors.ErrNodeNotFound("all") } - connected, err = c.discoverAddrs(ctx, nodes, ech) + connected, err = c.discoverAddrs(ctx, nodes) if err != nil { return nil, err } @@ -343,19 +354,39 @@ func (c *client) discoverNodes(ctx context.Context) (nodes *payload.Info_Nodes, } return nodes, nil }) - return nodes, err + if err != nil { + return nil, err + } + slices.SortFunc(nodes.Nodes, func(left, right *payload.Info_Node) int { + if left.GetMemory() == nil || right.GetMemory() == nil { + return 0 // Default comparison value; adjust as needed. 
+ } + return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage()) + }) + return nodes, nil } func (c *client) discoverAddrs( - ctx context.Context, nodes *payload.Info_Nodes, ech chan<- error, + ctx context.Context, nodes *payload.Info_Nodes, ) (addrs []string, err error) { + if nodes == nil { + return nil, errors.ErrAddrCouldNotDiscover(err, c.dns) + } maxPodLen := 0 podLength := 0 - for _, node := range nodes.GetNodes() { - l := len(node.GetPods().GetPods()) - podLength += l - if l > maxPodLen { - maxPodLen = l + for i, node := range nodes.GetNodes() { + if node != nil && node.GetPods() != nil && node.GetPods().GetPods() != nil { + l := len(node.GetPods().GetPods()) + podLength += l + if l > maxPodLen { + maxPodLen = l + } + slices.SortFunc(nodes.Nodes[i].Pods.Pods, func(left, right *payload.Info_Pod) int { + if left.GetMemory() == nil || right.GetMemory() == nil { + return 0 // Default comparison value; adjust as needed. + } + return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage()) + }) } } addrs = make([]string, 0, podLength) @@ -371,15 +402,12 @@ func (c *client) discoverAddrs( len(node.GetPods().GetPods()[i].GetIp()) != 0 { addr := net.JoinHostPort(node.GetPods().GetPods()[i].GetIp(), uint16(c.port)) if err = c.connect(ctx, addr); err != nil { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case ech <- errors.ErrAddrCouldNotDiscover(err, addr): - } + log.Debugf("resource based discovery connect from discoverer API for addr = %s failed %v", addr, errors.ErrAddrCouldNotDiscover(err, addr)) err = nil } else { addrs = append(addrs, addr) } + } } } @@ -388,7 +416,7 @@ func (c *client) discoverAddrs( } func (c *client) disconnectOldAddrs( - ctx context.Context, oldAddrs, connectedAddrs []string, ech chan<- error, + ctx context.Context, oldAddrs, connectedAddrs []string, ) (err error) { if !c.autoconn { return nil @@ -404,7 +432,7 @@ func (c *client) disconnectOldAddrs( c.eg.Go(safety.RecoverFunc(func() error { err = 
c.disconnect(ctx, old) if err != nil { - ech <- err + log.Error(err) } return nil })) @@ -420,22 +448,12 @@ func (c *client) disconnectOldAddrs( if !ok { err = c.disconnect(ctx, addr) if err != nil { - select { - case <-ctx.Done(): - return errors.Join(ctx.Err(), err) - case ech <- err: - return err - } + return err } } return nil }); err != nil { - select { - case <-ctx.Done(): - return errors.Join(ctx.Err(), err) - case ech <- err: - return err - } + log.Error(err) } } return nil diff --git a/internal/config/grpc.go b/internal/config/grpc.go index 8fc1379ab13..a8cee43dee8 100644 --- a/internal/config/grpc.go +++ b/internal/config/grpc.go @@ -272,8 +272,12 @@ func (g *GRPCClient) Opts() ([]grpc.Option, error) { if err != nil { return nil, err } + network := g.DialOption.Net.Network + if network == "" { + network = net.TCP.String() + } opts = append(opts, - grpc.WithDialer(der), + grpc.WithDialer(network, der), ) } diff --git a/internal/config/net.go b/internal/config/net.go index b76bccc7af2..ad3b26610e0 100644 --- a/internal/config/net.go +++ b/internal/config/net.go @@ -25,6 +25,7 @@ import ( // Net represents the network configuration tcp, udp, unix domain socket. type Net struct { + Network string `json:"network,omitempty" yaml:"network"` DNS *DNS `json:"dns,omitempty" yaml:"dns"` Dialer *Dialer `json:"dialer,omitempty" yaml:"dialer"` SocketOption *SocketOption `json:"socket_option,omitempty" yaml:"socket_option"` @@ -117,6 +118,7 @@ func (s *SocketOption) ToSocketFlag() control.SocketFlag { // Bind binds the actual data from the Net fields. 
func (t *Net) Bind() *Net { + t.Network = GetActualValue(t.Network) if t.TLS != nil { t.TLS = t.TLS.Bind() } diff --git a/internal/errors/net.go b/internal/errors/net.go index 1a64fa9cf6f..f91599bdbe0 100644 --- a/internal/errors/net.go +++ b/internal/errors/net.go @@ -41,4 +41,10 @@ var ( ErrLookupIPAddrNotFound = func(host string) error { return Errorf("failed to lookup ip addrs for host: %s", host) } + + ErrInvalidAddress = func(network, addr string) error { + return Errorf("invalid address %s detected for network: %s", addr, network) + } + + ErrEmptyALPNs = New("empty ALPN protocols detected") ) diff --git a/internal/net/dialer.go b/internal/net/dialer.go index d7159bb40d2..792aadb5859 100644 --- a/internal/net/dialer.go +++ b/internal/net/dialer.go @@ -30,6 +30,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/control" + "github.com/vdaas/vald/internal/net/quic" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" @@ -123,7 +124,7 @@ func NewDialer(opts ...DialerOption) (der Dialer, err error) { if d.dnsCache, err = cache.New( cache.WithExpireDuration[*dialerCache](d.dnsCacheExpirationStr), cache.WithExpireCheckDuration[*dialerCache](d.dnsRefreshDurationStr), - cache.WithExpiredHook[*dialerCache](d.cacheExpireHook), + cache.WithExpiredHook(d.cacheExpireHook), ); err != nil { return nil, err } @@ -310,6 +311,17 @@ func (d *dialer) cachedDialer(ctx context.Context, network, addr string) (conn C return d.dial(ctx, network, addr) } +func isQUICDial(network, addr string) bool { + if !IsUDP(network) { + return false + } + host, port, err := SplitHostPort(addr) + if err != nil || host == "" || port == 0 { + return false + } + return port != 53 +} + func (d *dialer) dial(ctx context.Context, network, addr string) (conn Conn, err error) { ctx, span := trace.StartSpan(ctx, apiName+"/Dialer.dial") defer func() { @@ 
-317,26 +329,33 @@ func (d *dialer) dial(ctx context.Context, network, addr string) (conn Conn, err span.End() } }() + if NetworkTypeFromString(network) == Unknown { + network = TCP.String() + } + if addr == "" { + return nil, errors.ErrInvalidAddress(network, addr) + } log.Debugf("%s connection dialing to addr %s", network, addr) - err = safety.RecoverWithoutPanicFunc(func() error { - conn, err = d.der.DialContext(ctx, network, addr) + err = safety.RecoverWithoutPanicFunc(func() (err error) { + if isQUICDial(network, addr) { + conn, err = quic.DialContext(ctx, addr, d.tlsConfig) + } else { + if IsUDP(network) { + network = TCP.String() + } + conn, err = d.der.DialContext(ctx, network, addr) + } return err })() if err != nil { - defer func(conn Conn) { - if conn != nil { - if err != nil { - err = errors.Join(conn.Close(), err) - return - } - err = conn.Close() - } - }(conn) + if conn != nil { + err = errors.Join(conn.Close(), err) + } return nil, err } d.tmu.RLock() - if d.tlsConfig != nil { + if !IsUDP(network) && d.tlsConfig != nil { d.tmu.RUnlock() return d.tlsHandshake(ctx, conn, network, addr) } @@ -423,15 +442,12 @@ func (d *dialer) tlsHandshake( })() } if err != nil || conn == nil { - defer func(conn Conn) { - if conn != nil { - if err != nil { - err = errors.Join(conn.Close(), err) - return - } - err = conn.Close() + if conn != nil { + if err != nil { + return nil, errors.Join(conn.Close(), err) } - }(conn) + return nil, conn.Close() + } return nil, err } tconn, ok := conn.(*tls.Conn) @@ -454,11 +470,15 @@ func (d *dialer) tlsHandshake( return tconn, nil } -func (d *dialer) cacheExpireHook(ctx context.Context, addr string, _ *dialerCache) { +func (d *dialer) cacheExpireHook(ctx context.Context, addr string, dc *dialerCache) { if err := safety.RecoverFunc(func() (err error) { _, err = d.lookup(ctx, addr) return })(); err != nil { - log.Errorf("dns cache expiration hook process returned error: %v\tfor addr:\t%s", err, addr) + if dc != nil { + log.Errorf("dns 
cache expiration hook process returned error: %v\tfor addr:\t%s\tips: %v\tlen: %d", err, addr, dc.ips, dc.Len()) + } else { + log.Errorf("dns cache expiration hook process returned error: %v\tfor addr:\t%s", err, addr) + } } } diff --git a/internal/net/dialer_test.go b/internal/net/dialer_test.go index 9c6261aa0cc..ba92251178d 100644 --- a/internal/net/dialer_test.go +++ b/internal/net/dialer_test.go @@ -1425,7 +1425,7 @@ func Test_dialer_dial(t *testing.T) { return nil }, want: want{ - err: errors.New("missing address"), + err: errors.ErrInvalidAddress(TCP.String(), ""), }, }, { @@ -1452,7 +1452,7 @@ func Test_dialer_dial(t *testing.T) { return nil }, want: want{ - err: net.UnknownNetworkError("invalid"), + err: errors.ErrInvalidAddress(TCP.String(), ""), }, }, { @@ -1477,7 +1477,7 @@ func Test_dialer_dial(t *testing.T) { return nil }, want: want{ - err: net.UnknownNetworkError(""), + err: errors.ErrInvalidAddress(TCP.String(), ""), }, }, } diff --git a/internal/net/grpc/option.go b/internal/net/grpc/option.go index 79a9e50131c..26dee5da124 100644 --- a/internal/net/grpc/option.go +++ b/internal/net/grpc/option.go @@ -485,18 +485,26 @@ func WithKeepaliveParams(t, to string, permitWithoutStream bool) Option { } } -func WithDialer(der net.Dialer) Option { +func WithDialer(network string, der net.Dialer) Option { return func(g *gRPCClient) { if der != nil { g.dialer = der if g.dopts == nil && cap(g.dopts) == 0 { g.dopts = make([]grpc.DialOption, 0, defaultDialOptionLength) } + nt := net.NetworkTypeFromString(network) + switch nt { + case net.UDP, net.UDP4, net.UDP6: + nt = net.UDP + case net.UNIX, net.UNIXGRAM, net.UNIXPACKET: + nt = net.UNIX + default: + nt = net.TCP + } g.dopts = append(g.dopts, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { - // TODO we need change network type dynamically - log.Debugf("gRPC context Dialer addr is %s", addr) - return der.GetDialer()(ctx, net.TCP.String(), addr) + log.Debugf("gRPC context Dialer 
for network %s, addr is %s", nt.String(), addr) + return g.dialer.GetDialer()(ctx, nt.String(), addr) }), ) } diff --git a/internal/net/net.go b/internal/net/net.go index 8f87a858613..56d3fcf3524 100644 --- a/internal/net/net.go +++ b/internal/net/net.go @@ -34,6 +34,9 @@ import ( ) type ( + // Addr is an alias of net.Addr. + Addr = net.Addr + // Conn is an alias of net.Conn. Conn = net.Conn @@ -46,6 +49,15 @@ type ( // Resolver is an alias of net.Resolver. Resolver = net.Resolver + // UDPConn is an alias of net.UDPConn. + UDPConn = net.UDPConn + + // TCPListener is an alias of net.TCPListener. + TCPListener = net.TCPListener + + // UnixListener is an alias of net.UnixListener. + UnixListener = net.UnixListener + // NetworkType represents a network type such as TCP, TCP6, etc. NetworkType uint ) @@ -84,6 +96,9 @@ var ( // NetworkTypeFromString returns the corresponding network type from string. func NetworkTypeFromString(str string) NetworkType { + if str == "" { + return Unknown + } switch strings.ToLower(str) { case UNIX.String(): return UNIX @@ -151,6 +166,22 @@ func IsLocal(host string) bool { host == localIPv6 } +// IsUDP returns if the network type is the udp or udp4 or udp6. +func IsUDP(network string) bool { + rip := NetworkTypeFromString(network) + return rip == UDP || + rip == UDP4 || + rip == UDP6 +} + +// IsTCP returns if the network type is the tcp or tcp4 or tcp6. +func IsTCP(network string) bool { + rip := NetworkTypeFromString(network) + return rip == TCP || + rip == TCP4 || + rip == TCP6 +} + // Parse parses the hostname, IPv4 or IPv6 address and return the hostname/IP, port number, // whether the address is local IP and IPv4 or IPv6, and any parsing error occurred. // The address should contains the port number, otherwise an error will return. 
diff --git a/internal/net/net_test.go b/internal/net/net_test.go index 1c055fa0a85..1b0bbd6249e 100644 --- a/internal/net/net_test.go +++ b/internal/net/net_test.go @@ -262,7 +262,7 @@ func TestParse(t *testing.T) { return errors.Errorf("host got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotHost, w.wantHost) } if !reflect.DeepEqual(gotPort, w.wantPort) { - return errors.Errorf("port got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotPort, w.wantPort) + return errors.Errorf("port got: \"%d\",\n\t\t\t\twant: \"%d\"", gotPort, w.wantPort) } if !reflect.DeepEqual(gotIsLocal, w.isLocal) { return errors.Errorf("isLocal got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotIsLocal, w.isLocal) diff --git a/internal/net/quic/conn.go b/internal/net/quic/conn.go new file mode 100644 index 00000000000..6aa206f1073 --- /dev/null +++ b/internal/net/quic/conn.go @@ -0,0 +1,170 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package quic
+
+import (
+	"context"
+	"net"
+
+	quic "github.com/quic-go/quic-go"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/sync"
+	"github.com/vdaas/vald/internal/tls"
+)
+
+type Conn struct {
+	quic.Connection
+	quic.Stream
+}
+
+type qconn struct {
+	connectionCache sync.Map[string, quic.Connection]
+}
+
+var defaultQconn = new(qconn)
+
+func NewConn(ctx context.Context, conn quic.Connection) (net.Conn, error) {
+	stream, err := conn.OpenStreamSync(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return &Conn{
+		Connection: conn,
+		Stream:     stream,
+	}, nil
+}
+
+func (c *Conn) Close() (err error) {
+	return c.Stream.Close()
+}
+
+func DialContext(ctx context.Context, addr string, tcfg *tls.Config) (net.Conn, error) {
+	if tcfg == nil {
+		return nil, errors.ErrCertificationFailed
+	}
+	if len(tcfg.NextProtos) == 0 {
+		return nil, errors.ErrEmptyALPNs
+	}
+	return defaultQconn.dialQuicContext(ctx, addr, tcfg)
+}
+
+func (q *qconn) dialQuicContext(
+	ctx context.Context, addr string, tcfg *tls.Config,
+) (net.Conn, error) {
+	si, ok := q.connectionCache.Load(addr)
+	if ok {
+		if si != nil && si.Context().Err() == nil { // reuse the cached connection only while it is still open; otherwise redial below and overwrite the stale entry
+			return NewConn(ctx, si)
+		}
+	}
+	conn, err := quic.DialAddr(ctx, addr, tcfg, &quic.Config{
+		/*
+			// GetConfigForClient is called for incoming connections.
+			// If the error is not nil, the connection attempt is refused.
+			GetConfigForClient func(info *ClientHelloInfo) (*Config, error)
+			// The QUIC versions that can be negotiated.
+			// If not set, it uses all versions available.
+			Versions []Version
+			// HandshakeIdleTimeout is the idle timeout before completion of the handshake.
+			// If we don't receive any packet from the peer within this time, the connection attempt is aborted.
+			// Additionally, if the handshake doesn't complete in twice this time, the connection attempt is also aborted.
+			// If this value is zero, the timeout is set to 5 seconds.
+ HandshakeIdleTimeout time.Duration + // MaxIdleTimeout is the maximum duration that may pass without any incoming network activity. + // The actual value for the idle timeout is the minimum of this value and the peer's. + // This value only applies after the handshake has completed. + // If the timeout is exceeded, the connection is closed. + // If this value is zero, the timeout is set to 30 seconds. + MaxIdleTimeout time.Duration + // The TokenStore stores tokens received from the server. + // Tokens are used to skip address validation on future connection attempts. + // The key used to store tokens is the ServerName from the tls.Config, if set + // otherwise the token is associated with the server's IP address. + TokenStore TokenStore + // InitialStreamReceiveWindow is the initial size of the stream-level flow control window for receiving data. + // If the application is consuming data quickly enough, the flow control auto-tuning algorithm + // will increase the window up to MaxStreamReceiveWindow. + // If this value is zero, it will default to 512 KB. + // Values larger than the maximum varint (quicvarint.Max) will be clipped to that value. + InitialStreamReceiveWindow uint64 + // MaxStreamReceiveWindow is the maximum stream-level flow control window for receiving data. + // If this value is zero, it will default to 6 MB. + // Values larger than the maximum varint (quicvarint.Max) will be clipped to that value. + MaxStreamReceiveWindow uint64 + // InitialConnectionReceiveWindow is the initial size of the stream-level flow control window for receiving data. + // If the application is consuming data quickly enough, the flow control auto-tuning algorithm + // will increase the window up to MaxConnectionReceiveWindow. + // If this value is zero, it will default to 512 KB. + // Values larger than the maximum varint (quicvarint.Max) will be clipped to that value. 
+ InitialConnectionReceiveWindow uint64 + // MaxConnectionReceiveWindow is the connection-level flow control window for receiving data. + // If this value is zero, it will default to 15 MB. + // Values larger than the maximum varint (quicvarint.Max) will be clipped to that value. + MaxConnectionReceiveWindow uint64 + // AllowConnectionWindowIncrease is called every time the connection flow controller attempts + // to increase the connection flow control window. + // If set, the caller can prevent an increase of the window. Typically, it would do so to + // limit the memory usage. + // To avoid deadlocks, it is not valid to call other functions on the connection or on streams + // in this callback. + AllowConnectionWindowIncrease func(conn Connection, delta uint64) bool + // MaxIncomingStreams is the maximum number of concurrent bidirectional streams that a peer is allowed to open. + // If not set, it will default to 100. + // If set to a negative value, it doesn't allow any bidirectional streams. + // Values larger than 2^60 will be clipped to that value. + MaxIncomingStreams int64 + // MaxIncomingUniStreams is the maximum number of concurrent unidirectional streams that a peer is allowed to open. + // If not set, it will default to 100. + // If set to a negative value, it doesn't allow any unidirectional streams. + // Values larger than 2^60 will be clipped to that value. + MaxIncomingUniStreams int64 + // KeepAlivePeriod defines whether this peer will periodically send a packet to keep the connection alive. + // If set to 0, then no keep alive is sent. Otherwise, the keep alive is sent on that period (or at most + // every half of MaxIdleTimeout, whichever is smaller). + KeepAlivePeriod time.Duration + // InitialPacketSize is the initial size of packets sent. + // It is usually not necessary to manually set this value, + // since Path MTU discovery very quickly finds the path's MTU. 
+			// If set too high, the path might not support packets that large, leading to a timeout of the QUIC handshake.
+			// Values below 1200 are invalid.
+			InitialPacketSize uint16
+			// DisablePathMTUDiscovery disables Path MTU Discovery (RFC 8899).
+			// This allows the sending of QUIC packets that fully utilize the available MTU of the path.
+			// Path MTU discovery is only available on systems that allow setting of the Don't Fragment (DF) bit.
+			DisablePathMTUDiscovery bool
+			// Allow0RTT allows the application to decide if a 0-RTT connection attempt should be accepted.
+			// Only valid for the server.
+			Allow0RTT bool
+			// Enable QUIC datagram support (RFC 9221).
+			EnableDatagrams bool
+			Tracer func(context.Context, logging.Perspective, ConnectionID) *logging.ConnectionTracer
+		*/
+	})
+	if err != nil {
+		return nil, err
+	}
+	q.connectionCache.Store(addr, conn)
+	return NewConn(ctx, conn)
+}
+
+func (q *qconn) Close() (err error) {
+	q.connectionCache.Range(func(addr string, conn quic.Connection) bool {
+		e := conn.CloseWithError(0, addr)
+		if e != nil {
+			err = errors.Join(err, e) // accumulate every close failure while keeping the original error in the chain
+		}
+		return true
+	})
+	return err // report the accumulated close errors instead of silently discarding them
+}
diff --git a/internal/net/quic/listener.go b/internal/net/quic/listener.go
new file mode 100644
index 00000000000..6372ab45870
--- /dev/null
+++ b/internal/net/quic/listener.go
@@ -0,0 +1,82 @@
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package quic
+
+import (
+	"context"
+	"net"
+
+	quic "github.com/quic-go/quic-go"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/io"
+	"github.com/vdaas/vald/internal/tls"
+)
+
+// Listener adapts a quic-go listener so that it can be returned as a
+// net.Listener (see Listen). It embeds quic.Listener and keeps the context
+// handed to Listen, because its Accept method takes no context argument of
+// its own.
+type Listener struct {
+	quic.Listener
+
+	ctx context.Context
+}
+
+// Listen starts a QUIC listener on addr using the TLS configuration tcfg and
+// returns it wrapped in a *Listener. The only quic.Config option set
+// explicitly is EnableDatagrams; every other option is left commented out.
+// ctx is not used to open the listener: it is stored and later passed to the
+// accept calls made by (*Listener).Accept.
+// NOTE(review): *ql copies the quic.Listener value returned by ListenAddr —
+// confirm that copying it is safe for the quic-go version in use.
+func Listen(ctx context.Context, addr string, tcfg *tls.Config) (net.Listener, error) {
+	ql, err := quic.ListenAddr(addr, tcfg, &quic.Config{
+		// Versions: nil,
+		// ConnectionIDLength: 0,
+		// HandshakeIdleTimeout: 0,
+		// MaxIdleTimeout: 0,
+		// AcceptToken: func(clientAddr net.Addr, token *quic.Token) bool {
+		// return true
+		// },
+		// TokenStore: quic.NewLRUTokenStore(clientAddr),
+		// InitialStreamReceiveWindow: 0,
+		// InitialConnectionReceiveWindow: 0,
+		// MaxStreamReceiveWindow: 0,
+		// MaxConnectionReceiveWindow: 0,
+		// MaxIncomingStreams: 0,
+		// MaxIncomingUniStreams: 0,
+		// StatelessResetKey: nil,
+		// KeepAlive: true,
+		// DisablePathMTUDiscovery: false,
+		EnableDatagrams: true,
+		// Tracer: logging.NewMultiplexedTracer(),
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &Listener{
+		Listener: *ql,
+		ctx:      ctx,
+	}, nil
+}
+
+// Accept waits for the next QUIC connection and then for the first stream
+// accepted on that connection, and returns both wrapped in a *Conn.
+// Both waits use the context that was passed to Listen.
+// If accepting the stream fails with io.EOF the error is returned as is; for
+// any other stream error the connection is first closed with error code 0.
+// NOTE(review): a stream failure on a single connection is surfaced as an
+// Accept error — confirm callers do not treat that as a fatal listener error.
+func (l *Listener) Accept() (net.Conn, error) {
+	sess, err := l.Listener.Accept(l.ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	stream, err := sess.AcceptStream(l.ctx)
+	if err != nil {
+		if errors.Is(err, io.EOF) {
+			// The session was closed gracefully by the peer
+			return nil, err
+		}
+		// Best-effort close; its error is intentionally dropped in favor of the stream error.
+		_ = sess.CloseWithError(0, "failed to accept stream")
+		return nil, err
+	}
+	return &Conn{
+		Connection: sess,
+		Stream:     stream,
+	}, nil
+}
diff --git a/internal/servers/server/server.go b/internal/servers/server/server.go
index 959910b46ed..51aec64b1fd 100644
--- a/internal/servers/server/server.go
+++ b/internal/servers/server/server.go
@@ -288,12 +288,13 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro
 		}
 	}
 
-	l, err := s.lc.Listen(ctx, func() string {
+	network := func() string {
 		if s.network == 0 || s.network ==
net.Unknown || strings.EqualFold(s.network.String(), net.Unknown.String()) { return net.TCP.String() } return s.network.String() - }(), func() string { + }() + addr := func() string { if s.network == net.UNIX { if s.socketPath == "" { sockFile := strings.Join([]string{s.name, strconv.Itoa(os.Getpid()), "sock"}, ".") @@ -302,17 +303,47 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro return s.socketPath } return net.JoinHostPort(s.host, s.port) - }()) - if err != nil { - log.Errorf("failed to listen socket %v", err) - return err - } - - if s.tcfg != nil && - (len(s.tcfg.Certificates) != 0 || - s.tcfg.GetCertificate != nil || - s.tcfg.GetConfigForClient != nil) { - l = tls.NewListener(l, s.tcfg) + }() + var l net.Listener + if s.tcfg != nil && net.IsUDP(network) { + log.Error("QUIC protocol is not supported yet") + return errors.ErrUnsupportedClientMethod + } else { + if net.IsUDP(network) { + network = net.TCP.String() + } + l, err = s.lc.Listen(ctx, network, addr) + if err != nil { + log.Errorf("failed to listen socket %v", err) + return err + } + var file *os.File + switch lt := l.(type) { + case *net.TCPListener: + file, err = lt.File() + if err != nil { + log.Errorf("failed to listen tcp socket %v", err) + return err + } + case *net.UnixListener: + file, err = lt.File() + if err != nil { + log.Errorf("failed to listen unix socket %v", err) + return err + } + } + if file != nil { + err = syscall.SetNonblock(int(file.Fd()), true) + if err != nil { + return err + } + } + if s.tcfg != nil && + (len(s.tcfg.Certificates) != 0 || + s.tcfg.GetCertificate != nil || + s.tcfg.GetConfigForClient != nil) { + l = tls.NewListener(l, s.tcfg) + } } if l == nil { @@ -427,9 +458,18 @@ func (s *server) Shutdown(ctx context.Context) (rerr error) { if err != nil && err != http.ErrServerClosed && err != grpc.ErrServerStopped { rerr = errors.Join(rerr, err) } + if err != nil && + !errors.Is(err, http.ErrServerClosed) && + !errors.Is(err, 
grpc.ErrServerStopped) && + !errors.Is(err, context.Canceled) && + !errors.Is(err, context.DeadlineExceeded) { + rerr = errors.Join(rerr, err) + } err = sctx.Err() - if err != nil && err != context.Canceled { + if err != nil && + !errors.Is(err, context.Canceled) && + !errors.Is(err, context.DeadlineExceeded) { rerr = errors.Join(rerr, err) } diff --git a/internal/servers/servers.go b/internal/servers/servers.go index ddc6d4c4dd4..467fe51bf6d 100644 --- a/internal/servers/servers.go +++ b/internal/servers/servers.go @@ -63,14 +63,20 @@ func (l *listener) ListenAndServe(ctx context.Context) <-chan error { srv, ok := l.servers[name] if !ok || srv == nil { - ech <- errors.ErrServerNotFound(name) + select { + case <-ctx.Done(): + case ech <- errors.ErrServerNotFound(name): + } continue } if !l.servers[name].IsRunning() { err := l.servers[name].ListenAndServe(ctx, ech) if err != nil { - ech <- err + select { + case <-ctx.Done(): + case ech <- err: + } } } } @@ -79,7 +85,10 @@ func (l *listener) ListenAndServe(ctx context.Context) <-chan error { if !l.servers[name].IsRunning() { err := l.servers[name].ListenAndServe(ctx, ech) if err != nil { - ech <- err + select { + case <-ctx.Done(): + case ech <- err: + } } } } diff --git a/k8s/discoverer/configmap.yaml b/k8s/discoverer/configmap.yaml index 738816e3168..86ac6e9760f 100644 --- a/k8s/discoverer/configmap.yaml +++ b/k8s/discoverer/configmap.yaml @@ -269,6 +269,7 @@ data: cache_enabled: true cache_expiration: 24h refresh_duration: 5m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/discoverer/deployment.yaml b/k8s/discoverer/deployment.yaml index d74f417316c..3e88680bf7f 100644 --- a/k8s/discoverer/deployment.yaml +++ b/k8s/discoverer/deployment.yaml @@ -46,7 +46,7 @@ spec: app.kubernetes.io/instance: release-name app.kubernetes.io/component: discoverer annotations: - checksum/configmap: f84f9a4347f265eda0f017bc36b547522041d0612901f77e7d4ac38361c61f87 + 
checksum/configmap: 59dc2f613bfcf9d9a74e6a53e3fd58ffb09e9213c03ec7f4181be055c4f59edb profefe.com/enable: "true" profefe.com/port: "6060" profefe.com/service: vald-discoverer diff --git a/k8s/gateway/gateway/lb/configmap.yaml b/k8s/gateway/gateway/lb/configmap.yaml index d1710bf750c..d188c411d92 100644 --- a/k8s/gateway/gateway/lb/configmap.yaml +++ b/k8s/gateway/gateway/lb/configmap.yaml @@ -306,6 +306,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false @@ -392,6 +393,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/gateway/gateway/lb/deployment.yaml b/k8s/gateway/gateway/lb/deployment.yaml index 2697492e13c..07e176bb7db 100644 --- a/k8s/gateway/gateway/lb/deployment.yaml +++ b/k8s/gateway/gateway/lb/deployment.yaml @@ -45,7 +45,7 @@ spec: app.kubernetes.io/instance: release-name app.kubernetes.io/component: gateway-lb annotations: - checksum/configmap: 63a6ba288fa5fba551f7383b7fa986cfe2b70ef620a1ed78600bbddae185e9e5 + checksum/configmap: 34833ada0d57883cd1515c5a66afe131e89919e947127f303f52f4f265316915 profefe.com/enable: "true" profefe.com/port: "6060" profefe.com/service: vald-lb-gateway diff --git a/k8s/gateway/gateway/mirror/configmap.yaml b/k8s/gateway/gateway/mirror/configmap.yaml index 305ee92678a..bf3208622e5 100644 --- a/k8s/gateway/gateway/mirror/configmap.yaml +++ b/k8s/gateway/gateway/mirror/configmap.yaml @@ -25,4 +25,4 @@ metadata: app.kubernetes.io/version: v1.7.14 app.kubernetes.io/component: gateway-mirror data: - config.yaml: "---\nversion: v0.0.0\ntime_zone: UTC\nlogging:\n format: raw\n level: debug\n logger: glg\nserver_config:\n servers:\n - name: grpc\n host: 0.0.0.0\n port: 8081\n grpc:\n bidirectional_stream_concurrency: 20\n connection_timeout: \"\"\n enable_admin: true\n enable_channelz: true\n 
enable_reflection: true\n header_table_size: 0\n initial_conn_window_size: 2097152\n initial_window_size: 1048576\n interceptors:\n - RecoverInterceptor\n keepalive:\n max_conn_age: \"\"\n max_conn_age_grace: \"\"\n max_conn_idle: \"\"\n min_time: 10m\n permit_without_stream: false\n time: 3h\n timeout: 60s\n max_concurrent_streams: 0\n max_header_list_size: 0\n max_receive_message_size: 0\n max_send_message_size: 0\n num_stream_workers: 0\n read_buffer_size: 0\n shared_write_buffer: false\n wait_for_handlers: true\n write_buffer_size: 0\n mode: GRPC\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: false\n tcp_no_delay: false\n tcp_quick_ack: false\n socket_path: \"\"\n health_check_servers:\n - name: liveness\n host: 0.0.0.0\n port: 3000\n http:\n handler_timeout: \"\"\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n max_upload_buffer_per_stream: 0\n permit_prohibited_cipher_suites: true\n idle_timeout: \"\"\n read_header_timeout: \"\"\n read_timeout: \"\"\n shutdown_duration: 5s\n write_timeout: \"\"\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n socket_path: \"\"\n - name: readiness\n host: 0.0.0.0\n port: 3001\n http:\n handler_timeout: \"\"\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n max_upload_buffer_per_stream: 
0\n permit_prohibited_cipher_suites: true\n idle_timeout: \"\"\n read_header_timeout: \"\"\n read_timeout: \"\"\n shutdown_duration: 0s\n write_timeout: \"\"\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n socket_path: \"\"\n metrics_servers:\n - name: pprof\n host: 0.0.0.0\n port: 6060\n http:\n handler_timeout: 5s\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n max_upload_buffer_per_stream: 0\n permit_prohibited_cipher_suites: true\n idle_timeout: 2s\n read_header_timeout: 1s\n read_timeout: 1s\n shutdown_duration: 5s\n write_timeout: 1m\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: true\n tcp_defer_accept: false\n tcp_fast_open: false\n tcp_no_delay: false\n tcp_quick_ack: false\n socket_path: \"\"\n startup_strategy:\n - liveness\n - pprof\n - grpc\n - readiness\n shutdown_strategy:\n - readiness\n - grpc\n - pprof\n - liveness\n full_shutdown_duration: 600s\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\nobservability:\n enabled: false\n otlp:\n collector_endpoint: \"\"\n trace_batch_timeout: \"1s\"\n trace_export_timeout: \"1m\"\n trace_max_export_batch_size: 1024\n trace_max_queue_size: 256\n metrics_export_interval: \"1s\"\n metrics_export_timeout: \"1m\"\n attribute:\n namespace: \"_MY_POD_NAMESPACE_\"\n pod_name: \"_MY_POD_NAME_\"\n node_name: \"_MY_NODE_NAME_\"\n service_name: \"vald-mirror-gateway\"\n metrics:\n enable_cgo: true\n 
enable_goroutine: true\n enable_memory: true\n enable_version_info: true\n version_info_labels:\n - vald_version\n - server_name\n - git_commit\n - build_time\n - go_version\n - go_os\n - go_arch\n - algorithm_info\n trace:\n enabled: false\ngateway:\n pod_name: _MY_POD_NAME_\n register_duration: 1s\n namespace: _MY_POD_NAMESPACE_\n discovery_duration: 1s\n colocation: dc1\n group: \n net:\n dialer:\n dual_stack_enabled: false\n keepalive: 10m\n timeout: 30s\n dns:\n cache_enabled: true\n cache_expiration: 24h\n refresh_duration: 5m\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: true\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\n client:\n addrs:\n - vald-lb-gateway.default.svc.cluster.local:8081\n health_check_duration: \"1s\"\n connection_pool:\n enable_dns_resolver: true\n enable_rebalance: true\n old_conn_close_duration: 2m\n rebalance_duration: 30m\n size: 3\n backoff:\n backoff_factor: 1.1\n backoff_time_limit: 5s\n enable_error_log: true\n initial_duration: 5ms\n jitter_limit: 100ms\n maximum_duration: 5s\n retry_count: 100\n circuit_breaker:\n closed_error_rate: 0.7\n closed_refresh_timeout: 10s\n half_open_error_rate: 0.5\n min_samples: 1000\n open_timeout: 1s\n call_option:\n content_subtype: \"\"\n max_recv_msg_size: 0\n max_retry_rpc_buffer_size: 0\n max_send_msg_size: 0\n wait_for_ready: true\n dial_option:\n authority: \"\"\n backoff_base_delay: 1s\n backoff_jitter: 0.2\n backoff_max_delay: 120s\n backoff_multiplier: 1.6\n disable_retry: false\n enable_backoff: false\n idle_timeout: 1h\n initial_connection_window_size: 2097152\n initial_window_size: 1048576\n insecure: true\n interceptors: []\n keepalive:\n permit_without_stream: false\n time: \"\"\n timeout: 30s\n max_call_attempts: 0\n max_header_list_size: 0\n 
max_msg_size: 0\n min_connection_timeout: 20s\n net:\n dialer:\n dual_stack_enabled: true\n keepalive: \"\"\n timeout: \"\"\n dns:\n cache_enabled: true\n cache_expiration: 1h\n refresh_duration: 30m\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: false\n tcp_no_delay: false\n tcp_quick_ack: false\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\n read_buffer_size: 0\n shared_write_buffer: false\n timeout: \"\"\n user_agent: Vald-gRPC\n write_buffer_size: 0\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\n self_mirror_addr: vald-mirror-gateway.default.svc.cluster.local:8081\n gateway_addr: vald-lb-gateway.default.svc.cluster.local:8081\n" + config.yaml: "---\nversion: v0.0.0\ntime_zone: UTC\nlogging:\n format: raw\n level: debug\n logger: glg\nserver_config:\n servers:\n - name: grpc\n host: 0.0.0.0\n port: 8081\n grpc:\n bidirectional_stream_concurrency: 20\n connection_timeout: \"\"\n enable_admin: true\n enable_channelz: true\n enable_reflection: true\n header_table_size: 0\n initial_conn_window_size: 2097152\n initial_window_size: 1048576\n interceptors:\n - RecoverInterceptor\n keepalive:\n max_conn_age: \"\"\n max_conn_age_grace: \"\"\n max_conn_idle: \"\"\n min_time: 10m\n permit_without_stream: false\n time: 3h\n timeout: 60s\n max_concurrent_streams: 0\n max_header_list_size: 0\n max_receive_message_size: 0\n max_send_message_size: 0\n num_stream_workers: 0\n read_buffer_size: 0\n shared_write_buffer: false\n wait_for_handlers: true\n write_buffer_size: 0\n mode: GRPC\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: false\n 
tcp_no_delay: false\n tcp_quick_ack: false\n socket_path: \"\"\n health_check_servers:\n - name: liveness\n host: 0.0.0.0\n port: 3000\n http:\n handler_timeout: \"\"\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n max_upload_buffer_per_stream: 0\n permit_prohibited_cipher_suites: true\n idle_timeout: \"\"\n read_header_timeout: \"\"\n read_timeout: \"\"\n shutdown_duration: 5s\n write_timeout: \"\"\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n socket_path: \"\"\n - name: readiness\n host: 0.0.0.0\n port: 3001\n http:\n handler_timeout: \"\"\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n max_upload_buffer_per_stream: 0\n permit_prohibited_cipher_suites: true\n idle_timeout: \"\"\n read_header_timeout: \"\"\n read_timeout: \"\"\n shutdown_duration: 0s\n write_timeout: \"\"\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n socket_path: \"\"\n metrics_servers:\n - name: pprof\n host: 0.0.0.0\n port: 6060\n http:\n handler_timeout: 5s\n http2:\n enabled: false\n handler_limit: 0\n max_concurrent_streams: 0\n max_decoder_header_table_size: 4096\n max_encoder_header_table_size: 4096\n max_read_frame_size: 0\n max_upload_buffer_per_connection: 0\n 
max_upload_buffer_per_stream: 0\n permit_prohibited_cipher_suites: true\n idle_timeout: 2s\n read_header_timeout: 1s\n read_timeout: 1s\n shutdown_duration: 5s\n write_timeout: 1m\n mode: REST\n network: tcp\n probe_wait_time: 3s\n restart: true\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: true\n tcp_defer_accept: false\n tcp_fast_open: false\n tcp_no_delay: false\n tcp_quick_ack: false\n socket_path: \"\"\n startup_strategy:\n - liveness\n - pprof\n - grpc\n - readiness\n shutdown_strategy:\n - readiness\n - grpc\n - pprof\n - liveness\n full_shutdown_duration: 600s\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\nobservability:\n enabled: false\n otlp:\n collector_endpoint: \"\"\n trace_batch_timeout: \"1s\"\n trace_export_timeout: \"1m\"\n trace_max_export_batch_size: 1024\n trace_max_queue_size: 256\n metrics_export_interval: \"1s\"\n metrics_export_timeout: \"1m\"\n attribute:\n namespace: \"_MY_POD_NAMESPACE_\"\n pod_name: \"_MY_POD_NAME_\"\n node_name: \"_MY_NODE_NAME_\"\n service_name: \"vald-mirror-gateway\"\n metrics:\n enable_cgo: true\n enable_goroutine: true\n enable_memory: true\n enable_version_info: true\n version_info_labels:\n - vald_version\n - server_name\n - git_commit\n - build_time\n - go_version\n - go_os\n - go_arch\n - algorithm_info\n trace:\n enabled: false\ngateway:\n pod_name: _MY_POD_NAME_\n register_duration: 1s\n namespace: _MY_POD_NAMESPACE_\n discovery_duration: 1s\n colocation: dc1\n group: \n net:\n dialer:\n dual_stack_enabled: false\n keepalive: 10m\n timeout: 30s\n dns:\n cache_enabled: true\n cache_expiration: 24h\n refresh_duration: 5m\n network: tcp\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: true\n tcp_fast_open: true\n tcp_no_delay: true\n tcp_quick_ack: true\n 
tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\n client:\n addrs:\n - vald-lb-gateway.default.svc.cluster.local:8081\n health_check_duration: \"1s\"\n connection_pool:\n enable_dns_resolver: true\n enable_rebalance: true\n old_conn_close_duration: 2m\n rebalance_duration: 30m\n size: 3\n backoff:\n backoff_factor: 1.1\n backoff_time_limit: 5s\n enable_error_log: true\n initial_duration: 5ms\n jitter_limit: 100ms\n maximum_duration: 5s\n retry_count: 100\n circuit_breaker:\n closed_error_rate: 0.7\n closed_refresh_timeout: 10s\n half_open_error_rate: 0.5\n min_samples: 1000\n open_timeout: 1s\n call_option:\n content_subtype: \"\"\n max_recv_msg_size: 0\n max_retry_rpc_buffer_size: 0\n max_send_msg_size: 0\n wait_for_ready: true\n dial_option:\n authority: \"\"\n backoff_base_delay: 1s\n backoff_jitter: 0.2\n backoff_max_delay: 120s\n backoff_multiplier: 1.6\n disable_retry: false\n enable_backoff: false\n idle_timeout: 1h\n initial_connection_window_size: 2097152\n initial_window_size: 1048576\n insecure: true\n interceptors: []\n keepalive:\n permit_without_stream: false\n time: \"\"\n timeout: 30s\n max_call_attempts: 0\n max_header_list_size: 0\n max_msg_size: 0\n min_connection_timeout: 20s\n net:\n dialer:\n dual_stack_enabled: true\n keepalive: \"\"\n timeout: \"\"\n dns:\n cache_enabled: true\n cache_expiration: 1h\n refresh_duration: 30m\n network: tcp\n socket_option:\n ip_recover_destination_addr: false\n ip_transparent: false\n reuse_addr: true\n reuse_port: true\n tcp_cork: false\n tcp_defer_accept: false\n tcp_fast_open: false\n tcp_no_delay: false\n tcp_quick_ack: false\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n key: /path/to/key\n read_buffer_size: 0\n shared_write_buffer: false\n timeout: \"\"\n user_agent: Vald-gRPC\n write_buffer_size: 0\n tls:\n ca: /path/to/ca\n cert: /path/to/cert\n enabled: false\n insecure_skip_verify: false\n 
key: /path/to/key\n self_mirror_addr: vald-mirror-gateway.default.svc.cluster.local:8081\n gateway_addr: vald-lb-gateway.default.svc.cluster.local:8081\n" diff --git a/k8s/gateway/gateway/mirror/deployment.yaml b/k8s/gateway/gateway/mirror/deployment.yaml index 9d59372c798..5a0c8ab491b 100644 --- a/k8s/gateway/gateway/mirror/deployment.yaml +++ b/k8s/gateway/gateway/mirror/deployment.yaml @@ -45,7 +45,7 @@ spec: app.kubernetes.io/instance: release-name app.kubernetes.io/component: gateway-mirror annotations: - checksum/configmap: 0ef2f36d15c8d866aaa135cc46793f9c7d3111b07c4b7988526ce391b99fd9b5 + checksum/configmap: 5fec186a5052df1bd407582ddeb40b15289021ecfe3aa111d14fff4222d07291 pyroscope.io/scrape: "true" pyroscope.io/application-name: vald-mirror-gateway pyroscope.io/profile-cpu-enabled: "true" diff --git a/k8s/index/job/correction/configmap.yaml b/k8s/index/job/correction/configmap.yaml index a122e57f493..72550c006b9 100644 --- a/k8s/index/job/correction/configmap.yaml +++ b/k8s/index/job/correction/configmap.yaml @@ -306,6 +306,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false @@ -395,6 +396,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/index/job/creation/configmap.yaml b/k8s/index/job/creation/configmap.yaml index 0aad0f9eb95..b3353178004 100644 --- a/k8s/index/job/creation/configmap.yaml +++ b/k8s/index/job/creation/configmap.yaml @@ -306,6 +306,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/index/job/save/configmap.yaml b/k8s/index/job/save/configmap.yaml index 4c47cbb7d25..c815e56b083 100644 --- a/k8s/index/job/save/configmap.yaml +++ b/k8s/index/job/save/configmap.yaml @@ -306,6 +306,7 @@ 
data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/manager/index/configmap.yaml b/k8s/manager/index/configmap.yaml index 5b6374ed5c6..dc325556e9b 100644 --- a/k8s/manager/index/configmap.yaml +++ b/k8s/manager/index/configmap.yaml @@ -304,6 +304,7 @@ data: cache_enabled: true cache_expiration: 1h refresh_duration: 30m + network: tcp socket_option: ip_recover_destination_addr: false ip_transparent: false diff --git a/k8s/manager/index/deployment.yaml b/k8s/manager/index/deployment.yaml index 135c3d7070d..11e81cf4274 100644 --- a/k8s/manager/index/deployment.yaml +++ b/k8s/manager/index/deployment.yaml @@ -46,7 +46,7 @@ spec: app.kubernetes.io/instance: release-name app.kubernetes.io/component: manager-index annotations: - checksum/configmap: 0a58a9a201594a2afa98b16b0716b684a03269702de2690609a66710251c2b30 + checksum/configmap: f3baa2e31b4339536e8adfc395a9ef81906c592f02cf1ec0d892715a5313d7af profefe.com/enable: "true" profefe.com/port: "6060" profefe.com/service: vald-manager-index diff --git a/k8s/operator/helm/crds/valdrelease.yaml b/k8s/operator/helm/crds/valdrelease.yaml index 3469a37af50..024f1dc95d0 100644 --- a/k8s/operator/helm/crds/valdrelease.yaml +++ b/k8s/operator/helm/crds/valdrelease.yaml @@ -1273,6 +1273,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -2295,6 +2301,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -3171,6 +3183,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -4278,6 +4296,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix 
socket_option: type: object properties: @@ -4477,6 +4501,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -4671,6 +4701,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -5783,6 +5819,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -5974,6 +6016,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -6167,6 +6215,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -7278,6 +7332,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -7376,6 +7436,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8444,6 +8510,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8635,6 +8707,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -8835,6 +8913,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -9747,6 +9831,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -9938,6 +10028,12 @@ spec: type: string refresh_duration: type: 
string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -10888,6 +10984,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -11079,6 +11181,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -13011,6 +13119,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: @@ -13202,6 +13316,12 @@ spec: type: string refresh_duration: type: string + network: + type: string + enum: + - tcp + - udp + - unix socket_option: type: object properties: diff --git a/pkg/discoverer/k8s/service/discover.go b/pkg/discoverer/k8s/service/discover.go index c95b02420dd..864860af260 100644 --- a/pkg/discoverer/k8s/service/discover.go +++ b/pkg/discoverer/k8s/service/discover.go @@ -55,11 +55,11 @@ type discoverer struct { pods sync.Map[string, *[]pod.Pod] podMetrics sync.Map[string, mpod.Pod] services sync.Map[string, *service.Service] - podsByNode atomic.Value - podsByNamespace atomic.Value - podsByName atomic.Value - nodeByName atomic.Value - svcsByName atomic.Value + podsByNode atomic.Pointer[map[string]map[string]map[string][]*payload.Info_Pod] + podsByNamespace atomic.Pointer[map[string]map[string][]*payload.Info_Pod] + podsByName atomic.Pointer[map[string][]*payload.Info_Pod] + nodeByName atomic.Pointer[map[string]*payload.Info_Node] + svcsByName atomic.Pointer[map[string]*payload.Info_Service] ctrl k8s.Controller namespace string name string @@ -77,11 +77,18 @@ func New(selector *config.Selectors, opts ...Option) (dsc Discoverer, err error) return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) } } - - d.podsByNode.Store(make(map[string]map[string]map[string][]*payload.Info_Pod)) - 
d.podsByNamespace.Store(make(map[string]map[string][]*payload.Info_Pod)) - d.podsByName.Store(make(map[string][]*payload.Info_Pod)) - d.nodeByName.Store(make(map[string]*payload.Info_Node)) + var ( + podsByNode = make(map[string]map[string]map[string][]*payload.Info_Pod) // map[node][namespace][name][]pod + podsByNamespace = make(map[string]map[string][]*payload.Info_Pod) // map[namespace][name][]pod + podsByName = make(map[string][]*payload.Info_Pod) // map[name][]pod + nodeByName = make(map[string]*payload.Info_Node) // map[name]node + svcsByName = make(map[string]*payload.Info_Service) // map[name]svc + ) + d.podsByNode.Store(&podsByNode) + d.podsByNamespace.Store(&podsByNamespace) + d.podsByName.Store(&podsByName) + d.nodeByName.Store(&nodeByName) + d.svcsByName.Store(&svcsByName) var k8sOpts []k8s.Option k8sOpts = append(k8sOpts, @@ -365,7 +372,7 @@ func (d *discoverer) Start(ctx context.Context) (<-chan error, error) { return true } }) - d.svcsByName.Store(svcsByName) + d.svcsByName.Store(&svcsByName) var wg sync.WaitGroup wg.Add(1) @@ -407,8 +414,8 @@ func (d *discoverer) Start(ctx context.Context) (<-chan error, error) { nodeByName[nodeName].GetPods().Pods = p } } - d.nodeByName.Store(nodeByName) - d.podsByNode.Store(podsByNode) + d.nodeByName.Store(&nodeByName) + d.podsByNode.Store(&podsByNode) return nil })) wg.Add(1) @@ -422,7 +429,7 @@ func (d *discoverer) Start(ctx context.Context) (<-chan error, error) { podsByNamespace[namespace][appName] = p } } - d.podsByNamespace.Store(podsByNamespace) + d.podsByNamespace.Store(&podsByNamespace) return nil })) wg.Add(1) @@ -434,7 +441,7 @@ func (d *discoverer) Start(ctx context.Context) (<-chan error, error) { }) podsByName[appName] = p } - d.podsByName.Store(podsByName) + d.podsByName.Store(&podsByName) return nil })) wg.Wait() @@ -456,8 +463,8 @@ func (d *discoverer) GetPods(req *payload.Discoverer_Request) (pods *payload.Inf ) pods = new(payload.Info_Pods) if req.GetNode() != "" && req.GetNode() != "*" { - pbn, 
ok := d.podsByNode.Load().(map[string]map[string]map[string][]*payload.Info_Pod) - if !ok { + pbn := *d.podsByNode.Load() + if pbn == nil { return nil, errors.ErrInvalidDiscoveryCache } podsByNamespace, ok = pbn[req.GetNode()] @@ -467,8 +474,8 @@ func (d *discoverer) GetPods(req *payload.Discoverer_Request) (pods *payload.Inf } if req.GetNamespace() != "" && req.GetNamespace() != "*" { if podsByNamespace == nil { - podsByNamespace, ok = d.podsByNamespace.Load().(map[string]map[string][]*payload.Info_Pod) - if !ok { + podsByNamespace = *d.podsByNamespace.Load() + if podsByNamespace == nil { return nil, errors.ErrInvalidDiscoveryCache } } @@ -486,8 +493,8 @@ func (d *discoverer) GetPods(req *payload.Discoverer_Request) (pods *payload.Inf } } } else { - podsByName, ok = d.podsByName.Load().(map[string][]*payload.Info_Pod) - if !ok { + podsByName = *d.podsByName.Load() + if podsByName == nil { return nil, errors.ErrInvalidDiscoveryCache } } @@ -507,15 +514,17 @@ func (d *discoverer) GetPods(req *payload.Discoverer_Request) (pods *payload.Inf pods.GetPods()[i].GetNode().Pods = nil } } + slices.SortFunc(pods.Pods, func(left, right *payload.Info_Pod) int { + return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage()) + }) return pods, nil } func (d *discoverer) GetNodes( req *payload.Discoverer_Request, ) (nodes *payload.Info_Nodes, err error) { - nodes = new(payload.Info_Nodes) - nbn, ok := d.nodeByName.Load().(map[string]*payload.Info_Node) - if !ok { + nbn := *d.nodeByName.Load() + if nbn == nil { return nil, errors.ErrInvalidDiscoveryCache } if req.GetNode() != "" && req.GetNode() != "*" { @@ -527,10 +536,15 @@ func (d *discoverer) GetNodes( if err == nil { n.Pods = ps } - nodes.Nodes = append(nodes.GetNodes(), n) - return nodes, nil + return &payload.Info_Nodes{ + Nodes: []*payload.Info_Node{ + n, + }, + }, nil + } + nodes = &payload.Info_Nodes{ + Nodes: make([]*payload.Info_Node, 0, len(nbn)), } - ns := nodes.Nodes for name, n := range nbn { 
req.Node = name if n.GetPods() != nil { @@ -546,13 +560,11 @@ func (d *discoverer) GetNodes( n.Pods = ps } } - ns = append(ns, n) + nodes.Nodes = append(nodes.Nodes, n) } - slices.SortFunc(ns, func(left, right *payload.Info_Node) int { + slices.SortFunc(nodes.Nodes, func(left, right *payload.Info_Node) int { return cmp.Compare(left.GetMemory().GetUsage(), right.GetMemory().GetUsage()) }) - - nodes.Nodes = ns return nodes, nil } @@ -561,8 +573,8 @@ func (d *discoverer) GetServices( req *payload.Discoverer_Request, ) (svcs *payload.Info_Services, err error) { svcs = new(payload.Info_Services) - sbn, ok := d.svcsByName.Load().(map[string]*payload.Info_Service) - if !ok { + sbn := *d.svcsByName.Load() + if sbn == nil { return nil, errors.ErrInvalidDiscoveryCache } diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 5bf3b6b267d..f7651547c0e 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -16,7 +16,6 @@ package usecase import ( "context" "os" - "slices" "syscall" "time" @@ -86,10 +85,6 @@ func New(cfg *config.Data) (r runner.Runner, err error) { discoverer.WithDiscoverDuration(cfg.Corrector.Discoverer.Duration), discoverer.WithOptions(acOpts...), discoverer.WithNodeName(cfg.Corrector.NodeName), - discoverer.WithOnDiscoverFunc(func(_ context.Context, _ discoverer.Client, addrs []string) error { - slices.Reverse(addrs) - return nil - }), ) if err != nil { return nil, err diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index 4fa18039817..6bba5d10fac 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -56,7 +56,7 @@ type index struct { indexDurationLimit time.Duration saveIndexDuration time.Duration saveIndexDurationLimit time.Duration - shouldSaveList sync.Map[string, struct{}] + shouldSaveList sync.Map[string, bool] createIndexConcurrency int saveIndexConcurrency 
int indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -67,8 +67,6 @@ type index struct { uncommittedUUIDsCount uint32 } -var empty = struct{}{} - func New(opts ...Option) (idx Indexer, err error) { i := new(index) for _, opt := range append(defaultOptions, opts...) { @@ -243,7 +241,7 @@ func (idx *index) createIndex(ctx context.Context, enableLowIndexSkip bool) (err log.Warnf("an error occurred while calling CreateIndex of %s: %s", addr, err) return err } - _, ok = idx.shouldSaveList.LoadOrStore(addr, empty) + _, ok = idx.shouldSaveList.LoadOrStore(addr, true) if ok { log.Debugf("addr %s already queued for saveIndex", addr) return nil diff --git a/pkg/manager/index/service/indexer_test.go b/pkg/manager/index/service/indexer_test.go index a4e80576c7d..8b0d017b70f 100644 --- a/pkg/manager/index/service/indexer_test.go +++ b/pkg/manager/index/service/indexer_test.go @@ -120,7 +120,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -283,7 +283,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -444,7 +444,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -604,7 +604,7 @@ package service // indexDurationLimit 
time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -759,7 +759,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -907,7 +907,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -1055,7 +1055,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -1203,7 +1203,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -1351,7 +1351,7 @@ package service // indexDurationLimit time.Duration // saveIndexDuration time.Duration // saveIndexDurationLimit time.Duration -// shouldSaveList sync.Map[string, struct{}] +// shouldSaveList sync.Map[string, bool] // createIndexConcurrency int // saveIndexConcurrency int // 
indexInfos sync.Map[string, *payload.Info_Index_Count] diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index 09bc5cab46e..76e260ebc68 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "os/exec" + "strings" "testing" "time" @@ -802,8 +803,29 @@ func TestE2EIndexJobCorrection(t *testing.T) { } t.Log("Test case 2: execute index correction after one agent removed") - t.Log("removing vald-agent-0...") - cmd := exec.CommandContext(ctx, "sh", "-c", "kubectl delete pod vald-agent-0 && kubectl wait --for=condition=Ready pod/vald-agent-0") + detail, err := op.IndexDetail(t, ctx) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + if len(detail.Counts) == 0 { + t.Fatal("no pods found with index details") + } + var target string + for a, c := range detail.Counts { + if c.Stored > 0 { + parts := strings.Split(a, ":") + if len(parts) == 0 { + t.Fatalf("invalid address format: %s", a) + } + target = parts[0] + break + } + } + if target == "" { + t.Fatal("no pods found with stored count > 0") + } + + cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("kubectl get pods -o custom-columns=:metadata.name --no-headers=true --field-selector=\"status.podIP=%s\"", target)) out, err := cmd.Output() if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { @@ -812,6 +834,18 @@ func TestE2EIndexJobCorrection(t *testing.T) { t.Fatalf("unexpected error on creating job: %v", err) } } + agent := strings.TrimRight(string(out), "\n") + + t.Logf("removing %s...", agent) + cmd = exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("kubectl delete pod %s && kubectl wait --for=condition=Ready pod/%s", agent, agent)) + out, err = cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + t.Fatalf("%s, %s, %v", string(out), string(exitErr.Stderr), err) + } else { + t.Fatalf("unexpected error on creating job: %v", err) + } + } t.Log(string(out)) // correct the deleted index diff 
--git a/tests/e2e/operation/operation.go b/tests/e2e/operation/operation.go index 9ee24d79e91..11d21d65316 100644 --- a/tests/e2e/operation/operation.go +++ b/tests/e2e/operation/operation.go @@ -133,6 +133,7 @@ type Client interface { CreateIndex(t *testing.T, ctx context.Context) error SaveIndex(t *testing.T, ctx context.Context) error IndexInfo(t *testing.T, ctx context.Context) (*payload.Info_Index_Count, error) + IndexDetail(t *testing.T, ctx context.Context) (*payload.Info_Index_Detail, error) } type client struct { @@ -182,6 +183,17 @@ func (c *client) IndexInfo(t *testing.T, ctx context.Context) (*payload.Info_Ind return client.IndexInfo(ctx, &payload.Empty{}) } +func (c *client) IndexDetail( + t *testing.T, ctx context.Context, +) (*payload.Info_Index_Detail, error) { + client, err := c.getClient() + if err != nil { + return nil, err + } + + return client.IndexDetail(ctx, &payload.Empty{}) +} + func (c *client) getGRPCConn() (*grpc.ClientConn, error) { return grpc.NewClient( c.host+":"+strconv.Itoa(c.port),