Skip to content

Commit 195ea96

Browse files
feat: add instrumentation for a couple of OpenTelemetry metrics (#2501)
* feat: add instrumentation for a couple OpenTelemetry metrics * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 9708ef4 commit 195ea96

File tree

9 files changed

+321
-7
lines changed

9 files changed

+321
-7
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
5757
If you are using Gradle without BOM, add this to your dependencies:
5858

5959
```Groovy
60-
implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.1'
60+
implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.2'
6161
```
6262

6363
If you are using SBT, add this to your dependencies:
6464

6565
```Scala
66-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.1"
66+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.2"
6767
```
6868
<!-- {x-version-update-end} -->
6969

@@ -221,7 +221,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
221221
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
222222
[stability-image]: https://img.shields.io/badge/stability-stable-green
223223
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
224-
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.1
224+
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.2
225225
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
226226
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
227227
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles

google-cloud-bigquerystorage/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@
156156
<artifactId>google-auth-library-credentials</artifactId>
157157
<version>1.23.0</version>
158158
</dependency>
159+
<dependency>
160+
<groupId>io.opentelemetry</groupId>
161+
<artifactId>opentelemetry-api</artifactId>
162+
</dependency>
159163

160164
<!-- Test dependencies -->
161165
<dependency>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,21 @@
3636
import io.grpc.Status;
3737
import io.grpc.Status.Code;
3838
import io.grpc.StatusRuntimeException;
39+
import io.opentelemetry.api.common.AttributeKey;
40+
import io.opentelemetry.api.common.Attributes;
41+
import io.opentelemetry.api.common.AttributesBuilder;
42+
import io.opentelemetry.api.metrics.LongCounter;
43+
import io.opentelemetry.api.metrics.Meter;
44+
import io.opentelemetry.api.metrics.MeterProvider;
3945
import java.io.IOException;
4046
import java.time.Duration;
4147
import java.time.Instant;
48+
import java.util.ArrayList;
4249
import java.util.Comparator;
4350
import java.util.Deque;
4451
import java.util.HashMap;
4552
import java.util.LinkedList;
53+
import java.util.List;
4654
import java.util.Map;
4755
import java.util.Set;
4856
import java.util.UUID;
@@ -253,6 +261,24 @@ class ConnectionWorker implements AutoCloseable {
253261
static final Pattern DEFAULT_STREAM_PATTERN =
254262
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");
255263

264+
private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
265+
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
266+
private Meter writeMeter;
267+
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
268+
private static String dataflowPrefix = "dataflow:";
269+
static List<AttributeKey<String>> telemetryKeysTraceId =
270+
new ArrayList<AttributeKey<String>>() {
271+
{
272+
add(AttributeKey.stringKey("trace_field_1"));
273+
add(AttributeKey.stringKey("trace_field_2"));
274+
add(AttributeKey.stringKey("trace_field_3"));
275+
}
276+
};
277+
private Attributes telemetryAttributes;
278+
private LongCounter instrumentIncomingRequestCount;
279+
private LongCounter instrumentIncomingRequestSize;
280+
private LongCounter instrumentIncomingRequestRows;
281+
256282
public static Boolean isDefaultStreamName(String streamName) {
257283
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
258284
return matcher.matches();
@@ -278,6 +304,85 @@ static String getRoutingHeader(String streamName, String location) {
278304
return project + "locations/" + location;
279305
}
280306

307+
private String getTableName() {
308+
Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
309+
return tableMatcher.find() ? tableMatcher.group(1) : "";
310+
}
311+
312+
private void setTraceIdAttributesPart(
313+
AttributesBuilder builder,
314+
String[] traceIdParts,
315+
int indexPartsToCheck,
316+
int indexTelemetryKeysToUse) {
317+
if ((indexPartsToCheck < traceIdParts.length) && !traceIdParts[indexPartsToCheck].isEmpty()) {
318+
builder.put(
319+
telemetryKeysTraceId.get(indexTelemetryKeysToUse), traceIdParts[indexPartsToCheck]);
320+
}
321+
}
322+
323+
private void setTraceIdAttributes(AttributesBuilder builder) {
324+
if ((this.traceId != null) && !this.traceId.isEmpty()) {
325+
int indexDataflow = this.traceId.toLowerCase().indexOf(dataflowPrefix);
326+
if (indexDataflow >= 0) {
327+
String[] traceIdParts =
328+
this.traceId.substring(indexDataflow + dataflowPrefix.length()).split(":", 8);
329+
setTraceIdAttributesPart(builder, traceIdParts, 0, 0);
330+
setTraceIdAttributesPart(builder, traceIdParts, 1, 1);
331+
setTraceIdAttributesPart(builder, traceIdParts, 2, 2);
332+
}
333+
}
334+
}
335+
336+
private Attributes buildOpenTelemetryAttributes() {
337+
AttributesBuilder builder = Attributes.builder();
338+
String tableName = getTableName();
339+
if (!tableName.isEmpty()) {
340+
builder.put(telemetryKeyTableId, tableName);
341+
}
342+
setTraceIdAttributes(builder);
343+
return builder.build();
344+
}
345+
346+
private void refreshOpenTelemetryTableNameAttributes() {
347+
String tableName = getTableName();
348+
if (!tableName.isEmpty()
349+
&& !tableName.equals(getTelemetryAttributes().get(telemetryKeyTableId))) {
350+
AttributesBuilder builder = getTelemetryAttributes().toBuilder();
351+
builder.put(telemetryKeyTableId, tableName);
352+
this.telemetryAttributes = builder.build();
353+
}
354+
}
355+
356+
@VisibleForTesting
357+
Attributes getTelemetryAttributes() {
358+
return telemetryAttributes;
359+
}
360+
361+
private void registerOpenTelemetryMetrics() {
362+
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
363+
writeMeter =
364+
meterProvider
365+
.meterBuilder("com.google.cloud.bigquery.storage.v1.write")
366+
.setInstrumentationVersion(
367+
ConnectionWorker.class.getPackage().getImplementationVersion())
368+
.build();
369+
instrumentIncomingRequestCount =
370+
writeMeter
371+
.counterBuilder("append_requests")
372+
.setDescription("Counts number of incoming requests")
373+
.build();
374+
instrumentIncomingRequestSize =
375+
writeMeter
376+
.counterBuilder("append_request_bytes")
377+
.setDescription("Counts byte size of incoming requests")
378+
.build();
379+
instrumentIncomingRequestRows =
380+
writeMeter
381+
.counterBuilder("append_rows")
382+
.setDescription("Counts number of incoming request rows")
383+
.build();
384+
}
385+
281386
public ConnectionWorker(
282387
String streamName,
283388
String location,
@@ -312,6 +417,9 @@ public ConnectionWorker(
312417
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
313418
this.compressorName = compressorName;
314419
this.retrySettings = retrySettings;
420+
this.telemetryAttributes = buildOpenTelemetryAttributes();
421+
registerOpenTelemetryMetrics();
422+
315423
// Always recreate a client for connection worker.
316424
HashMap<String, String> newHeaders = new HashMap<>();
317425
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
@@ -507,6 +615,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
507615
+ requestWrapper.messageSize)));
508616
return requestWrapper.appendResult;
509617
}
618+
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
619+
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
620+
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
510621
this.lock.lock();
511622
try {
512623
if (userClosed) {
@@ -783,6 +894,7 @@ private void appendLoop() {
783894
|| (originalRequest.getProtoRows().hasWriterSchema()
784895
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
785896
streamName = originalRequest.getWriteStream();
897+
refreshOpenTelemetryTableNameAttributes();
786898
writerSchema = originalRequest.getProtoRows().getWriterSchema();
787899
isMultiplexing = true;
788900
firstRequestForTableOrSchemaSwitch = true;

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.common.base.Stopwatch;
2828
import com.google.common.collect.ImmutableList;
2929
import com.google.common.util.concurrent.MoreExecutors;
30+
import io.opentelemetry.api.common.Attributes;
3031
import java.io.IOException;
3132
import java.util.Collections;
3233
import java.util.Comparator;
@@ -238,9 +239,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows)
238239
return append(streamWriter, rows, -1);
239240
}
240241

241-
/** Distributes the writing of a message to an underlying connection. */
242-
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
243-
// We are in multiplexing mode after entering the following logic.
242+
ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
244243
ConnectionWorker connectionWorker;
245244
lock.lock();
246245
try {
@@ -277,6 +276,13 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
277276
} finally {
278277
lock.unlock();
279278
}
279+
return connectionWorker;
280+
}
281+
282+
/** Distributes the writing of a message to an underlying connection. */
283+
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
284+
// We are in multiplexing mode after entering the following logic.
285+
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
280286
Stopwatch stopwatch = Stopwatch.createStarted();
281287
ApiFuture<AppendRowsResponse> responseFuture =
282288
connectionWorker.append(streamWriter, rows, offset);
@@ -294,6 +300,12 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
294300
MoreExecutors.directExecutor());
295301
}
296302

303+
@VisibleForTesting
304+
Attributes getTelemetryAttributes(StreamWriter streamWriter) {
305+
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
306+
return connectionWorker.getTelemetryAttributes();
307+
}
308+
297309
/**
298310
* Create a new connection if we haven't reached current maximum, or reuse an existing connection
299311
* with least load.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1;
17+
18+
import io.opentelemetry.api.GlobalOpenTelemetry;
19+
import io.opentelemetry.api.OpenTelemetry;
20+
import java.util.logging.Logger;
21+
22+
/** Container for global singleton objects. */
23+
class Singletons {
24+
25+
private static final Logger log = Logger.getLogger(Singletons.class.getName());
26+
27+
// Global OpenTelemetry instance
28+
private static OpenTelemetry openTelemetry = null;
29+
30+
static OpenTelemetry getOpenTelemetry() {
31+
if (openTelemetry == null) {
32+
openTelemetry = GlobalOpenTelemetry.get();
33+
}
34+
return openTelemetry;
35+
}
36+
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.grpc.Status;
3535
import io.grpc.Status.Code;
3636
import io.grpc.StatusRuntimeException;
37+
import io.opentelemetry.api.common.Attributes;
3738
import java.io.IOException;
3839
import java.time.Duration;
3940
import java.util.HashMap;
@@ -172,6 +173,15 @@ public ApiFuture<AppendRowsResponse> append(
172173
}
173174
}
174175

176+
@VisibleForTesting
177+
Attributes getTelemetryAttributes(StreamWriter streamWriter) {
178+
if (getKind() == Kind.CONNECTION_WORKER) {
179+
return connectionWorker().getTelemetryAttributes();
180+
} else {
181+
return connectionWorkerPool().getTelemetryAttributes(streamWriter);
182+
}
183+
}
184+
175185
public void close(StreamWriter streamWriter) {
176186
if (getKind() == Kind.CONNECTION_WORKER) {
177187
connectionWorker().close();
@@ -459,6 +469,11 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
459469
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
460470
}
461471

472+
@VisibleForTesting
473+
Attributes getTelemetryAttributes() {
474+
return this.singleConnectionOrConnectionPool.getTelemetryAttributes(this);
475+
}
476+
462477
/**
463478
* Returns the wait of a request in Client side before sending to the Server. Request could wait
464479
* in Client because it reached the client side inflight request limit (adjustable when

0 commit comments

Comments
 (0)