@@ -197,33 +197,37 @@ public CompletableResultCode export(Collection<MetricData> metrics) {
     for (MetricData metric : metrics) {
       switch (metric.getType()) {
         case LONG_SUM:
+          String longSumMetricsName = PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), metric.getLongSumData().isMonotonic(), false);
           metric.getLongSumData().getPoints().forEach(point ->
-              lineList.add(serializeCounter(
-                  PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), metric.getLongSumData().isMonotonic(), false),
+              lineList.add(serializeCounter(longSumMetricsName,
                   point.getValue(), point.getAttributes(), point.getEpochNanos())));
           break;
         case DOUBLE_SUM:
+          String doubleSumMetricsName = PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), metric.getDoubleSumData().isMonotonic(), false);
           metric.getDoubleSumData().getPoints().forEach(point ->
               lineList.add(serializeCounter(
-                  PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), metric.getDoubleSumData().isMonotonic(), false),
+                  doubleSumMetricsName,
                   point.getValue(), point.getAttributes(), point.getEpochNanos())));
           break;
         case LONG_GAUGE:
+          String longGaugeMetricsName = PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, true);
           metric.getLongGaugeData().getPoints().forEach(point ->
               lineList.add(serializeGauge(
-                  PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, true),
+                  longGaugeMetricsName,
                   point.getValue(), point.getAttributes(), point.getEpochNanos())));
           break;
         case DOUBLE_GAUGE:
+          String doubleGaugeMetricsName = PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, true);
           metric.getDoubleGaugeData().getPoints().forEach(point ->
               lineList.add(serializeGauge(
-                  PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, true),
+                  doubleGaugeMetricsName,
                   point.getValue(), point.getAttributes(), point.getEpochNanos())));
           break;
         case HISTOGRAM:
+          String histogramMetricsName = PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, false);
           metric.getHistogramData().getPoints().forEach(point ->
               lineList.add(serializeHistogram(
-                  PrometheusUtils.mapMetricsName(metric.getName(), metric.getUnit(), false, false),
+                  histogramMetricsName,
                   point)));
           break;
         default:
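The hunk above hoists the loop-invariant PrometheusUtils.mapMetricsName(...) call out of each per-point lambda, so the Prometheus metric name is computed once per MetricData instead of once per data point. A minimal sketch of the same pattern follows; Point handling is simplified, and mapName is a hypothetical stand-in for PrometheusUtils.mapMetricsName, assumed pure and independent of the individual point:

import java.util.ArrayList;
import java.util.List;

class HoistExample {
    // Hypothetical stand-in for PrometheusUtils.mapMetricsName: assume it is
    // pure and depends only on per-metric fields, never on a single point.
    static String mapName(String name, String unit) {
        return name + "_" + unit;
    }

    static List<String> serialize(String name, String unit, List<Long> points) {
        List<String> lines = new ArrayList<>();
        // Hoisted: computed once per metric rather than once per point.
        String metricName = mapName(name, unit);
        points.forEach(value -> lines.add(metricName + " " + value));
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(serialize("requests", "total", List.of(1L, 2L, 3L)));
    }
}

Besides saving repeated work on hot export paths, the hoisted local also keeps each lambda body short enough to read at a glance.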
@@ -127,7 +127,9 @@ private[group] case object Empty extends GroupState {
 
 
 private object GroupMetadata extends Logging {
+  // AutoMQ for Kafka inject start
   private val CommitOffset: String = "CommitOffset"
+  // AutoMQ for Kafka inject end
 
   def loadGroup(groupId: String,
                 initialState: GroupState,
@@ -226,6 +228,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   var newMemberAdded: Boolean = false
 
+  // AutoMQ for Kafka inject start
   private val metricsGroup = new KafkaMetricsGroup(GroupMetadata.getClass)
   private def recreateOffsetMetric(tp: TopicPartition): Unit = {
     removeOffsetMetric(tp)
@@ -247,7 +250,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     metricsGroup.removeMetric(GroupMetadata.CommitOffset,
       Map("group" -> groupId, "topic" -> tp.topic, "partition" -> tp.partition.toString).asJava)
   }
-
+  // AutoMQ for Kafka inject end
   def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
 
   def is(groupState: GroupState): Boolean = state == groupState
@@ -452,8 +455,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     assertValidTransition(groupState)
     state = groupState
     currentStateTimestamp = Some(time.milliseconds())
+    // AutoMQ for Kafka inject start
     if (groupState == Dead)
       offsets.foreach(offset => removeOffsetMetric(offset._1))
+    // AutoMQ for Kafka inject end
   }
 
   def selectProtocol: String = {
@@ -643,10 +648,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
                         pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
+    // AutoMQ for Kafka inject start
     offsets.forKeyValue { (topicPartition, _) =>
       if (!this.offsets.contains(topicPartition))
         recreateOffsetMetric(topicPartition)
     }
+    // AutoMQ for Kafka inject end
     this.offsets ++= offsets
     this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
   }
@@ -657,8 +664,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
       throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
         "in the log.")
+    // AutoMQ for Kafka inject start
     if (!offsets.contains(topicPartition))
       recreateOffsetMetric(topicPartition)
+    // AutoMQ for Kafka inject end
     if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
       offsets.put(topicPartition, offsetWithCommitRecordMetadata)
   }
@@ -781,9 +790,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
         pendingOffsets.remove(topicPartition)
       }
       val removedOffset = offsets.remove(topicPartition)
+      // AutoMQ for Kafka inject start
       if (removedOffset.isDefined) {
         removeOffsetMetric(topicPartition)
       }
+      // AutoMQ for Kafka inject end
       removedOffset.map(topicPartition -> _.offsetAndMetadata)
     }.toMap
   }
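Taken together, the injected blocks manage one CommitOffset gauge per (group, topic, partition): recreateOffsetMetric removes any existing gauge before registering a new one, so reloading the same partition stays idempotent, and the gauge is dropped again when the offset is removed or the group transitions to Dead. A rough, self-contained sketch of that lifecycle, using a plain map as a hypothetical stand-in for the KafkaMetricsGroup registry (the real code registers and removes Yammer gauges keyed by name plus tags):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OffsetMetricLifecycle {
    // Hypothetical stand-in for the metrics registry, keyed by the
    // (group, topic, partition) tags used in the diff above.
    private final Map<String, Long> registry = new ConcurrentHashMap<>();
    private final String group;

    OffsetMetricLifecycle(String group) {
        this.group = group;
    }

    private String key(String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Mirrors recreateOffsetMetric: remove first so re-registering the same
    // partition (e.g. on group reload) never double-registers a gauge.
    void recreateOffsetMetric(String topic, int partition, long offset) {
        removeOffsetMetric(topic, partition);
        registry.put(key(topic, partition), offset);
    }

    // Mirrors removeOffsetMetric: called on offset removal and, for every
    // tracked partition, when the group transitions to Dead.
    void removeOffsetMetric(String topic, int partition) {
        registry.remove(key(topic, partition));
    }

    public static void main(String[] args) {
        OffsetMetricLifecycle m = new OffsetMetricLifecycle("group-1");
        m.recreateOffsetMetric("orders", 0, 42L);
        m.recreateOffsetMetric("orders", 0, 43L); // idempotent re-registration
        m.removeOffsetMetric("orders", 0);        // e.g. group -> Dead
        System.out.println(m.registry);           // {}: no leaked metrics
    }
}

The remove-before-register step matters because a metrics registry typically keeps at most one metric per name and tag set; registering twice would either fail or silently keep the stale gauge.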