Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1526 from IRCody/rerestream
Browse files Browse the repository at this point in the history
Plugin initiated workflow
  • Loading branch information
IRCody authored Mar 3, 2017
2 parents 30f6399 + a7d4bc1 commit 2304181
Show file tree
Hide file tree
Showing 27 changed files with 1,556 additions and 174 deletions.
69 changes: 69 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.STREAMGRPC:
c, e := client.NewStreamCollectorGrpcClient(
resp.ListenAddress,
DefaultClientTimeout,
resp.PublicKey,
!resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
Expand Down Expand Up @@ -242,6 +252,12 @@ func (a *availablePlugin) Kill(r string) error {
}).Debug("deleting available plugin package")
os.RemoveAll(filepath.Dir(a.execPath))
}
// If it's a stremaing plugin, we need to signal the scheduler that
// this plugin is being killed.
if c, ok := a.client.(client.PluginStreamCollectorClient); ok {
c.Killed()
}

return a.ePlugin.Kill()
}

Expand Down Expand Up @@ -440,6 +456,59 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return results, nil
}

func (ap *availablePlugins) streamMetrics(
pluginKey string,
metricTypes []core.Metric,
taskID string,
maxCollectDuration time.Duration,
maxMetricsBuffer int64) (chan []core.Metric, chan error, error) {

pool, serr := ap.getPool(pluginKey)
if serr != nil {
return nil, nil, serr
}
if pool == nil {
return nil, nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}

if pool.Strategy() == nil {
return nil, nil, errors.New("Plugin strategy not set")
}

config := metricTypes[0].Config()
cfg := map[string]ctypes.ConfigValue{}
if config != nil {
cfg = config.Table()
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.SelectAP(taskID, cfg)
if serr != nil {
return nil, nil, serr
}

cli, ok := p.(*availablePlugin).client.(client.PluginStreamCollectorClient)
if !ok {
return nil, nil, serror.New(errors.New("Invalid streaming client"))
}

metricChan, errChan, err := cli.StreamMetrics(metricTypes)
if err != nil {
return nil, nil, serror.New(err)
}
err = cli.UpdateCollectDuration(maxCollectDuration)
if err != nil {
return nil, nil, serror.New(err)
}
err = cli.UpdateMetricsBuffer(maxMetricsBuffer)
if err != nil {
return nil, nil, serror.New(err)
}

return metricChan, errChan, nil
}

func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
pool, serr := ap.getPool(key)
Expand Down
48 changes: 48 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,54 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]
return
}

func (p *pluginControl) StreamMetrics(
id string,
allTags map[string]map[string]string,
maxCollectDuration time.Duration,
maxMetricsBuffer int64) (chan []core.Metric, chan error, []error) {
if !p.Started {
return nil, nil, []error{ErrControllerNotStarted}
}
errs := make([]error, 0)
pluginToMetricMap, serrs, err := p.subscriptionGroups.Get(id)
if err != nil {
controlLogger.WithFields(log.Fields{
"_block": "StreamMetrics",
"subscription-group-id": id,
}).Error(err)
errs = append(errs, err)
return nil, nil, errs
}

if serrs != nil {
for _, e := range serrs {
errs = append(errs, e)
}
}
if len(pluginToMetricMap) > 1 {
return nil, nil, append(errs, errors.New("Only 1 streaming collecting plugin per task"))
}
var metricChan chan []core.Metric
var errChan chan error
for pluginKey, pmt := range pluginToMetricMap {
for _, mt := range pmt.metricTypes {
if mt.Config() != nil {
mt.Config().ReverseMergeInPlace(
p.Config.Plugins.getPluginConfigDataNode(
core.CollectorPluginType,
pmt.plugin.Name(),
pmt.plugin.Version()))
}
}
metricChan, errChan, err = p.pluginRunner.AvailablePlugins().streamMetrics(pluginKey, pmt.metricTypes, id, maxCollectDuration, maxMetricsBuffer)
if err != nil {
errs = append(errs, err)
return nil, nil, errs
}
}
return metricChan, errChan, nil
}

// PublishMetrics
func (p *pluginControl) PublishMetrics(metrics []core.Metric, config map[string]ctypes.ConfigValue, taskID, pluginName string, pluginVersion int) []error {
// If control is not started we don't want tasks to be able to
Expand Down
85 changes: 85 additions & 0 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,91 @@ func TestFailedPlugin(t *testing.T) {
})
}

func TestStreamMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
plugin.PingTimeoutLimit = 1
plugin.PingTimeoutDurationDefault = time.Second * 1

// Create controller
config := getTestConfig()
c := New(config)
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)

// Load plugin
_, e := load(c, fixtures.PluginPathStreamRand1)
So(e, ShouldBeNil)
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 3)

cd := cdata.NewNode()
cd.AddItem("testint", ctypes.ConfigValueInt{Value: 3})
cd.AddItem("testfloat", ctypes.ConfigValueFloat{Value: 0.14})
cd.AddItem("teststring", ctypes.ConfigValueStr{Value: "pi"})
m1 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "integer"),
Cfg: cd,
}
m2 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "float"),
Cfg: cd,
}
m3 := fixtures.MockMetricType{
Namespace_: core.NewNamespace("random", "string"),
Cfg: cd,
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

r := []core.RequestedMetric{}
for _, m := range []fixtures.MockMetricType{m1, m2, m3} {
r = append(r, m)
}

cdt := cdata.NewTree()
cdt.Add([]string{"random"}, cd)
taskHit := "hitting"

Convey("create a pool, add subscriptions and start plugins", func() {
serrs := c.SubscribeDeps(taskHit, r, []core.SubscribedPlugin{subscribedPlugin{typeName: "collector", name: "test-rand-streamer", version: 1}}, cdt)
So(serrs, ShouldBeNil)

pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "test-rand-streamer" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)

Convey("stream metrics", func() {

metrics, errors, err := c.StreamMetrics(taskHit, nil, time.Second, 0)
So(err, ShouldBeNil)
select {
case mts := <-metrics:
So(mts, ShouldNotBeNil)
So(len(mts), ShouldEqual, 3)
case errs := <-errors:
t.Fatal(errs)
case <-time.After(time.Second * 10):
t.Fatal("Failed to get a response from stream metrics")
}

ap := c.AvailablePlugins()
So(ap, ShouldNotBeEmpty)
So(pool.Strategy(), ShouldNotBeNil)
So(pool.Strategy().String(), ShouldEqual, plugin.DefaultRouting.String())
c.Stop()
})
})
})
}

func TestCollectMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
Expand Down
3 changes: 3 additions & 0 deletions control/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (

PluginNameMock2 = "snap-plugin-collector-mock2"
PluginPathMock2 = helper.PluginFilePath(PluginNameMock2)

PluginNameStreamRand1 = "snap-plugin-stream-collector-rand1"
PluginPathStreamRand1 = helper.PluginFilePath(PluginNameStreamRand1)
)

// mocks a metric type
Expand Down
13 changes: 13 additions & 0 deletions control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package client

import (
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
Expand All @@ -41,6 +43,17 @@ type PluginCollectorClient interface {
GetMetricTypes(plugin.ConfigType) ([]core.Metric, error)
}

type PluginStreamCollectorClient interface {
PluginClient
StreamMetrics([]core.Metric) (chan []core.Metric, chan error, error)
GetMetricTypes(plugin.ConfigType) ([]core.Metric, error)
UpdateCollectedMetrics([]core.Metric) error
UpdatePluginConfig([]byte) error
UpdateMetricsBuffer(int64) error
UpdateCollectDuration(time.Duration) error
Killed()
}

// PluginProcessorClient A client providing processor specific plugin method calls.
type PluginProcessorClient interface {
PluginClient
Expand Down
Loading

0 comments on commit 2304181

Please sign in to comment.