Skip to content

Commit

Permalink
[PRODUCER] rewrite proudcer.Metadata by java 17 record (#1751)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 17, 2023
1 parent a69f7d1 commit 4d0c726
Showing 1 changed file with 26 additions and 68 deletions.
94 changes: 26 additions & 68 deletions common/src/main/java/org/astraea/common/producer/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,31 @@
import java.util.Optional;
import org.apache.kafka.clients.producer.RecordMetadata;

public interface Metadata {

static Metadata of(RecordMetadata metadata) {
return new Metadata() {
@Override
public long offset() {
return metadata.offset();
}

@Override
public int serializedKeySize() {
return metadata.serializedKeySize();
}

@Override
public int serializedValueSize() {
return metadata.serializedValueSize();
}

@Override
public Optional<Long> timestamp() {
return metadata.hasTimestamp() ? Optional.of(metadata.timestamp()) : Optional.empty();
}

@Override
public String topic() {
return metadata.topic();
}

@Override
public int partition() {
return metadata.partition();
}
};
/**
* @param topic The topic the record was appended to
* @param partition The partition the record was sent to
* @param offset The offset of the record in the topic/partition.
* @param serializedKeySize The size of the serialized, uncompressed key in bytes. If key is null,
* the returned size is -1.
* @param serializedValueSize The size of the serialized, uncompressed value in bytes. If value is
* null, the returned size is -1.
* @param timestamp the timestamp of the record
*/
public record Metadata(
String topic,
int partition,
long offset,
int serializedKeySize,
int serializedValueSize,
Optional<Long> timestamp) {

public static Metadata of(RecordMetadata metadata) {
return new Metadata(
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.serializedKeySize(),
metadata.serializedValueSize(),
metadata.hasTimestamp() ? Optional.of(metadata.timestamp()) : Optional.empty());
}

/**
* The offset of the record in the topic/partition.
*
* @return the offset of the record, or -1
*/
long offset();

/**
* @return The size of the serialized, uncompressed key in bytes. If key is null, the returned
* size is -1.
*/
int serializedKeySize();

/**
* @return The size of the serialized, uncompressed value in bytes. If value is null, the returned
* size is -1.
*/
int serializedValueSize();

/**
* @return the timestamp of the record
*/
Optional<Long> timestamp();

/**
* @return The topic the record was appended to
*/
String topic();

/**
* @return The partition the record was sent to
*/
int partition();
}

0 comments on commit 4d0c726

Please sign in to comment.