Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRODUCER] rewrite proudcer.Metadata by java 17 record #1751

Merged
merged 1 commit into from
May 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 26 additions & 68 deletions common/src/main/java/org/astraea/common/producer/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,31 @@
import java.util.Optional;
import org.apache.kafka.clients.producer.RecordMetadata;

public interface Metadata {

static Metadata of(RecordMetadata metadata) {
return new Metadata() {
@Override
public long offset() {
return metadata.offset();
}

@Override
public int serializedKeySize() {
return metadata.serializedKeySize();
}

@Override
public int serializedValueSize() {
return metadata.serializedValueSize();
}

@Override
public Optional<Long> timestamp() {
return metadata.hasTimestamp() ? Optional.of(metadata.timestamp()) : Optional.empty();
}

@Override
public String topic() {
return metadata.topic();
}

@Override
public int partition() {
return metadata.partition();
}
};
/**
* @param topic The topic the record was appended to
* @param partition The partition the record was sent to
* @param offset The offset of the record in the topic/partition.
* @param serializedKeySize The size of the serialized, uncompressed key in bytes. If key is null,
* the returned size is -1.
* @param serializedValueSize The size of the serialized, uncompressed value in bytes. If value is
* null, the returned size is -1.
* @param timestamp the timestamp of the record
*/
public record Metadata(
String topic,
int partition,
long offset,
int serializedKeySize,
int serializedValueSize,
Optional<Long> timestamp) {

public static Metadata of(RecordMetadata metadata) {
return new Metadata(
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.serializedKeySize(),
metadata.serializedValueSize(),
metadata.hasTimestamp() ? Optional.of(metadata.timestamp()) : Optional.empty());
}

/**
* The offset of the record in the topic/partition.
*
* @return the offset of the record, or -1
*/
long offset();

/**
* @return The size of the serialized, uncompressed key in bytes. If key is null, the returned
* size is -1.
*/
int serializedKeySize();

/**
* @return The size of the serialized, uncompressed value in bytes. If value is null, the returned
* size is -1.
*/
int serializedValueSize();

/**
* @return the timestamp of the record
*/
Optional<Long> timestamp();

/**
* @return The topic the record was appended to
*/
String topic();

/**
* @return The partition the record was sent to
*/
int partition();
}