Skip to content

Commit

Permalink
refactor: git fetch based caching to fetch only src branches
Browse files Browse the repository at this point in the history
Signed-off-by: Nico Braun <rainbowstack@gmail.com>
  • Loading branch information
bluebrown committed Jan 19, 2024
1 parent 31f62f9 commit 6803eb9
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 105 deletions.
5 changes: 3 additions & 2 deletions kioutil/kioutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -123,7 +122,9 @@ func (g GitPackageReader) Read() ([]*yaml.RNode, error) {
return nil, fmt.Errorf("clone repo: %w", err)
}
} else {
slog.InfoContext(g.ctx, "cache hit, skip cloning", "repo", g.srcURI.Repo)
if err := gitE(g.ctx, "-C", g.cachePath, "switch", g.srcURI.Ref); err != nil {
return nil, fmt.Errorf("checkout branch: %w", err)
}
}

pkgPath := filepath.Join(g.cachePath, g.srcURI.Pkg)
Expand Down
14 changes: 10 additions & 4 deletions krm/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"text/template"

"github.com/bluebrown/kobold/kioutil"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/kustomize/kyaml/kio"
)

type Pipeline struct {
RepoURI string `json:"repoUri,omitempty"`
SrcBranch string `json:"sourceBranch,omitempty"`
DstBranch string `json:"destinationBranch,omitempty"`
CachePath string `json:"cachePath,omitempty"`
RepoURI string `json:"repoUri,omitempty"`
SrcBranch string `json:"sourceBranch,omitempty"`
DstBranch string `json:"destinationBranch,omitempty"`
CachePath string `json:"cachePath,omitempty"`
PushCounter *prometheus.CounterVec
}

func (opts Pipeline) Run(ctx context.Context, imageRefs []string) (msg string, changes, warnings []string, err error) {
Expand Down Expand Up @@ -54,6 +56,10 @@ func (opts Pipeline) Run(ctx context.Context, imageRefs []string) (msg string, c
return "", nil, nil, err
}

if len(kf.Changes) > 0 && opts.PushCounter != nil {
opts.PushCounter.With(prometheus.Labels{"repo": opts.RepoURI}).Inc()
}

return grw.CommitMessage(), kf.Changes, kf.Warnings, nil
}

Expand Down
115 changes: 115 additions & 0 deletions task/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package task

import (
"context"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"

"github.com/bluebrown/kobold/store"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
)

// repoCache keeps git repositories on disk and hands out disposable copies
// of them.
type repoCache struct {
	// cache is the directory below which ensure keeps one repository per
	// repo URI.
	cache string
	// tmp is the directory that receives the copies created by get; cleanTmp
	// removes it.
	tmp string
	// repos maps a repo URI to the refs that fill collected for it.
	repos map[string][]string
}

// fill creates the cache and tmp directories and then makes sure that every
// repository referenced by gg is present in the cache with its refs fetched.
//
// The refs of all task groups are collected per repository, each ref only
// once, so that a repository is handled a single time no matter how many task
// groups point at it. At most lim repositories are handled concurrently. All
// of them are waited for; the first error any of them reported is returned.
func (cache *repoCache) fill(ctx context.Context, gg []store.TaskGroup, lim int) error {
	if err := os.MkdirAll(cache.cache, 0755); err != nil {
		return fmt.Errorf("mkdir %q: %w", cache.cache, err)
	}

	if err := os.MkdirAll(cache.tmp, 0755); err != nil {
		return fmt.Errorf("mkdir %q: %w", cache.tmp, err)
	}

	// Writing to a nil map panics, so tolerate a repoCache whose map was
	// never initialised.
	if cache.repos == nil {
		cache.repos = make(map[string][]string)
	}

	for _, tg := range gg {
		repo, ref := tg.RepoUri.Repo, tg.RepoUri.Ref

		// Several task groups may use the same repo and ref; record the ref
		// only once so the branch list handed to git has no repetitions.
		known := false
		for _, have := range cache.repos[repo] {
			if have == ref {
				known = true
				break
			}
		}
		if !known {
			cache.repos[repo] = append(cache.repos[repo], ref)
		}
	}

	var eg errgroup.Group
	eg.SetLimit(lim)

	for uri, refs := range cache.repos {
		uri, refs := uri, refs // per-iteration copies for the closure
		eg.Go(func() error {
			return cache.ensure(ctx, uri, refs)
		})
	}

	return eg.Wait()
}

// ensure makes sure a repository for uri exists below the cache directory and
// fetches refs into it.
//
// If nothing can be stat'ed at the repository's cache path, an empty
// repository with an "origin" remote pointing at uri is created there. Should
// that setup fail, the partially created directory is removed again so that a
// later call starts from scratch instead of treating the broken directory as
// a usable repository. Afterwards the branches tracked for origin are set to
// refs and a fetch with depth 1 is performed. A successful fetch increments
// metricGitFetch for the repository.
//
// It returns the context's error if ctx is already done, and an error if refs
// is empty.
func (cache *repoCache) ensure(ctx context.Context, uri string, refs []string) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	// git remote set-branches requires at least one branch; fail early with a
	// readable message before touching the file system.
	if len(refs) == 0 {
		return fmt.Errorf("git remote set-branches %q: no refs given", uri)
	}

	path := filepath.Join(cache.cache, uri)

	if _, err := os.Stat(path); err != nil {
		if err := os.RemoveAll(path); err != nil {
			return fmt.Errorf("remove %q: %w", path, err)
		}

		slog.InfoContext(ctx, "cloning repo", "repo", uri)

		setup := func() error {
			if err := os.MkdirAll(path, 0755); err != nil {
				return fmt.Errorf("mkdir %q: %w", path, err)
			}

			if err := run(ctx, "git", "init", path); err != nil {
				return fmt.Errorf("git init %q: %w", uri, err)
			}

			if err := run(ctx, "git", "-C", path, "remote", "add", "origin", uri); err != nil {
				return fmt.Errorf("git remote add %q: %w", uri, err)
			}

			return nil
		}

		if err := setup(); err != nil {
			// Leaving the directory behind would make the next call take the
			// "updating repo" branch on a repository without a usable origin.
			if rmErr := os.RemoveAll(path); rmErr != nil {
				slog.WarnContext(ctx, "failed to remove incomplete repo", "path", path, "error", rmErr)
			}
			return err
		}
	} else {
		slog.InfoContext(ctx, "updating repo", "repo", uri)
	}

	args := append([]string{"git", "-C", path, "remote", "set-branches", "origin"}, refs...)
	if err := run(ctx, args...); err != nil {
		return fmt.Errorf("git remote set-branches %q: %w", uri, err)
	}

	if err := run(ctx, "git", "-C", path, "fetch", "--depth", "1"); err != nil {
		return fmt.Errorf("git fetch %q: %w", uri, err)
	}

	metricGitFetch.With(prometheus.Labels{"repo": uri}).Inc()

	return nil
}

// get copies the cached repository of repo into a new, uniquely named
// directory below the tmp directory and returns the path of that copy. If
// copying fails, a warning is logged and the empty string is returned.
func (cache *repoCache) get(repo string) string {
	var (
		cached = filepath.Join(cache.cache, repo)
		dst    = filepath.Join(cache.tmp, uuid.NewString())
	)

	err := run(context.Background(), "cp", "-r", cached, dst)
	if err == nil {
		return dst
	}

	slog.Warn("failed to copy repo", "repo", repo, "error", err)
	return ""
}
// cleanTmp deletes the tmp directory together with everything below it and
// reports the error of the removal, if any.
func (cache *repoCache) cleanTmp() error {
	if err := os.RemoveAll(cache.tmp); err != nil {
		return err
	}
	return nil
}

// run executes the command described by args and waits for it to finish;
// args[0] is the program and the remaining elements are its arguments. The
// command is bound to ctx via exec.CommandContext.
//
// On failure the returned error wraps the underlying error and carries the
// command's combined stdout and stderr. Calling run without any arguments
// returns an error instead of panicking.
func run(ctx context.Context, args ...string) error {
	if len(args) == 0 {
		return fmt.Errorf("run: no command given")
	}

	cmd := exec.CommandContext(ctx, args[0], args[1:]...)
	if b, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("%w: %s", err, string(b))
	}

	return nil
}
9 changes: 5 additions & 4 deletions task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ func KoboldHandler(ctx context.Context, cache string, g store.TaskGroup, runner
}

pipline := krm.Pipeline{
RepoURI: g.RepoUri.String(),
SrcBranch: g.RepoUri.Ref,
DstBranch: g.DestBranch.String,
CachePath: cache,
RepoURI: g.RepoUri.String(),
SrcBranch: g.RepoUri.Ref,
DstBranch: g.DestBranch.String,
CachePath: cache,
PushCounter: metricGitPush,
}

if err := ctx.Err(); err != nil {
Expand Down
20 changes: 14 additions & 6 deletions task/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@ import (
)

var (
metricRun = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "kobold_run",
metricRunsActive = promauto.NewGauge(prometheus.GaugeOpts{
Name: "kobold_run_active",
Help: "number of active runs",
})
metricRunStatus = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_run_status",
Help: "run status (task groups)",
}, []string{"status", "repo"})
metricMsgRecv = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_recv",
Name: "kobold_msg_recv",
Help: "number of messages received",
}, []string{"channel", "rejected"})
metricClone = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_clone",
Help: "number of clone",
metricGitFetch = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_git_fetch",
Help: "number of git fetches",
}, []string{"repo"})
metricGitPush = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_git_push",
Help: "number of git pushes",
}, []string{"repo"})
)
92 changes: 7 additions & 85 deletions task/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -76,8 +75,9 @@ func (p *Pool) Dispatch() error {

// each dispatch gets its own cache, to avoid collisions
cache := &repoCache{
repos: make(map[string]string),
base: filepath.Join(os.TempDir(), "kobold-cache-"+uuid.NewString()),
repos: make(map[string][]string),
cache: filepath.Join(os.TempDir(), "kobold-cache", "repos"),
tmp: filepath.Join(os.TempDir(), "kobold-cache", uuid.NewString()),
}

// find all unique repos and clone them beforehand in order to avoid
Expand All @@ -92,7 +92,7 @@ func (p *Pool) Dispatch() error {

go func() {
wg.Wait()
cache.purge()
cache.cleanTmp()
}()

for _, g := range taskGroups {
Expand Down Expand Up @@ -129,9 +129,8 @@ func (p *Pool) Dispatch() error {
}

slog.InfoContext(p.ctx, "task group dispatched", "fingerprint", g.Fingerprint)
l := prometheus.Labels{"status": string(StatusRunning), "repo": g.RepoUri.Repo}
metricRun.With(l).Inc()
defer metricRun.With(l).Add(-1)
metricRunsActive.Inc()
defer metricRunsActive.Add(-1)

var (
status = StatusSuccess
Expand Down Expand Up @@ -159,7 +158,7 @@ func (p *Pool) Dispatch() error {
})

slog.InfoContext(p.ctx, "task group done", "fingerprint", g.Fingerprint, "status", status)
metricRun.With(prometheus.Labels{"status": string(status), "repo": g.RepoUri.Repo}).Add(1)
metricRunStatus.With(prometheus.Labels{"status": string(status), "repo": g.RepoUri.Repo}).Add(1)

if err != nil {
return err
Expand Down Expand Up @@ -252,80 +251,3 @@ func (p *Pool) QueueReader(ctx context.Context, channel string, r io.Reader) err
}
return scanner.Err()
}

// repoCache tracks the repositories cloned into a cache directory.
type repoCache struct {
	// base is the directory configured by the creator of the cache.
	base string
	// path is the directory actually used; fill sets it from base.
	path string
	// repos maps a repo URI to the directory it is cloned into.
	repos map[string]string
}

// fill creates the cache directory and clones every repository referenced by
// gg into it, handling at most lim repositories concurrently. Each repository
// is cloned once, however many task groups use it. All clones are waited for;
// the first error any of them reported is returned.
func (cache *repoCache) fill(ctx context.Context, gg []store.TaskGroup, lim int) error {
	// The working path is the configured base directory, taken over as is.
	cache.path = cache.base

	if err := os.MkdirAll(cache.path, 0755); err != nil {
		return err
	}

	for i := range gg {
		repo := gg[i].RepoUri.Repo
		cache.repos[repo] = filepath.Join(cache.path, repo)
	}

	var eg errgroup.Group
	eg.SetLimit(lim)

	for repo, dir := range cache.repos {
		repo, dir := repo, dir // per-iteration copies for the closure
		eg.Go(func() error {
			return cache.ensure(ctx, repo, dir)
		})
	}

	return eg.Wait()
}

// ensure replaces whatever is at path with a fresh depth-1 clone of uri. On
// success it increments metricClone for the repository and logs the clone. It
// returns the context's error if ctx is already done.
func (cache *repoCache) ensure(ctx context.Context, uri, path string) error {
	err := ctx.Err()
	if err != nil {
		return err
	}

	// Fetching into an existing clone leads to merge conflicts, so the old
	// copy is thrown away and the repo is cloned again with depth 1.
	err = os.RemoveAll(path)
	if err != nil {
		return fmt.Errorf("remove repo %q: %w", uri, err)
	}

	out, err := exec.CommandContext(ctx, "git", "clone", "--depth", "1", uri, path).CombinedOutput()
	if err != nil {
		return fmt.Errorf("git clone %q: %w: %s", uri, err, string(out))
	}

	metricClone.With(prometheus.Labels{"repo": uri}).Inc()
	slog.InfoContext(ctx, "repo cloned", "repo", uri, "cache", path)

	return nil
}

// get copies the clone recorded for repo into a new, uniquely named directory
// below the cache path and returns the path of that copy. It returns the
// empty string if no clone is recorded for repo or if copying fails; a failed
// copy is logged as a warning.
func (cache *repoCache) get(repo string) string {
	src := cache.repos[repo]
	if src != "" {
		dst := filepath.Join(cache.path, uuid.NewString())

		out, err := exec.Command("cp", "-r", src, dst).CombinedOutput()
		if err == nil {
			return dst
		}

		slog.Warn("failed to copy repo", "error", err, "output", string(out))
	}

	return ""
}

// purge deletes the cache path together with everything below it and reports
// the error of the removal, if any.
func (cache *repoCache) purge() error {
	if err := os.RemoveAll(cache.path); err != nil {
		return err
	}
	return nil
}
2 changes: 1 addition & 1 deletion testdata/events.http
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Host: localhost:9000

{
"push_data": {
"tag": "v1.1.3"
"tag": "v1.6.3"
},
"repository": {
"repo_name": "library/busybox"
Expand Down
6 changes: 3 additions & 3 deletions testdata/kobold.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ decoder = "builtin.dockerhub@v1"
[[pipeline]]
name = "github"
channels = ["gh", "dockerhub"]
repo_uri = "git@github.com:bluebrown/foobar.git@main/manifests"
dest_branch = "kobold"
post_hook = "builtin.github-pr@v1"
repo_uri = "git@github.com:bluebrown/foobar.git@test/manifests"
# dest_branch = "kobold"
# post_hook = "builtin.github-pr@v1"

[[channel]]
name = "ado"
Expand Down

0 comments on commit 6803eb9

Please sign in to comment.