3636import io .grpc .Status ;
3737import io .grpc .Status .Code ;
3838import io .grpc .StatusRuntimeException ;
39+ import io .opentelemetry .api .common .AttributeKey ;
40+ import io .opentelemetry .api .common .Attributes ;
41+ import io .opentelemetry .api .common .AttributesBuilder ;
42+ import io .opentelemetry .api .metrics .LongCounter ;
43+ import io .opentelemetry .api .metrics .Meter ;
44+ import io .opentelemetry .api .metrics .MeterProvider ;
3945import java .io .IOException ;
4046import java .time .Duration ;
4147import java .time .Instant ;
48+ import java .util .ArrayList ;
4249import java .util .Comparator ;
4350import java .util .Deque ;
4451import java .util .HashMap ;
4552import java .util .LinkedList ;
53+ import java .util .List ;
4654import java .util .Map ;
4755import java .util .Set ;
4856import java .util .UUID ;
@@ -253,6 +261,24 @@ class ConnectionWorker implements AutoCloseable {
253261 static final Pattern DEFAULT_STREAM_PATTERN =
254262 Pattern .compile ("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$" );
255263
264+ private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/" ;
265+ private static Pattern streamPatternTable = Pattern .compile (tableMatching );
266+ private Meter writeMeter ;
267+ static AttributeKey <String > telemetryKeyTableId = AttributeKey .stringKey ("table_id" );
268+ private static String dataflowPrefix = "dataflow:" ;
269+ static List <AttributeKey <String >> telemetryKeysTraceId =
270+ new ArrayList <AttributeKey <String >>() {
271+ {
272+ add (AttributeKey .stringKey ("trace_field_1" ));
273+ add (AttributeKey .stringKey ("trace_field_2" ));
274+ add (AttributeKey .stringKey ("trace_field_3" ));
275+ }
276+ };
277+ private Attributes telemetryAttributes ;
278+ private LongCounter instrumentIncomingRequestCount ;
279+ private LongCounter instrumentIncomingRequestSize ;
280+ private LongCounter instrumentIncomingRequestRows ;
281+
256282 public static Boolean isDefaultStreamName (String streamName ) {
257283 Matcher matcher = DEFAULT_STREAM_PATTERN .matcher (streamName );
258284 return matcher .matches ();
@@ -278,6 +304,85 @@ static String getRoutingHeader(String streamName, String location) {
278304 return project + "locations/" + location ;
279305 }
280306
307+ private String getTableName () {
308+ Matcher tableMatcher = streamPatternTable .matcher (this .streamName );
309+ return tableMatcher .find () ? tableMatcher .group (1 ) : "" ;
310+ }
311+
312+ private void setTraceIdAttributesPart (
313+ AttributesBuilder builder ,
314+ String [] traceIdParts ,
315+ int indexPartsToCheck ,
316+ int indexTelemetryKeysToUse ) {
317+ if ((indexPartsToCheck < traceIdParts .length ) && !traceIdParts [indexPartsToCheck ].isEmpty ()) {
318+ builder .put (
319+ telemetryKeysTraceId .get (indexTelemetryKeysToUse ), traceIdParts [indexPartsToCheck ]);
320+ }
321+ }
322+
323+ private void setTraceIdAttributes (AttributesBuilder builder ) {
324+ if ((this .traceId != null ) && !this .traceId .isEmpty ()) {
325+ int indexDataflow = this .traceId .toLowerCase ().indexOf (dataflowPrefix );
326+ if (indexDataflow >= 0 ) {
327+ String [] traceIdParts =
328+ this .traceId .substring (indexDataflow + dataflowPrefix .length ()).split (":" , 8 );
329+ setTraceIdAttributesPart (builder , traceIdParts , 0 , 0 );
330+ setTraceIdAttributesPart (builder , traceIdParts , 1 , 1 );
331+ setTraceIdAttributesPart (builder , traceIdParts , 2 , 2 );
332+ }
333+ }
334+ }
335+
336+ private Attributes buildOpenTelemetryAttributes () {
337+ AttributesBuilder builder = Attributes .builder ();
338+ String tableName = getTableName ();
339+ if (!tableName .isEmpty ()) {
340+ builder .put (telemetryKeyTableId , tableName );
341+ }
342+ setTraceIdAttributes (builder );
343+ return builder .build ();
344+ }
345+
346+ private void refreshOpenTelemetryTableNameAttributes () {
347+ String tableName = getTableName ();
348+ if (!tableName .isEmpty ()
349+ && !tableName .equals (getTelemetryAttributes ().get (telemetryKeyTableId ))) {
350+ AttributesBuilder builder = getTelemetryAttributes ().toBuilder ();
351+ builder .put (telemetryKeyTableId , tableName );
352+ this .telemetryAttributes = builder .build ();
353+ }
354+ }
355+
356+ @ VisibleForTesting
357+ Attributes getTelemetryAttributes () {
358+ return telemetryAttributes ;
359+ }
360+
361+ private void registerOpenTelemetryMetrics () {
362+ MeterProvider meterProvider = Singletons .getOpenTelemetry ().getMeterProvider ();
363+ writeMeter =
364+ meterProvider
365+ .meterBuilder ("com.google.cloud.bigquery.storage.v1.write" )
366+ .setInstrumentationVersion (
367+ ConnectionWorker .class .getPackage ().getImplementationVersion ())
368+ .build ();
369+ instrumentIncomingRequestCount =
370+ writeMeter
371+ .counterBuilder ("append_requests" )
372+ .setDescription ("Counts number of incoming requests" )
373+ .build ();
374+ instrumentIncomingRequestSize =
375+ writeMeter
376+ .counterBuilder ("append_request_bytes" )
377+ .setDescription ("Counts byte size of incoming requests" )
378+ .build ();
379+ instrumentIncomingRequestRows =
380+ writeMeter
381+ .counterBuilder ("append_rows" )
382+ .setDescription ("Counts number of incoming request rows" )
383+ .build ();
384+ }
385+
281386 public ConnectionWorker (
282387 String streamName ,
283388 String location ,
@@ -312,6 +417,9 @@ public ConnectionWorker(
312417 this .inflightRequestQueue = new LinkedList <AppendRequestAndResponse >();
313418 this .compressorName = compressorName ;
314419 this .retrySettings = retrySettings ;
420+ this .telemetryAttributes = buildOpenTelemetryAttributes ();
421+ registerOpenTelemetryMetrics ();
422+
315423 // Always recreate a client for connection worker.
316424 HashMap <String , String > newHeaders = new HashMap <>();
317425 newHeaders .putAll (clientSettings .toBuilder ().getHeaderProvider ().getHeaders ());
@@ -507,6 +615,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
507615 + requestWrapper .messageSize )));
508616 return requestWrapper .appendResult ;
509617 }
618+ instrumentIncomingRequestCount .add (1 , getTelemetryAttributes ());
619+ instrumentIncomingRequestSize .add (requestWrapper .messageSize , getTelemetryAttributes ());
620+ instrumentIncomingRequestRows .add (message .getProtoRows ().getRows ().getSerializedRowsCount ());
510621 this .lock .lock ();
511622 try {
512623 if (userClosed ) {
@@ -783,6 +894,7 @@ private void appendLoop() {
783894 || (originalRequest .getProtoRows ().hasWriterSchema ()
784895 && !originalRequest .getProtoRows ().getWriterSchema ().equals (writerSchema ))) {
785896 streamName = originalRequest .getWriteStream ();
897+ refreshOpenTelemetryTableNameAttributes ();
786898 writerSchema = originalRequest .getProtoRows ().getWriterSchema ();
787899 isMultiplexing = true ;
788900 firstRequestForTableOrSchemaSwitch = true ;
0 commit comments