Skip to content

Commit

Permalink
Merge pull request #2472 from openziti/fix.2471.legacy.sdk.svc.accces…
Browse files Browse the repository at this point in the history
…s.in.ha

fixes #2471 pins legacy API Session to the initiating controller
  • Loading branch information
andrewpmartinez authored Oct 9, 2024
2 parents e599a37 + adf8610 commit bd8b4da
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 24 deletions.
11 changes: 8 additions & 3 deletions router/state/apiSessionAdded.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ func (h *apiSessionAddedHandler) ContentType() int32 {
return env.ApiSessionAddedType
}

func (h *apiSessionAddedHandler) HandleReceive(msg *channel.Message, _ channel.Channel) {
func (h *apiSessionAddedHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
go func() {
req := &edge_ctrl_pb.ApiSessionAdded{}
if err := proto.Unmarshal(msg.Body, req); err == nil {
for _, session := range req.ApiSessions {
h.sm.AddApiSession(session)
h.sm.AddApiSession(&ApiSession{
ApiSession: session,
ControllerId: ch.Id(),
})
}

if req.IsFullState {
Expand Down Expand Up @@ -149,7 +152,9 @@ func (h *apiSessionAddedHandler) syncFailed(err error) {
func (h *apiSessionAddedHandler) legacySync(reqWithState *apiSessionAddedWithState) {
pfxlog.Logger().Warn("using legacy sync logic some connections may be dropped")
for _, apiSession := range reqWithState.ApiSessions {
h.sm.AddApiSession(apiSession)
h.sm.AddApiSession(&ApiSession{
ApiSession: apiSession,
})
}

h.sm.RemoveMissingApiSessions(reqWithState.ApiSessions, "")
Expand Down
3 changes: 2 additions & 1 deletion router/state/apiSessionRemoved.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (h *apiSessionRemovedHandler) ContentType() int32 {
return env.ApiSessionRemovedType
}

func (h *apiSessionRemovedHandler) HandleReceive(msg *channel.Message, _ channel.Channel) {
func (h *apiSessionRemovedHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
go func() {
req := &edge_ctrl_pb.ApiSessionRemoved{}
if err := proto.Unmarshal(msg.Body, req); err == nil {
Expand All @@ -55,6 +55,7 @@ func (h *apiSessionRemovedHandler) HandleReceive(msg *channel.Message, _ channel
pfxlog.Logger().
WithField("apiSessionToken", token).
WithField("apiSessionId", id).
WithField("ctrlId", ch.Id()).
Debugf("removing api session [token: %s] [id: %s]", token, id)

h.sm.RemoveApiSession(token)
Expand Down
7 changes: 5 additions & 2 deletions router/state/apiSessionUpdated.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ func (h *apiSessionUpdatedHandler) ContentType() int32 {
return env.ApiSessionUpdatedType
}

func (h *apiSessionUpdatedHandler) HandleReceive(msg *channel.Message, _ channel.Channel) {
func (h *apiSessionUpdatedHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
go func() {
req := &edge_ctrl_pb.ApiSessionUpdated{}
if err := proto.Unmarshal(msg.Body, req); err == nil {
for _, session := range req.ApiSessions {
h.sm.UpdateApiSession(session)
h.sm.UpdateApiSession(&ApiSession{
ApiSession: session,
ControllerId: ch.Id(),
})
}
} else {
pfxlog.Logger().Panic("could not convert message as network session updated")
Expand Down
35 changes: 23 additions & 12 deletions router/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ type Manager interface {
//ApiSessions
GetApiSession(token string) *ApiSession
GetApiSessionWithTimeout(token string, timeout time.Duration) *ApiSession
AddApiSession(apiSession *edge_ctrl_pb.ApiSession)
UpdateApiSession(apiSession *edge_ctrl_pb.ApiSession)
AddApiSession(apiSession *ApiSession)
UpdateApiSession(apiSession *ApiSession)
RemoveApiSession(token string)
RemoveMissingApiSessions(knownSessions []*edge_ctrl_pb.ApiSession, beforeSessionId string)
AddConnectedApiSession(token string)
Expand Down Expand Up @@ -114,7 +114,7 @@ var _ Manager = (*ManagerImpl)(nil)
func NewManager(env Env) Manager {
return &ManagerImpl{
EventEmmiter: events.New(),
apiSessionsByToken: cmap.New[*edge_ctrl_pb.ApiSession](),
apiSessionsByToken: cmap.New[*ApiSession](),
activeApiSessions: cmap.New[*MapWithMutex](),
sessions: cmap.New[uint32](),
recentlyRemovedSessions: cmap.New[time.Time](),
Expand All @@ -126,7 +126,7 @@ func NewManager(env Env) Manager {

type ManagerImpl struct {
env Env
apiSessionsByToken cmap.ConcurrentMap[string, *edge_ctrl_pb.ApiSession]
apiSessionsByToken cmap.ConcurrentMap[string, *ApiSession]

activeApiSessions cmap.ConcurrentMap[string, *MapWithMutex]
activeChannels cmap.ConcurrentMap[string, *ApiSession]
Expand Down Expand Up @@ -362,7 +362,7 @@ func (sm *ManagerImpl) IsSyncInProgress() bool {
return sm.currentSync == ""
}

func (sm *ManagerImpl) AddApiSession(apiSession *edge_ctrl_pb.ApiSession) {
func (sm *ManagerImpl) AddApiSession(apiSession *ApiSession) {
pfxlog.Logger().
WithField("apiSessionId", apiSession.Id).
WithField("apiSessionToken", apiSession.Token).
Expand All @@ -372,7 +372,7 @@ func (sm *ManagerImpl) AddApiSession(apiSession *edge_ctrl_pb.ApiSession) {
sm.Emit(EventAddedApiSession, apiSession)
}

func (sm *ManagerImpl) UpdateApiSession(apiSession *edge_ctrl_pb.ApiSession) {
func (sm *ManagerImpl) UpdateApiSession(apiSession *ApiSession) {
pfxlog.Logger().
WithField("apiSessionId", apiSession.Id).
WithField("apiSessionToken", apiSession.Token).
Expand Down Expand Up @@ -405,7 +405,7 @@ func (sm *ManagerImpl) RemoveMissingApiSessions(knownApiSessions []*edge_ctrl_pb
}

var tokensToRemove []string
sm.apiSessionsByToken.IterCb(func(token string, apiSession *edge_ctrl_pb.ApiSession) {
sm.apiSessionsByToken.IterCb(func(token string, apiSession *ApiSession) {
if _, ok := validTokens[token]; !ok && (beforeSessionId == "" || apiSession.Id <= beforeSessionId) {
tokensToRemove = append(tokensToRemove, token)
}
Expand Down Expand Up @@ -454,8 +454,21 @@ func (sm *ManagerImpl) GetApiSessionWithTimeout(token string, timeout time.Durat

type ApiSession struct {
*edge_ctrl_pb.ApiSession
JwtToken *jwt.Token
Claims *common.AccessClaims
JwtToken *jwt.Token
Claims *common.AccessClaims
ControllerId string //used for non HA API Sessions
}

func (a *ApiSession) SelectCtrlCh(ctrls env.NetworkControllers) channel.Channel {
if a == nil {
return nil
}

if a.ControllerId != "" {
return ctrls.GetCtrlChannel(a.ControllerId)
}

return ctrls.AnyCtrlChannel()
}

func NewApiSessionFromToken(jwtToken *jwt.Token, accessClaims *common.AccessClaims) *ApiSession {
Expand Down Expand Up @@ -493,9 +506,7 @@ func (sm *ManagerImpl) GetApiSession(token string) *ApiSession {
}

if apiSession, ok := sm.apiSessionsByToken.Get(token); ok {
return &ApiSession{
ApiSession: apiSession,
}
return apiSession
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion router/xgress_edge/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ func (self *edgeTerminator) close(registry *hostedServiceRegistry, notifySdk boo
if notifyCtrl {
if self.terminatorId != "" {
logger.Info("removing terminator on controller")
ctrlCh := self.edgeClientConn.listener.factory.ctrls.AnyCtrlChannel()

ctrlCh := self.edgeClientConn.apiSession.SelectCtrlCh(self.edgeClientConn.listener.factory.ctrls)

if ctrlCh == nil {
logger.Error("no controller available, unable to remove terminator")
} else if err := self.edgeClientConn.removeTerminator(ctrlCh, self.token, self.terminatorId); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions router/xgress_edge/hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (self *hostedServiceRegistry) RemoveTerminators(terminatorIds []string) err
request := &ctrl_pb.RemoveTerminatorsRequest{
TerminatorIds: terminatorIds,
}

ctrls := self.env.GetNetworkControllers()
ctrlCh := ctrls.AnyValidCtrlChannel()
if ctrlCh == nil {
Expand Down Expand Up @@ -560,7 +560,8 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
request.ApiSessionToken = apiSession.Token
}

ctrlCh := factory.ctrls.AnyCtrlChannel()
ctrlCh := terminator.edgeClientConn.apiSession.SelectCtrlCh(factory.ctrls)

if ctrlCh == nil {
errStr := "no controller available, cannot create terminator"
log.Error(errStr)
Expand Down
7 changes: 4 additions & 3 deletions router/xgress_edge/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (self *edgeClientConn) processConnect(manager state.Manager, req *channel.M
pfxlog.Logger().Errorf("connId not set. unable to process connect message")
return
}
ctrlCh := self.apiSession.SelectCtrlCh(self.listener.factory.ctrls)

ctrlCh := self.listener.factory.ctrls.AnyCtrlChannel()
if ctrlCh == nil {
errStr := "no controller available, cannot create circuit"
log.Error(errStr)
Expand Down Expand Up @@ -275,7 +275,8 @@ func (self *edgeClientConn) sendCreateCircuitRequestV2(req *ctrl_msg.CreateCircu
}

func (self *edgeClientConn) processBind(manager state.Manager, req *channel.Message, ch channel.Channel) {
ctrlCh := self.listener.factory.ctrls.AnyCtrlChannel()
ctrlCh := self.apiSession.SelectCtrlCh(self.listener.factory.ctrls)

if ctrlCh == nil {
errStr := "no controller available, cannot create terminator"
pfxlog.ContextLogger(ch.Label()).
Expand Down Expand Up @@ -581,8 +582,8 @@ func (self *edgeClientConn) processUpdateBind(manager state.Manager, req *channe
log.Error("failed to update bind, no listener found")
return
}
ctrlCh := self.apiSession.SelectCtrlCh(self.listener.factory.ctrls)

ctrlCh := self.listener.factory.ctrls.AnyCtrlChannel()
if ctrlCh == nil {
log.Error("no controller available, cannot update terminator")
return
Expand Down

0 comments on commit bd8b4da

Please sign in to comment.