Skip to content

Commit

Permalink
[Bug] Fix #290, #289
Browse files Browse the repository at this point in the history
  • Loading branch information
derailed committed Mar 5, 2024
1 parent 82151aa commit 2eed9d6
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 104 deletions.
52 changes: 33 additions & 19 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,18 @@ func InitConnectionOrDie(config types.Config) (*APIClient, error) {
config: config,
cache: cache.NewLRUExpireCache(cacheSize),
}
_, err := a.serverGroups()
if err != nil {
return nil, err
}
if err := a.supportsMetricsResources(); err != nil {
log.Warn().Msgf("no metrics server detected %s", err.Error())
log.Warn().Err(err).Msgf("no metrics server detected")
}

return &a, nil
}

func makeSAR(ns string, gvr types.GVR) *authorizationv1.SelfSubjectAccessReview {
func makeSAR(ns string, gvr types.GVR, n string) *authorizationv1.SelfSubjectAccessReview {
if ns == "-" {
ns = ""
}
Expand All @@ -83,13 +87,14 @@ func makeSAR(ns string, gvr types.GVR) *authorizationv1.SelfSubjectAccessReview
Group: res.Group,
Resource: res.Resource,
Subresource: gvr.SubResource(),
Name: n,
},
},
}
}

func makeCacheKey(ns, gvr string, vv []string) string {
return ns + ":" + gvr + "::" + strings.Join(vv, ",")
func makeCacheKey(ns string, gvr types.GVR, n string, vv []string) string {
return ns + ":" + gvr.String() + ":" + n + "::" + strings.Join(vv, ",")
}

// ActiveContext returns the current context name.
Expand Down Expand Up @@ -146,11 +151,11 @@ func (a *APIClient) ConnectionOK() bool {
}

// CanI checks if user has access to a certain resource.
func (a *APIClient) CanI(ns string, gvr types.GVR, verbs ...string) (auth bool, err error) {
func (a *APIClient) CanI(ns string, gvr types.GVR, n string, verbs []string) (auth bool, err error) {
if IsClusterWide(ns) {
ns = AllNamespaces
ns = BlankNamespace
}
key := makeCacheKey(ns, gvr.String(), verbs)
key := makeCacheKey(ns, gvr, n, verbs)
if v, ok := a.cache.Get(key); ok {
if auth, ok = v.(bool); ok {
return auth, nil
Expand All @@ -161,7 +166,7 @@ func (a *APIClient) CanI(ns string, gvr types.GVR, verbs ...string) (auth bool,
if err != nil {
return false, err
}
dial, sar := c.AuthorizationV1().SelfSubjectAccessReviews(), makeSAR(ns, gvr)
dial, sar := c.AuthorizationV1().SelfSubjectAccessReviews(), makeSAR(ns, gvr, n)
ctx, cancel := context.WithTimeout(context.Background(), CallTimeout)
defer cancel()
for _, v := range verbs {
Expand Down Expand Up @@ -357,30 +362,38 @@ func (a *APIClient) checkCacheBool(key string) (state bool, ok bool) {
return
}

func (a *APIClient) serverGroups() (*metav1.APIGroupList, error) {
dial, err := a.CachedDiscovery()
if err != nil {
log.Warn().Err(err).Msgf("Unable to dial discovery API")
return nil, err
}
apiGroups, err := dial.ServerGroups()
if err != nil {
log.Warn().Err(err).Msgf("Unable to retrieve server groups")
return nil, fmt.Errorf("unable to fetch server groups: %w", err)
}

return apiGroups, nil
}

func (a *APIClient) supportsMetricsResources() error {
supported, ok := a.checkCacheBool(cacheMXAPIKey)
if ok {
if supported {
return nil
}
return errors.New("No metrics-server detected")
return errors.New("no metrics-server detected")
}

defer func() {
a.cache.Add(cacheMXAPIKey, supported, cacheExpiry)
}()

dial, err := a.CachedDiscovery()
if err != nil {
log.Warn().Err(err).Msgf("Unable to dial discovery API")
return err
}
apiGroups, err := dial.ServerGroups()
gg, err := a.serverGroups()
if err != nil {
log.Warn().Err(err).Msgf("Unable to retrieve server groups")
return err
}
for _, grp := range apiGroups.Groups {
for _, grp := range gg.Groups {
if grp.Name != metricsapi.GroupName {
continue
}
Expand All @@ -390,8 +403,9 @@ func (a *APIClient) supportsMetricsResources() error {
}
}

return errors.New("No metrics-server detected")
return errors.New("no metrics-server detected")
}

func checkMetricsVersion(grp metav1.APIGroup) bool {
for _, version := range grp.Versions {
for _, supportedVersion := range supportedMetricsAPIVersions {
Expand Down
2 changes: 1 addition & 1 deletion internal/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *Config) CurrentUserName() (string, error) {

// CurrentNamespaceName retrieves the active namespace.
func (c *Config) CurrentNamespaceName() (string, error) {
if c.flags.Namespace != nil {
if isSet(c.flags.Namespace) {
return *c.flags.Namespace, nil
}

Expand Down
86 changes: 49 additions & 37 deletions internal/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (f *Factory) Start(ns string) {
}
}

// Terminate stops the factory.
// Terminate terminates all watchers and forwards.
func (f *Factory) Terminate() {
f.mx.Lock()
defer f.mx.Unlock()
Expand All @@ -66,48 +66,68 @@ func (f *Factory) Terminate() {

// List returns a resource collection.
func (f *Factory) List(gvr types.GVR, ns string, wait bool, labels labels.Selector) ([]runtime.Object, error) {
inf, err := f.CanForResource(ns, gvr, types.MonitorAccess...)
inf, err := f.CanForResource(ns, gvr, types.ListAccess)
if err != nil {
return nil, err
}
if wait {
f.waitForCacheSync(ns)
if IsAllNamespace(ns) {
ns = BlankNamespace
}

var oo []runtime.Object
if IsClusterScoped(ns) {
return inf.Lister().List(labels)
oo, err = inf.Lister().List(labels)
} else {
oo, err = inf.Lister().ByNamespace(ns).List(labels)
}
if !wait || (wait && inf.Informer().HasSynced()) {
return oo, err
}

if IsAllNamespace(ns) {
ns = AllNamespaces
f.waitForCacheSync(ns)
if IsClusterScoped(ns) {
return inf.Lister().List(labels)
}
return inf.Lister().ByNamespace(ns).List(labels)
}

// HasSynced checks if given informer is up to date.
func (f *Factory) HasSynced(gvr types.GVR, ns string) (bool, error) {
inf, err := f.CanForResource(ns, gvr, types.ListAccess)
if err != nil {
return false, err
}

return inf.Informer().HasSynced(), nil
}

// Get retrieves a given resource.
func (f *Factory) Get(gvr types.GVR, path string, wait bool, sel labels.Selector) (runtime.Object, error) {
ns, n := Namespaced(path)
inf, err := f.CanForResource(ns, gvr, types.GetVerb)
func (f *Factory) Get(gvr types.GVR, fqn string, wait bool, sel labels.Selector) (runtime.Object, error) {
ns, n := Namespaced(fqn)
inf, err := f.CanForResource(ns, gvr, []string{types.GetVerb})
if err != nil {
return nil, err
}

if wait {
f.waitForCacheSync(ns)
var o runtime.Object
if IsClusterScoped(ns) {
o, err = inf.Lister().Get(n)
} else {
o, err = inf.Lister().ByNamespace(ns).Get(n)
}
if !wait || (wait && inf.Informer().HasSynced()) {
return o, err
}

f.waitForCacheSync(ns)
if IsClusterScoped(ns) {
return inf.Lister().Get(n)
}

return inf.Lister().ByNamespace(ns).Get(n)
}

func (f *Factory) waitForCacheSync(ns string) {
if IsClusterWide(ns) {
ns = AllNamespaces
}

if f.isClusterWide() {
ns = AllNamespaces
ns = BlankNamespace
}

f.mx.RLock()
Expand All @@ -131,7 +151,7 @@ func (f *Factory) WaitForCacheSync() {
for ns, fac := range f.factories {
m := fac.WaitForCacheSync(f.stopChan)
for k, v := range m {
log.Debug().Msgf("CACHE %q synched %t:%s", ns, v, k)
log.Debug().Msgf("CACHE `%q Loaded %t:%s", ns, v, k)
}
}
}
Expand All @@ -148,33 +168,24 @@ func (f *Factory) FactoryFor(ns string) di.DynamicSharedInformerFactory {

// SetActiveNS sets the active namespace.
func (f *Factory) SetActiveNS(ns string) error {
if !f.isClusterWide() {
if _, err := f.ensureFactory(ns); err != nil {
return err
}
if f.isClusterWide() {
return nil
}

return nil
_, err := f.ensureFactory(ns)
return err
}

func (f *Factory) isClusterWide() bool {
f.mx.RLock()
defer f.mx.RUnlock()
_, ok := f.factories[BlankNamespace]

_, ok := f.factories[AllNamespaces]
return ok
}

// CanForResource return an informer is user has access.
func (f *Factory) CanForResource(ns string, gvr types.GVR, verbs ...string) (informers.GenericInformer, error) {
// If user can access resource cluster wide, prefer cluster wide factory.
if !IsClusterWide(ns) {
auth, err := f.Client().CanI(AllNamespaces, gvr, verbs...)
if auth && err == nil {
return f.ForResource(AllNamespaces, gvr)
}
}
auth, err := f.Client().CanI(ns, gvr, verbs...)
func (f *Factory) CanForResource(ns string, gvr types.GVR, verbs []string) (informers.GenericInformer, error) {
auth, err := f.Client().CanI(ns, gvr, "", verbs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -206,13 +217,14 @@ func (f *Factory) ForResource(ns string, gvr types.GVR) (informers.GenericInform

func (f *Factory) ensureFactory(ns string) (di.DynamicSharedInformerFactory, error) {
if IsClusterWide(ns) {
ns = AllNamespaces
ns = BlankNamespace
}
f.mx.Lock()
defer f.mx.Unlock()
if fac, ok := f.factories[ns]; ok {
return fac, nil
}

dial, err := f.client.DynDial()
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ var toFileName = regexp.MustCompile(`[^(\w/\.)]`)

// IsClusterWide returns true if ns designates cluster scope, false otherwise.
func IsClusterWide(ns string) bool {
return ns == NamespaceAll || ns == AllNamespaces || ns == ClusterScope
return ns == NamespaceAll || ns == BlankNamespace || ns == ClusterScope
}

// CleanseNamespace ensures all ns maps to blank.
func CleanseNamespace(ns string) string {
if IsAllNamespace(ns) {
return AllNamespaces
return BlankNamespace
}

return ns
Expand All @@ -36,7 +36,7 @@ func IsAllNamespace(ns string) bool {

// IsAllNamespaces returns true if all namespaces, false otherwise.
func IsAllNamespaces(ns string) bool {
return ns == NamespaceAll || ns == AllNamespaces
return ns == NamespaceAll || ns == BlankNamespace
}

// IsNamespaced returns true if a specific ns is given.
Expand Down
29 changes: 14 additions & 15 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/derailed/popeye/internal"
"github.com/derailed/popeye/internal/client"
"github.com/derailed/popeye/internal/db/schema"
"github.com/derailed/popeye/types"
"github.com/hashicorp/go-memdb"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -146,12 +147,25 @@ func (db *DB) Find(kind types.GVR, fqn string) (any, error) {
defer txn.Abort()
o, err := txn.First(kind.String(), "id", fqn)
if err != nil || o == nil {
log.Error().Err(err).Msgf("db.find unable to find object: [%s]%s", kind, fqn)
return nil, fmt.Errorf("object not found: %q", fqn)
}

return o, nil
}

func (db *DB) Dump(gvr types.GVR) {
txn, it := db.MustITFor(gvr)
defer txn.Abort()

log.Debug().Msgf("> Dumping %q", gvr)
for o := it.Next(); o != nil; o = it.Next() {
m := o.(schema.MetaAccessor)
log.Debug().Msgf(" o %s/%s", m.GetNamespace(), m.GetName())
}
log.Debug().Msg("< Done")
}

func (db *DB) FindPod(ns string, sel map[string]string) (*v1.Pod, error) {
txn := db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -273,21 +287,6 @@ func (db *DB) FindNSBySel(sel *metav1.LabelSelector) ([]*v1.Namespace, error) {
return nss, nil
}

func (db *DB) DumpNS() error {
txn := db.Txn(false)
defer txn.Abort()
it, err := txn.Get(internal.Glossary[internal.NS].String(), "id")
if err != nil {
return err
}
for o := it.Next(); o != nil; o = it.Next() {
ns, _ := o.(*v1.Namespace)
log.Debug().Msgf("NS %q", ns.Name)
}

return nil
}

func (db *DB) FindNS(ns string) (*v1.Namespace, error) {
txn := db.Txn(false)
defer txn.Abort()
Expand Down
14 changes: 3 additions & 11 deletions internal/db/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,10 @@ func (l *Loader) fetchNodesMetrics(c types.Connection) (*mv1beta1.NodeMetricsLis

func loadResource(ctx context.Context, gvr types.GVR) ([]runtime.Object, error) {
f := mustExtractFactory(ctx)
if strings.Contains(gvr.String(), "metrics") {
if !f.Client().HasMetrics() {
return nil, nil
}
}
if gvr.IsMetricsRes() {
var res dao.Generic
res.Init(f, gvr)
return res.List(ctx)
if strings.Contains(gvr.String(), "metrics") && !f.Client().HasMetrics() {
return nil, nil
}

var res dao.Resource
var res dao.Generic
res.Init(f, gvr)

return res.List(ctx)
Expand Down
Loading

0 comments on commit 2eed9d6

Please sign in to comment.