Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
IMPROVEMENTS:
* agent: Add `BlockQueryWaitTime` config option for Nomad API connectivity [[GH-755](https://github.com/hashicorp/nomad-autoscaler/pull/755)]

BUG FIXES:
* plugin/apm/nomad: Set correct namespace when querying group metrics [[GH-808](https://github.com/hashicorp/nomad-autoscaler/pull/808)]

## 0.4.0 (December 20, 2023)

FEATURES:
Expand Down
49 changes: 36 additions & 13 deletions plugins/builtin/apm/nomad/plugin/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// group.
type taskGroupQuery struct {
metric string
namespace string
job string
group string
operation string
Expand Down Expand Up @@ -49,9 +50,12 @@ func (a *APMPlugin) queryTaskGroup(q string) (sdk.TimestampedMetrics, error) {
// getTaskGroupResourceUsage iterates the allocations within a job and
// identifies those which meet the criteria for being part of the calculation.
func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64, error) {
q := &api.QueryOptions{
Namespace: query.namespace,
}

// Grab the list of allocations assigned to the job in question.
allocs, _, err := a.client.Jobs().Allocations(query.job, false, nil)
allocs, _, err := a.client.Jobs().Allocations(query.job, false, q)
if err != nil {
return nil, fmt.Errorf("failed to get alloc listing for job: %v", err)
}
Expand All @@ -77,7 +81,7 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,
// out of amount allocated for taskgroups, the calculation must be done here.
// The total CPU allocated to the task group is retrieved once here since it
// does not vary between allocations.
allocatedCPU, err := a.getAllocatedCPUForTaskGroup(query.job, query.group)
allocatedCPU, err := a.getAllocatedCPUForTaskGroup(query.namespace, query.job, query.group)
if err != nil {
return nil, fmt.Errorf("failed to get total allocated CPU for taskgroup: %v", err)
}
Expand All @@ -94,7 +98,7 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,

// Similarly to `queryMetricCPUAllocated` we must calculate the allocated
// memory since it's not provided as a metric.
allocatedMem, err := a.getAllocatedMemForTaskGroup(query.job, query.group)
allocatedMem, err := a.getAllocatedMemForTaskGroup(query.namespace, query.job, query.group)
if err != nil {
return nil, fmt.Errorf("failed to get total allocated memory for taskgroup: %v", err)
}
Expand Down Expand Up @@ -124,7 +128,7 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,
// When calling Stats an entire Allocation object is needed, but only
// the ID is used within the call. Further details:
// https://github.com/hashicorp/nomad/issues/7955
allocStats, err := a.client.Allocations().Stats(&api.Allocation{ID: alloc.ID}, nil)
allocStats, err := a.client.Allocations().Stats(&api.Allocation{ID: alloc.ID}, q)
if err != nil {
return nil, fmt.Errorf("failed to get alloc stats: %v", err)
}
Expand All @@ -143,8 +147,8 @@ func (a *APMPlugin) getTaskGroupResourceUsage(query *taskGroupQuery) ([]float64,
}

// getAllocatedCPUForTaskGroup calculates the total allocated CPU in MHz for a taskgroup
func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, error) {
taskGroupConfig, err := a.getTaskGroup(job, taskgroup)
func (a *APMPlugin) getAllocatedCPUForTaskGroup(ns, job, taskgroup string) (int, error) {
taskGroupConfig, err := a.getTaskGroup(ns, job, taskgroup)
if err != nil {
return -1, err
}
Expand All @@ -160,8 +164,8 @@ func (a *APMPlugin) getAllocatedCPUForTaskGroup(job, taskgroup string) (int, err
}

// getAllocatedMemForTaskGroup calculates the total allocated memory in MiB for a taskgroup
func (a *APMPlugin) getAllocatedMemForTaskGroup(job, taskgroup string) (int, error) {
taskGroupConfig, err := a.getTaskGroup(job, taskgroup)
func (a *APMPlugin) getAllocatedMemForTaskGroup(ns, job, taskgroup string) (int, error) {
taskGroupConfig, err := a.getTaskGroup(ns, job, taskgroup)
if err != nil {
return -1, err
}
Expand All @@ -177,8 +181,10 @@ func (a *APMPlugin) getAllocatedMemForTaskGroup(job, taskgroup string) (int, err
}

// getTaskGroup returns a task group configuration from a job.
func (a *APMPlugin) getTaskGroup(job, taskgroup string) (*api.TaskGroup, error) {
jobInfo, _, err := a.client.Jobs().Info(job, nil)
func (a *APMPlugin) getTaskGroup(ns, job, taskgroup string) (*api.TaskGroup, error) {
jobInfo, _, err := a.client.Jobs().Info(job, &api.QueryOptions{
Namespace: ns,
})
if err != nil {
return nil, fmt.Errorf("failed to get info for job: %v", err)
}
Expand Down Expand Up @@ -237,12 +243,29 @@ func calculateTaskGroupResult(op string, metrics []float64) sdk.TimestampedMetri
func parseTaskGroupQuery(q string) (*taskGroupQuery, error) {
mainParts := strings.SplitN(q, "/", 3)
if len(mainParts) != 3 {
return nil, fmt.Errorf("expected <query>/<group>/<job>, received %s", q)
return nil, fmt.Errorf("expected <query>/<group>/<job>@<namespace>, received %s", q)
}

nsJob := mainParts[2]
nsJobSepIdx := strings.LastIndex(nsJob, "@")
if nsJobSepIdx == -1 {
return nil, fmt.Errorf("missing namespace from query %s", q)
}

ns := nsJob[nsJobSepIdx+1:]
if len(ns) == 0 {
return nil, fmt.Errorf("missing namespace from query %s", q)
}

job := nsJob[:nsJobSepIdx]
if len(job) == 0 {
return nil, fmt.Errorf("missing job from query %s", q)
}

query := &taskGroupQuery{
group: mainParts[1],
job: mainParts[2],
group: mainParts[1],
job: job,
namespace: ns,
}

opMetricParts := strings.SplitN(mainParts[0], "_", 3)
Expand Down
33 changes: 28 additions & 5 deletions plugins/builtin/apm/nomad/plugin/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func Test_parseTaskGroupQuery(t *testing.T) {
}{
{
name: "avg_cpu",
input: "taskgroup_avg_cpu/group/job",
input: "taskgroup_avg_cpu/group/job@default",
expected: &taskGroupQuery{
metric: "cpu",
namespace: "default",
job: "job",
group: "group",
operation: "avg",
Expand All @@ -70,9 +71,10 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
{
name: "avg_cpu-allocated",
input: "taskgroup_avg_cpu-allocated/group/job",
input: "taskgroup_avg_cpu-allocated/group/job@ns",
expected: &taskGroupQuery{
metric: "cpu-allocated",
namespace: "ns",
job: "job",
group: "group",
operation: "avg",
Expand All @@ -81,9 +83,10 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
{
name: "avg_memory",
input: "taskgroup_avg_memory/group/job",
input: "taskgroup_avg_memory/group/job@dev",
expected: &taskGroupQuery{
metric: "memory",
namespace: "dev",
job: "job",
group: "group",
operation: "avg",
Expand All @@ -92,9 +95,10 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
{
name: "avg_memory-allocated",
input: "taskgroup_avg_memory-allocated/group/job",
input: "taskgroup_avg_memory-allocated/group/job@dev",
expected: &taskGroupQuery{
metric: "memory-allocated",
namespace: "dev",
job: "job",
group: "group",
operation: "avg",
Expand All @@ -103,15 +107,28 @@ func Test_parseTaskGroupQuery(t *testing.T) {
},
{
name: "job with fwd slashes",
input: "taskgroup_avg_cpu/group/my/super/job//",
input: "taskgroup_avg_cpu/group/my/super/job//@dev",
expected: &taskGroupQuery{
metric: "cpu",
namespace: "dev",
job: "my/super/job//",
group: "group",
operation: "avg",
},
expectError: false,
},
{
name: "job with at signs",
input: "taskgroup_avg_cpu/group/job@job@@dev",
expected: &taskGroupQuery{
metric: "cpu",
namespace: "dev",
job: "job@job@",
group: "group",
operation: "avg",
},
expectError: false,
},
{
name: "empty query",
input: "",
Expand All @@ -124,6 +141,12 @@ func Test_parseTaskGroupQuery(t *testing.T) {
expected: nil,
expectError: true,
},
{
name: "missing namspace",
input: "avg_cpu/group/job",
expected: nil,
expectError: true,
},
{
name: "missing job",
input: "avg_cpu/group",
Expand Down
36 changes: 21 additions & 15 deletions policy/nomad/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ func TestSource_canonicalizePolicy(t *testing.T) {
input: &sdk.ScalingPolicy{
Target: &sdk.ScalingPolicyTarget{
Config: map[string]string{
"Job": "job",
"Group": "group",
"Namespace": "dev",
"Job": "job",
"Group": "group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
Expand All @@ -126,14 +127,15 @@ func TestSource_canonicalizePolicy(t *testing.T) {
Target: &sdk.ScalingPolicyTarget{
Name: plugins.InternalTargetNomad,
Config: map[string]string{
"Job": "job",
"Group": "group",
"Namespace": "dev",
"Job": "job",
"Group": "group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
{
Source: plugins.InternalAPMNomad,
Query: "taskgroup_avg_cpu/group/job",
Query: "taskgroup_avg_cpu/group/job@dev",
QueryWindow: policy.DefaultQueryWindow,
Strategy: &sdk.ScalingPolicyStrategy{
Config: map[string]string{},
Expand All @@ -147,8 +149,9 @@ func TestSource_canonicalizePolicy(t *testing.T) {
input: &sdk.ScalingPolicy{
Target: &sdk.ScalingPolicyTarget{
Config: map[string]string{
"Job": "job",
"Group": "group",
"Namespace": "dev",
"Job": "job",
"Group": "group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
Expand All @@ -164,14 +167,15 @@ func TestSource_canonicalizePolicy(t *testing.T) {
Target: &sdk.ScalingPolicyTarget{
Name: plugins.InternalTargetNomad,
Config: map[string]string{
"Job": "job",
"Group": "group",
"Namespace": "dev",
"Job": "job",
"Group": "group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
{
Source: plugins.InternalAPMNomad,
Query: "taskgroup_avg_cpu/group/job",
Query: "taskgroup_avg_cpu/group/job@dev",
QueryWindow: policy.DefaultQueryWindow,
Strategy: &sdk.ScalingPolicyStrategy{
Config: map[string]string{},
Expand All @@ -185,8 +189,9 @@ func TestSource_canonicalizePolicy(t *testing.T) {
input: &sdk.ScalingPolicy{
Target: &sdk.ScalingPolicyTarget{
Config: map[string]string{
"Job": "my_job",
"Group": "my_group",
"Namespace": "my_ns",
"Job": "my_job",
"Group": "my_group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
Expand All @@ -201,14 +206,15 @@ func TestSource_canonicalizePolicy(t *testing.T) {
Target: &sdk.ScalingPolicyTarget{
Name: plugins.InternalTargetNomad,
Config: map[string]string{
"Job": "my_job",
"Group": "my_group",
"Namespace": "my_ns",
"Job": "my_job",
"Group": "my_group",
},
},
Checks: []*sdk.ScalingPolicyCheck{
{
Source: plugins.InternalAPMNomad,
Query: "taskgroup_avg_cpu/my_group/my_job",
Query: "taskgroup_avg_cpu/my_group/my_job@my_ns",
QueryWindow: policy.DefaultQueryWindow,
Strategy: &sdk.ScalingPolicyStrategy{
Config: map[string]string{},
Expand Down
10 changes: 8 additions & 2 deletions policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,14 @@ func (pr *Processor) CanonicalizeAPMQuery(c *sdk.ScalingPolicyCheck, t *sdk.Scal
// If the target is a Nomad job task group, format the query in the
// expected manner.
if t.IsJobTaskGroupTarget() {
c.Query = fmt.Sprintf("%s_%s/%s/%s",
nomadAPM.QueryTypeTaskGroup, c.Query, t.Config[sdk.TargetConfigKeyTaskGroup], t.Config[sdk.TargetConfigKeyJob])
c.Query = fmt.Sprintf(
"%s_%s/%s/%s@%s",
nomadAPM.QueryTypeTaskGroup,
c.Query,
t.Config[sdk.TargetConfigKeyTaskGroup],
t.Config[sdk.TargetConfigKeyJob],
t.Config[sdk.TargetConfigKeyNamespace],
)
Comment on lines -111 to +118
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal query format, so I wouldn't consider it a breaking change.

return
}

Expand Down
8 changes: 6 additions & 2 deletions policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,16 @@ func TestProcessor_CanonicalizeAPMQuery(t *testing.T) {
},
inputAPMNames: []string{"nomad-apm"},
inputTarget: &sdk.ScalingPolicyTarget{
Config: map[string]string{"Job": "example", "Group": "cache"},
Config: map[string]string{
"Namespace": "dev",
"Job": "example",
"Group": "cache",
},
},
expectedOutputCheck: &sdk.ScalingPolicyCheck{
Name: "random-check",
Source: "nomad-apm",
Query: "taskgroup_avg_cpu/cache/example",
Query: "taskgroup_avg_cpu/cache/example@dev",
},
name: "correctly formatted taskgroup target short query",
},
Expand Down
4 changes: 4 additions & 0 deletions sdk/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const (
// cooldown where out-of-band scaling activities have been triggered.
TargetStatusMetaKeyLastEvent = "nomad_autoscaler.last_event"

// TargetConfigKeyNamespace is the config key used within horizontal app
// scaling to identify the Nomad namespace targeted for autoscaling.
TargetConfigKeyNamespace = "Namespace"

// TargetConfigKeyJob is the config key used within horizontal app scaling
// to identify the Nomad job targeted for autoscaling.
TargetConfigKeyJob = "Job"
Expand Down