From df49e286e5d0525a903e6c5c8c75be606a6209c0 Mon Sep 17 00:00:00 2001
From: Robert Elliot <rob@lidalia.org.uk>
Date: Thu, 16 Jan 2025 14:26:05 +0000
Subject: [PATCH] Make access to ArrayDeque synchronized

ArrayDeque specifies that:

> Array deques ... are not thread-safe; in the absence of external
> synchronization, they do not support concurrent access by multiple
> threads.

`marshalerPool` is concurrently added to by the OkHttp threadpool
without synchronization, along with all threads that end spans (with
synchronisation in `SimpleSpanProcessor.exportLock`, which is not used
to synchronize with the OkHttp threadpool).

Just making the ArrayQueue synchronous internally removes all need to
worry about upstream locks.

Fixes #7019
---
 .../traces/SpanReusableDataMarshaler.java     | 22 +++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java
index af69e811c89..b423b341ccb 100644
--- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java
+++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java
@@ -11,7 +11,7 @@
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.util.ArrayDeque;
 import java.util.Collection;
-import java.util.Deque;
+import java.util.Queue;
 import java.util.function.BiFunction;
 
 /**
@@ -20,7 +20,8 @@
  */
 public class SpanReusableDataMarshaler {
 
-  private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool = new ArrayDeque<>();
+  private final SynchronizedQueue<LowAllocationTraceRequestMarshaler> marshalerPool
+      = new SynchronizedQueue<>(new ArrayDeque<>());
 
   private final MemoryMode memoryMode;
   private final BiFunction<Marshaler, Integer, CompletableResultCode> doExport;
@@ -55,4 +56,21 @@ public CompletableResultCode export(Collection<SpanData> spans) {
     TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);
     return doExport.apply(request, spans.size());
   }
+
+  private static class SynchronizedQueue<T> {
+    private final Queue<T> queue;
+
+    private SynchronizedQueue(Queue<T> queue) {
+      this.queue = queue;
+    }
+
+    @SuppressWarnings("UnusedReturnValue")
+    public synchronized boolean add(T t) {
+      return queue.add(t);
+    }
+
+    public synchronized T poll() {
+      return queue.poll();
+    }
+  }
 }