Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(operator/hostname): enable watcher on provider hostname #245

Merged
merged 10 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: Integration tests

# yamllint disable-line rule:truthy
on:
workflow_call:

Expand All @@ -15,8 +16,12 @@ jobs:
crd-e2e:
env:
KIND_NAME: kube
runs-on: ubuntu-latest
runs-on: core-e2e
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- name: Setup GOPATH
run: echo "GOPATH=$GITHUB_WORKSPACE/go" >> $GITHUB_ENV
- name: Ensure GOPATH dirs
Expand Down
9 changes: 8 additions & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ defaults:
run:
shell: bash

# yamllint disable-line rule:truthy
on:
workflow_dispatch:

jobs:
publish:
runs-on: ubuntu-latest
runs-on: core-e2e
env:
DOCKER_CLI_EXPERIMENTAL: "enabled"
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- uses: actions/checkout@v4
with:
fetch-depth: 0
Expand All @@ -26,6 +31,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand Down
23 changes: 22 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defaults:
run:
shell: bash

# yamllint disable-line rule:truthy
on:
pull_request:
push:
Expand All @@ -29,6 +30,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand All @@ -52,6 +55,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- run: make test-full

lint:
Expand All @@ -69,14 +74,20 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Run linter
run: make lint

release-dry-run:
runs-on: ubuntu-latest
runs-on: core-e2e
env:
DOCKER_CLI_EXPERIMENTAL: "enabled"
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- uses: actions/checkout@v4
with:
fetch-depth: 0
Expand All @@ -89,6 +100,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand Down Expand Up @@ -116,8 +129,12 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: .cache/tests/coverage.txt

Expand All @@ -136,6 +153,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Run codegen
run: make codegen
- name: Ensure no files changed/added/removed
Expand Down Expand Up @@ -168,6 +187,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- run: make shellcheck

integration-tests:
Expand Down
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ issues:
exclude:
- comment on exported (method|function|type|const|var)
exclude-use-default: true

# Skip generated k8s code
run:
skip-dirs:
- pkg/client
- ".*/mocks"
Expand All @@ -18,7 +16,7 @@ linters:
enable:
- unused
- misspell
- goerr113
- err113
- gofmt
- gocritic
- goconst
Expand Down
2 changes: 2 additions & 0 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
---
version: 2
project_name: provider
env:
- GO111MODULE=on
Expand Down
2 changes: 1 addition & 1 deletion bidengine/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func makeMocks(s *orderTestScaffold) {

s.cluster = &clustermocks.Cluster{}
s.reserveCallNotify = make(chan int, 1)
s.cluster.On("Reserve", s.orderID, &(groupResult.Group)).Run(func(args mock.Arguments) {
s.cluster.On("Reserve", s.orderID, &(groupResult.Group)).Run(func(_ mock.Arguments) {
s.reserveCallNotify <- 0
time.Sleep(time.Second) // add a delay before returning response, to test race conditions
}).Return(mockReservation, nil)
Expand Down
2 changes: 1 addition & 1 deletion bidengine/pricing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func Test_ScriptPricingFromScript(t *testing.T) {

expectedPrice := fmt.Sprintf("%.*f", DefaultPricePrecision, 67843137.254901960)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := io.WriteString(w, mockAPIResponse)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,11 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {

ipAddrStatusCalled := make(chan struct{}, 2)
// First call indicates no data
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(args mock.Arguments) {
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(_ mock.Arguments) {
ipAddrStatusCalled <- struct{}{}
}).Return([]cip.LeaseIPStatus{}, nil).Once()
// Second call indicates the IP is there and can be confirmed
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(args mock.Arguments) {
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(_ mock.Arguments) {
ipAddrStatusCalled <- struct{}{}
}).Return([]cip.LeaseIPStatus{
{
Expand Down
6 changes: 6 additions & 0 deletions cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ const (
AkashLeaseOSeqLabelName = "akash.network/lease.id.oseq"
AkashLeaseProviderLabelName = "akash.network/lease.id.provider"
AkashLeaseManifestVersion = "akash.network/manifest.version"
AkashLeaseUpdatedAt = "akash.network/lease.updated_at"
)

const (
ValTrue = "true"
ValFalse = "false"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cluster/kube/builder/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (b *Workload) container() corev1.Container {

// fixme: ram is never expected to be nil
if mem := service.Resources.Memory; mem != nil {
requestedRam := sdlutil.ComputeCommittedResources(b.settings.MemoryCommitLevel, mem.Quantity)
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRam.Value()), resource.DecimalSI).DeepCopy()
requestedRAM := sdlutil.ComputeCommittedResources(b.settings.MemoryCommitLevel, mem.Quantity)
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRAM.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Limits[corev1.ResourceMemory] = resource.NewQuantity(int64(mem.Quantity.Value()+requestedMem), resource.DecimalSI).DeepCopy()
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/kube/client_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func withExecTestScaffold(t *testing.T, changePod func(pod *corev1.Pod) error, t
UserAgent: "client_exec_test.go",
Username: "theusername",
Password: "thepassword",
Proxy: func(req *http.Request) (*url.URL, error) {
Proxy: func(_ *http.Request) (*url.URL, error) {
return nil, errNoSPDYInTest
},
},
Expand Down
68 changes: 44 additions & 24 deletions cluster/kube/client_hostname_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"fmt"
"strings"
"time"

kubeErrors "k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -39,29 +40,39 @@ func (c *client) DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host s
labels := map[string]string{
builder.AkashManagedLabelName: "true",
}
builder.AppendLeaseLabels(lID, labels)

foundEntry, err := c.ac.AkashV2beta2().ProviderHosts(c.ns).Get(ctx, host, metav1.GetOptions{})
exists := true
var resourceVersion string
builder.AppendLeaseLabels(lID, labels)

update := true
obj, err := c.ac.AkashV2beta2().ProviderHosts(c.ns).Get(ctx, host, metav1.GetOptions{})
if err != nil {
if kubeErrors.IsNotFound(err) {
exists = false
if kerrors.IsNotFound(err) {
update = false
} else {
return err
}
} else {
resourceVersion = foundEntry.ObjectMeta.ResourceVersion
}

obj := crd.ProviderHost{
ObjectMeta: metav1.ObjectMeta{
Name: host, // Name is always the hostname, to prevent duplicates
Labels: labels,
ResourceVersion: resourceVersion,
},
Spec: crd.ProviderHostSpec{
if !update {
obj = &crd.ProviderHost{
ObjectMeta: metav1.ObjectMeta{
Name: host, // Name is always the hostname, to prevent duplicates
Labels: labels,
},
Spec: crd.ProviderHostSpec{
Hostname: host,
Owner: lID.GetOwner(),
Dseq: lID.GetDSeq(),
Oseq: lID.GetOSeq(),
Gseq: lID.GetGSeq(),
Provider: lID.GetProvider(),
ServiceName: serviceName,
ExternalPort: externalPort,
},
}
} else {
obj.ObjectMeta.Labels = labels
obj.Spec = crd.ProviderHostSpec{
Hostname: host,
Owner: lID.GetOwner(),
Dseq: lID.GetDSeq(),
Expand All @@ -70,18 +81,27 @@ func (c *client) DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host s
Provider: lID.GetProvider(),
ServiceName: serviceName,
ExternalPort: externalPort,
},
}
}

c.log.Info("declaring hostname", "lease", lID, "service-name", serviceName, "external-port", externalPort, "host", host)
// Create or update the entry
if exists {
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Update(ctx, &obj, metav1.UpdateOptions{})

if obj.Annotations == nil {
obj.Annotations = make(map[string]string)
}

obj.Annotations[builder.AkashLeaseUpdatedAt] = time.Now().UTC().Format(time.RFC3339)

if update {
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Update(ctx, obj, metav1.UpdateOptions{})
} else {
obj.ResourceVersion = ""
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Create(ctx, &obj, metav1.CreateOptions{})
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Create(ctx, obj, metav1.CreateOptions{})
}
return err

if err != nil {
return err
}

return nil
}

func (c *client) PurgeDeclaredHostname(ctx context.Context, lID mtypes.LeaseID, hostname string) error {
Expand Down
2 changes: 1 addition & 1 deletion cluster/kube/operators/clients/hostname/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import (
"testing"
)

func TestHostnameOperatorClient(t *testing.T) {
func TestHostnameOperatorClient(_ *testing.T) {
// TODO: tests here
}
4 changes: 2 additions & 2 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<
}

if len(pods.Items) == 0 {
return nil, fmt.Errorf("no inventory pods available") // nolint: goerr113
return nil, fmt.Errorf("no inventory pods available") // nolint: err113
}

var pod *corev1.Pod
Expand All @@ -274,7 +274,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<
}

if pod == nil {
return nil, fmt.Errorf("no inventory pods available") // nolint: goerr113
return nil, fmt.Errorf("no inventory pods available") // nolint: err113
}

loop:
Expand Down
4 changes: 2 additions & 2 deletions cluster/kube/operators/clients/inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ func makeInventoryScaffold(t *testing.T) *inventoryScaffold {
}

// QueryCluster does not need to be implemented as provider only uses stream
func (gm *testInventoryServer) QueryCluster(ctx context.Context, _ *emptypb.Empty) (*inventoryV1.Cluster, error) {
return nil, errors.New("unimplemented") // nolint: goerr113
func (gm *testInventoryServer) QueryCluster(_ context.Context, _ *emptypb.Empty) (*inventoryV1.Cluster, error) {
return nil, errors.New("unimplemented") // nolint: err113
}

func (gm *testInventoryServer) StreamCluster(_ *emptypb.Empty, stream inventoryV1.ClusterRPC_StreamClusterServer) error {
Expand Down
6 changes: 3 additions & 3 deletions cluster/kube/operators/clients/ip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ func fakeIPOperatorHandler() *fakeOperator {
fake.ipUsageResponse.Store([]byte{})

fake.mux.HandleFunc("/health",
func(rw http.ResponseWriter, req *http.Request) {
func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.healthStatus)
rw.WriteHeader(int(status))
})

fake.mux.HandleFunc("/ip-lease-status/", func(rw http.ResponseWriter, req *http.Request) {
fake.mux.HandleFunc("/ip-lease-status/", func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.ipLeaseStatusStatus)
rw.WriteHeader(int(status))

body := fake.ipLeaseStatusResponse.Load().([]byte)
_, _ = io.Copy(rw, bytes.NewReader(body))
})

fake.mux.HandleFunc("/usage", func(rw http.ResponseWriter, req *http.Request) {
fake.mux.HandleFunc("/usage", func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.ipUsageStatus)
rw.WriteHeader(int(status))

Expand Down
Loading
Loading