Skip to content

Commit 966a471

Browse files
shirshankachakru-r
authored andcommitted
telemetry: add support for unified mixpanel + kafka tracking in GMS (#5514)
1 parent 28d58e8 commit 966a471

File tree

27 files changed

+1745
-156
lines changed

27 files changed

+1745
-156
lines changed

.github/workflows/docker-unified.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,10 @@ jobs:
365365
fail-fast: false
366366
matrix: ${{ fromJson(needs.smoke_test_matrix.outputs.matrix) }}
367367
if: ${{ always() && !failure() && !cancelled() && needs.smoke_test_matrix.outputs.matrix != '[]' }}
368+
env:
369+
# TODO Chakru: Review if required
370+
MIXPANEL_API_SECRET: ${{ secrets.MIXPANEL_API_SECRET }}
371+
MIXPANEL_PROJECT_ID: ${{ secrets.MIXPANEL_PROJECT_ID }}
368372
steps:
369373
- name: Free up disk space
370374
if: ${{ !contains(needs.setup.outputs.test_runner_type, 'depot') }}

metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConvention.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public interface TopicConvention {
5757
@Nonnull
5858
String getPlatformEventTopicName();
5959

60+
/** The name of the datahub usage event topic. */
61+
@Nonnull
62+
String getDataHubUsageEventTopicName();
63+
6064
/**
6165
* Returns the name of the metadata change event (v5) kafka topic.
6266
*

metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConventionImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class TopicConventionImpl implements TopicConvention {
4141
private final String _failedMetadataChangeProposalTopicName;
4242
private final String _platformEventTopicName;
4343
private final String _dataHubUpgradeHistoryTopicName;
44-
44+
private final String _dataHubUsageEventTopicName;
4545
// v5 patterns
4646
private final String _eventPattern;
4747

@@ -55,7 +55,8 @@ public TopicConventionImpl(
5555
@Nonnull String failedMetadataChangeProposalTopicName,
5656
@Nonnull String platformEventTopicName,
5757
@Nonnull String eventPattern,
58-
@Nonnull String dataHubUpgradeHistoryTopicName) {
58+
@Nonnull String dataHubUpgradeHistoryTopicName,
59+
@Nonnull String dataHubUsageEventTopicName) {
5960
_metadataChangeEventTopicName = metadataChangeEventTopicName;
6061
_metadataAuditEventTopicName = metadataAuditEventTopicName;
6162
_failedMetadataChangeEventTopicName = failedMetadataChangeEventTopicName;
@@ -66,6 +67,7 @@ public TopicConventionImpl(
6667
_platformEventTopicName = platformEventTopicName;
6768
_eventPattern = eventPattern;
6869
_dataHubUpgradeHistoryTopicName = dataHubUpgradeHistoryTopicName;
70+
_dataHubUsageEventTopicName = dataHubUsageEventTopicName;
6971
}
7072

7173
public TopicConventionImpl() {
@@ -79,7 +81,8 @@ public TopicConventionImpl() {
7981
Topics.FAILED_METADATA_CHANGE_PROPOSAL,
8082
Topics.PLATFORM_EVENT,
8183
DEFAULT_EVENT_PATTERN,
82-
Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME);
84+
Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME,
85+
Topics.DATAHUB_USAGE_EVENT);
8386
}
8487

8588
@Nonnull
@@ -130,6 +133,12 @@ public String getPlatformEventTopicName() {
130133
return _platformEventTopicName;
131134
}
132135

136+
@Nonnull
137+
@Override
138+
public String getDataHubUsageEventTopicName() {
139+
return _dataHubUsageEventTopicName;
140+
}
141+
133142
@Nonnull
134143
private String buildEventName(
135144
@Nonnull String eventType,

metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/transformer/DataHubUsageEventTransformer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,25 @@ public Optional<TransformedDocument> transformDataHubUsageEvent(String dataHubUs
7777
// Timestamp is required
7878
long timestampMillis;
7979
if (usageEvent.get(TIMESTAMP).isNumber()) {
80+
// Handle numeric timestamp (epoch milliseconds)
8081
timestampMillis = usageEvent.get(TIMESTAMP).asLong();
8182
} else if (usageEvent.get(TIMESTAMP).isTextual()) {
83+
// Handle ISO date string
8284
try {
8385
String isoDate = usageEvent.get(TIMESTAMP).asText();
8486
java.time.Instant instant = java.time.Instant.parse(isoDate);
8587
timestampMillis = instant.toEpochMilli();
8688
} catch (Exception e) {
87-
log.warn("Failed to parse ISO date string: {}", usageEvent.get(TIMESTAMP));
89+
log.warn(
90+
"Failed to parse ISO date string: {}. Defaulting to current system time",
91+
usageEvent.get(TIMESTAMP));
8892
timestampMillis = Instant.now().toEpochMilli();
8993
}
9094
} else {
9195
log.warn(
9296
"Invalid timestamp format - expected number or ISO date string but got: {}",
9397
usageEvent.get(TIMESTAMP));
94-
timestampMillis = Instant.now().toEpochMilli();
98+
return Optional.empty();
9599
}
96100

97101
log.debug("Raw timestamp value from event: {}", timestampMillis);
@@ -108,11 +112,11 @@ public Optional<TransformedDocument> transformDataHubUsageEvent(String dataHubUs
108112
}
109113

110114
try {
111-
return Optional.of(
112-
new TransformedDocument(
113-
getId(eventDocument), OBJECT_MAPPER.writeValueAsString(eventDocument)));
115+
String serializedDoc = OBJECT_MAPPER.writeValueAsString(eventDocument);
116+
log.debug("Final serialized document: {}", serializedDoc);
117+
return Optional.of(new TransformedDocument(getId(eventDocument), serializedDoc));
114118
} catch (JsonProcessingException e) {
115-
log.info("Failed to package document: {}", eventDocument);
119+
log.error("Failed to package document: {}", eventDocument);
116120
return Optional.empty();
117121
}
118122
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.datahub.telemetry;
2+
3+
/** Enum representing the possible destinations for tracking events. */
4+
public enum TrackingDestination {
5+
MIXPANEL,
6+
KAFKA
7+
}

0 commit comments

Comments
 (0)