From b8646e4e9dd31cc7c8ffcc42b95962dd08fa1a9e Mon Sep 17 00:00:00 2001 From: Justin Lee Date: Mon, 23 Dec 2024 05:53:47 +0000 Subject: [PATCH] feat: Add support for deterministic listener ports (based on broker ID) --- cmd/kafka-proxy/server.go | 1 + config/config.go | 7 +- proxy/processor_default_test.go | 7 +- proxy/protocol/responses.go | 13 ++- proxy/protocol/responses_test.go | 9 ++- proxy/proxy.go | 58 ++++++++++---- proxy/proxy_test.go | 133 +++++++++++++++++++++++++------ 7 files changed, 179 insertions(+), 49 deletions(-) diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index f5811c7a..dccfdea7 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -89,6 +89,7 @@ func initFlags() { Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))") Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started") Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment") + Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).") Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.") Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.") diff --git a/config/config.go b/config/config.go index d8f958fa..9a15b6f6 100644 --- a/config/config.go +++ b/config/config.go @@ -22,13 +22,17 @@ var ( Version = "unknown" ) -type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) +type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) type ListenerConfig struct { BrokerAddress string ListenerAddress string AdvertisedAddress string } +type IdListenerConfig struct { + BrokerAddress string + Listener net.Listener +} type DialAddressMapping struct { SourceAddress string DestinationAddress string @@ -74,6 +78,7 @@ type Config struct { DefaultListenerIP string BootstrapServers []ListenerConfig ExternalServers []ListenerConfig + DeterministicListeners bool DialAddressMappings []DialAddressMapping DisableDynamicListeners bool DynamicAdvertisedListener string diff --git a/proxy/processor_default_test.go b/proxy/processor_default_test.go index f2720f4a..cc31e58a 100644 --- a/proxy/processor_default_test.go +++ b/proxy/processor_default_test.go @@ -3,11 +3,12 @@ package proxy import ( "bytes" "encoding/hex" + "testing" + "time" + "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestHandleRequest(t *testing.T) { @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) { } func TestHandleResponse(t *testing.T) { - netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" { switch brokerPort { case 19092: diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index fcd5cb65..0f9b0509 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -14,6 +14,7 @@ const ( brokersKeyName = "brokers" hostKeyName = "host" portKeyName = "port" + nodeKeyName = "node_id" coordinatorKeyName = "coordinator" coordinatorsKeyName = "coordinators" @@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu if !ok { return errors.New("broker.port not found") } + nodeId, ok := broker.Get(nodeKeyName).(int32) + if !ok { + return errors.New("broker.node_id not found") + } if host == "" && port <= 0 { continue } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } @@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e if !ok { return errors.New("coordinator.port not found") } + nodeId, ok := coordinator.Get(nodeKeyName).(int32) + if !ok { + return errors.New("coordinator.node_id not found") + } if host == "" && port <= 0 { return nil } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index c2ef822d..549e9737 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -3,11 +3,12 @@ package protocol import ( "encoding/hex" "fmt" - "github.com/google/uuid" "reflect" "strings" "testing" + "github.com/google/uuid" + "github.com/grepplabs/kafka-proxy/config" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -20,7 +21,7 @@ var ( // topic_metadata 0x00, 0x00, 0x00, 0x00} - testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "myhost1", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { @@ -31,7 +32,7 @@ var ( return "", 0, errors.New("unexpected data") } - testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 19092 { return "myhost1", 34001, nil } else if brokerHost == "localhost" && brokerPort == 29092 { @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) { a.Nil(err) a.Equal(bytes, resp) - modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "azure.microsoft.com", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { diff --git a/proxy/proxy.go b/proxy/proxy.go index 203635e4..7114a780 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -25,11 +25,13 @@ type Listeners struct { listenFunc ListenFunc + deterministicListeners bool disableDynamicListeners bool dynamicSequentialMinPort int - brokerToListenerConfig map[string]config.ListenerConfig - lock sync.RWMutex + brokerToListenerConfig map[string]config.ListenerConfig + brokerIdToIdListenerConfig map[int32]config.IdListenerConfig + lock sync.RWMutex } func NewListeners(cfg *config.Config) (*Listeners, error) { @@ -64,15 +66,19 @@ func NewListeners(cfg *config.Config) (*Listeners, error) { return nil, err } + brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig) + return &Listeners{ - defaultListenerIP: defaultListenerIP, - dynamicAdvertisedListener: dynamicAdvertisedListener, - connSrc: make(chan Conn, 1), - brokerToListenerConfig: brokerToListenerConfig, - tcpConnOptions: tcpConnOptions, - listenFunc: listenFunc, - disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, - dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, + defaultListenerIP: defaultListenerIP, + dynamicAdvertisedListener: dynamicAdvertisedListener, + connSrc: make(chan Conn, 1), + brokerToListenerConfig: brokerToListenerConfig, + brokerIdToIdListenerConfig: brokerIdToIdListenerConfig, + tcpConnOptions: tcpConnOptions, + listenFunc: listenFunc, + deterministicListeners: cfg.Proxy.DeterministicListeners, + disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, + dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, }, nil } @@ -117,7 +123,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo return brokerToListenerConfig, nil } -func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { +func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "" || brokerPort <= 0 { return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort) } @@ -126,6 +132,7 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l p.lock.RLock() listenerConfig, ok := p.brokerToListenerConfig[brokerAddress] + idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId] p.lock.RUnlock() if ok { @@ -133,13 +140,25 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l return util.SplitHostPort(listenerConfig.AdvertisedAddress) } if !p.disableDynamicListeners { + if brokerIdFound { + logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId) + // Existing broker ID found, but with a different upstream broker + // Close existing listener, remove two mappings: + // * ID to removed upstream broker + // * removed upstream broker + idListenerConfig.Listener.Close() + p.lock.Lock() + delete(p.brokerIdToIdListenerConfig, brokerId) + delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress) + p.lock.Unlock() + } logrus.Infof("Starting dynamic listener for broker %s", brokerAddress) - return p.ListenDynamicInstance(brokerAddress) + return p.ListenDynamicInstance(brokerAddress, brokerId) } return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort) } -func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) { +func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) { p.lock.Lock() defer p.lock.Unlock() // double check @@ -147,9 +166,15 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, return util.SplitHostPort(v.AdvertisedAddress) } - defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort)) - if p.dynamicSequentialMinPort != 0 { - p.dynamicSequentialMinPort += 1 + var defaultListenerAddress string + + if p.deterministicListeners { + defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId))) + } else { + defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort)) + if p.dynamicSequentialMinPort != 0 { + p.dynamicSequentialMinPort += 1 + } } cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress} @@ -167,6 +192,7 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port)) p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress} + p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l} logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress) diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 0bcb8515..13cf3568 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2,9 +2,10 @@ package proxy import ( "fmt" + "testing" + "github.com/grepplabs/kafka-proxy/config" "github.com/stretchr/testify/assert" - "testing" ) func TestGetBrokerToListenerConfig(t *testing.T) { @@ -24,7 +25,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -38,9 +43,21 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{}, nil, @@ -64,8 +81,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -79,8 +104,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32401", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32401 0.0.0.0:32400} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -88,8 +121,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32401", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32401} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -97,13 +138,31 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32403", "kafka-proxy-0:32403", "kafka-proxy-0:32403"}, - {"192.168.99.100:32404", "kafka-proxy-0:32404", "kafka-proxy-0:32404"}, + { + BrokerAddress: "192.168.99.100:32403", + ListenerAddress: "kafka-proxy-0:32403", + AdvertisedAddress: "kafka-proxy-0:32403"}, + { + BrokerAddress: "192.168.99.100:32404", + ListenerAddress: "kafka-proxy-0:32404", + AdvertisedAddress: "kafka-proxy-0:32404"}, }, nil, map[string]config.ListenerConfig{ @@ -136,10 +195,18 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, nil, map[string]config.ListenerConfig{ @@ -152,10 +219,18 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-1:32400", "kafka-proxy-1:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-1:32400", + AdvertisedAddress: "kafka-proxy-1:32400", + }, }, fmt.Errorf("bootstrap and external server mappings 192.168.99.100:32400 with different advertised addresses: kafka-proxy-1:32400 and kafka-proxy-0:32400"), nil, @@ -163,7 +238,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping has different listener and advertised addresses {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32401}"), nil, @@ -171,8 +250,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32400", "kafka-proxy-0:32401", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping 192.168.99.100:32400 configured twice: kafka-proxy-0:32401 and {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32400}"), nil,