Skip to content

Commit

Permalink
Factor out object converter from event buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
0xaa4eb committed Oct 12, 2024
1 parent c5725c8 commit 4749196
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.ulyp.agent;

import com.ulyp.core.Type;
import com.ulyp.core.TypeResolver;
import com.ulyp.core.bytes.BufferBytesOut;
import com.ulyp.core.recorders.*;
import com.ulyp.core.util.LoggingSettings;
import com.ulyp.core.util.SystemPropertyUtil;
import lombok.extern.slf4j.Slf4j;
import org.agrona.concurrent.UnsafeBuffer;


@Slf4j
public class ByTypeRecordedObjectConverter implements RecordedObjectConverter {

private static final int TMP_BUFFER_SIZE = SystemPropertyUtil.getInt("ulyp.recording.tmp-buffer.size", 16 * 1024);

private final TypeResolver typeResolver;
private byte[] tmpBuffer;

public ByTypeRecordedObjectConverter(TypeResolver typeResolver) {
this.typeResolver = typeResolver;
}

@Override
public Object[] prepare(Object[] args) {
if (args == null) {
return null;
}
for (int i = 0; i < args.length; i++) {
args[i] = prepare(args[i]);
}
return args;
}

/**
* Resolves type for an object, then checks if it can be recorded asynchronously in a background thread. Most objects
* have only their identity hash code and type id recorded, so it can be safely done concurrently in some other thread.
* Collections have a few of their items recorded (if enabled), so the recording must happen here.
*/
public Object prepare(Object value) {
if (value == null) {
return null;
}
Type type = typeResolver.get(value);
ObjectRecorder recorder = type.getRecorderHint();
if (recorder == null) {
recorder = RecorderChooser.getInstance().chooseForType(value.getClass());
type.setRecorderHint(recorder);
}
if (recorder.supportsAsyncRecording()) {
if (recorder instanceof IdentityRecorder) {
return new QueuedIdentityObject(type.getId(), value);
} else {
return value;
}
} else {
BufferBytesOut output = new BufferBytesOut(new UnsafeBuffer(getTmpBuffer()));
try {
recorder.write(value, output, typeResolver);
return new QueuedRecordedObject(type, recorder.getId(), output.copy());
} catch (Exception e) {
if (LoggingSettings.DEBUG_ENABLED) {
log.debug("Error while recording object", e);
}
return new QueuedIdentityObject(type.getId(), value);
}
}
}

private byte[] getTmpBuffer() {
if (tmpBuffer == null) {
tmpBuffer = new byte[TMP_BUFFER_SIZE];
}
return tmpBuffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ulyp.agent;

import lombok.extern.slf4j.Slf4j;

/**
* Passes all objects to the background thread by reference. Can only be enabled if collections and arrays recording
* is turned off.
*/
@Slf4j
public class PassByRefRecordedObjectConverter implements RecordedObjectConverter {

public static final RecordedObjectConverter INSTANCE = new PassByRefRecordedObjectConverter();

public Object prepare(Object arg) {
return arg;
}

public Object[] prepare(Object[] args) {
return args;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ulyp.agent;

/**
* Prepares (and possibly converts) object for sending to the background thread. The background thread does actual recording using the recorders.
* Most of the time we just pass reference to an object to the background thread which calls recorders. This allows us to
* offload some work from the client app threads to the background thread.
* This works since most objects are either:
* 1) immutable
* 2) we only record their type id and identity hash code.
* So, we can record their values (i.e. access their fields) in some other threads.
* All other objects like collections and arrays must be recorded immediately in the client app thread.
*/
public interface RecordedObjectConverter {

Object prepare(Object arg);

Object[] prepare(Object[] args);
}
44 changes: 31 additions & 13 deletions ulyp-agent-core/src/main/java/com/ulyp/agent/Recorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void disableRecording() {
if (recordingCtx != null) {
recordingCtx.setEnabled(false);
} else {
recordingCtx = new RecordingThreadLocalContext();
recordingCtx = new RecordingThreadLocalContext(options, typeResolver);
recordingCtx.setEnabled(false);
threadLocalRecordingCtx.set(recordingCtx);
}
Expand Down Expand Up @@ -118,13 +118,13 @@ public long startRecordingOnMethodEnter(int methodId, @Nullable Object callee, O
private RecordingThreadLocalContext initializeRecordingCtx(int methodId) {
RecordingThreadLocalContext recordingCtx = threadLocalRecordingCtx.get();
if (recordingCtx == null) {
recordingCtx = new RecordingThreadLocalContext();
recordingCtx = new RecordingThreadLocalContext(options, typeResolver);
recordingCtx.setEnabled(false);
int recordingId = recordingContextStore.add(recordingCtx);
RecordingMetadata recordingMetadata = generateRecordingMetadata(recordingId);
recordingCtx.setRecordingMetadata(recordingMetadata);
threadLocalRecordingCtx.set(recordingCtx);
RecordingEventBuffer recordingEventBuffer = new RecordingEventBuffer(recordingMetadata.getId(), options, typeResolver);
RecordingEventBuffer recordingEventBuffer = new RecordingEventBuffer(recordingMetadata.getId());
recordingCtx.setEventBuffer(recordingEventBuffer);

currentRecordingSessionCount.incrementAndGet();
Expand Down Expand Up @@ -152,12 +152,13 @@ public long onMethodEnter(RecordingThreadLocalContext recordingCtx, int methodId

try {
recordingCtx.setEnabled(false);
RecordedObjectConverter objectConverter = recordingCtx.getObjectConverter();
int callId = recordingCtx.nextCallId();
RecordingEventBuffer eventBuffer = recordingCtx.getEventBuffer();
if (AgentOptions.TIMESTAMPS_ENABLED) {
eventBuffer.appendMethodEnterEvent(methodId, callee, args, System.nanoTime());
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(args), System.nanoTime());
} else {
eventBuffer.appendMethodEnterEvent(methodId, callee, args);
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(args));
}
dropIfFull(eventBuffer);
return BitUtil.longFromInts(recordingCtx.getRecordingId(), callId);
Expand All @@ -184,12 +185,13 @@ public long onMethodEnter(RecordingThreadLocalContext recordingCtx, int methodId

try {
recordingCtx.setEnabled(false);
RecordedObjectConverter objectConverter = recordingCtx.getObjectConverter();
int callId = recordingCtx.nextCallId();
RecordingEventBuffer eventBuffer = recordingCtx.getEventBuffer();
if (AgentOptions.TIMESTAMPS_ENABLED) {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg, System.nanoTime());
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(arg), System.nanoTime());
} else {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg);
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(arg));
}
dropIfFull(eventBuffer);
return BitUtil.longFromInts(recordingCtx.getRecordingId(), callId);
Expand All @@ -216,12 +218,13 @@ public long onMethodEnter(RecordingThreadLocalContext recordingCtx, int methodId

try {
recordingCtx.setEnabled(false);
RecordedObjectConverter objectConverter = recordingCtx.getObjectConverter();
int callId = recordingCtx.nextCallId();
RecordingEventBuffer eventBuffer = recordingCtx.getEventBuffer();
if (AgentOptions.TIMESTAMPS_ENABLED) {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg1, arg2, System.nanoTime());
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(arg1), objectConverter.prepare(arg2), System.nanoTime());
} else {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg1, arg2);
eventBuffer.appendMethodEnterEvent(methodId, callee, objectConverter.prepare(arg1), objectConverter.prepare(arg2));
}
dropIfFull(eventBuffer);
return BitUtil.longFromInts(recordingCtx.getRecordingId(), callId);
Expand All @@ -248,12 +251,26 @@ public long onMethodEnter(RecordingThreadLocalContext recordingCtx, int methodId

try {
recordingCtx.setEnabled(false);
RecordedObjectConverter objectConverter = recordingCtx.getObjectConverter();
int callId = recordingCtx.nextCallId();
RecordingEventBuffer eventBuffer = recordingCtx.getEventBuffer();
if (AgentOptions.TIMESTAMPS_ENABLED) {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg1, arg2, arg3, System.nanoTime());
eventBuffer.appendMethodEnterEvent(
methodId,
callee,
objectConverter.prepare(arg1),
objectConverter.prepare(arg2),
objectConverter.prepare(arg3),
System.nanoTime()
);
} else {
eventBuffer.appendMethodEnterEvent(methodId, callee, arg1, arg2, arg3);
eventBuffer.appendMethodEnterEvent(
methodId,
callee,
objectConverter.prepare(arg1),
objectConverter.prepare(arg2),
objectConverter.prepare(arg3)
);
}
dropIfFull(eventBuffer);
return BitUtil.longFromInts(recordingCtx.getRecordingId(), callId);
Expand Down Expand Up @@ -313,11 +330,12 @@ public void onMethodExit(int methodId, Object result, Throwable thrown, long cal
try {
recordingCtx.setEnabled(false);

RecordedObjectConverter objectConverter = recordingCtx.getObjectConverter();
RecordingEventBuffer eventBuffer = recordingCtx.getEventBuffer();
if (AgentOptions.TIMESTAMPS_ENABLED) {
eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null, System.nanoTime());
eventBuffer.appendMethodExitEvent(callId, objectConverter.prepare(thrown != null ? thrown : result), thrown != null, System.nanoTime());
} else {
eventBuffer.appendMethodExitEvent(callId, thrown != null ? thrown : result, thrown != null);
eventBuffer.appendMethodExitEvent(callId, objectConverter.prepare(thrown != null ? thrown : result), thrown != null);
}

if (callId == RecordingThreadLocalContext.ROOT_CALL_RECORDING_ID) {
Expand Down
Loading

0 comments on commit 4749196

Please sign in to comment.