Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New kV representation and more flexible extension for data Entry data structure #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public interface PositionStorageReader {

/**
* Get the position for the specified partition.
* Get the position for the specified queueId.
*
* @param partition
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,25 @@ public void setShardingKey(String shardingKey) {
this.shardingKey = shardingKey;
}

@Override public String toString() {
@Override
public String toString() {
return "QueueMetaData{" +
"queueName='" + queueName + '\'' +
", shardingKey='" + shardingKey + '\'' +
'}';
}

@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof QueueMetaData))
return false;
QueueMetaData data = (QueueMetaData) o;
@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (!(o instanceof QueueMetaData)) { return false; }
QueueMetaData data = (QueueMetaData)o;
return Objects.equals(queueName, data.queueName) &&
Objects.equals(shardingKey, data.shardingKey);
}

@Override public int hashCode() {
@Override
public int hashCode() {
return Objects.hash(queueName, shardingKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,76 @@

package io.openmessaging.connector.api.data;

import java.util.Arrays;
import java.util.Objects;

import io.openmessaging.connector.api.header.DataHeaders;
import io.openmessaging.connector.api.header.Header;
import io.openmessaging.connector.api.header.Headers;

/**
* Base class for records containing data to be copied to/from message queue.
*
* @version OMS 0.1.0
* @since OMS 0.1.0
*/
public abstract class DataEntry {

public DataEntry(Long timestamp,
EntryType entryType,
String queueName,
Schema schema,
Object[] payload) {
this(timestamp, entryType, queueName, schema, null, payload);
}

public DataEntry(Long timestamp,
EntryType entryType,
String queueName,
Schema schema,
String shardingKey,
Object[] payload) {
this.timestamp = timestamp;
this.entryType = entryType;
this.queueName = queueName;
this.schema = schema;
this.shardingKey = shardingKey;
this.payload = payload;
}

/**
* Timestamp of the data entry.
*/
private Long timestamp;

/**
* Type of the data entry.
*/
private EntryType entryType;

/**
* Related queueName.
*/
private String queueName;

/**
* Used for shard to related queue/partition.
*/
private String shardingKey;

/**
* Schema of the data entry.
* {@link EntryType} of the {@link DataEntry}
*/
private Schema schema;

private EntryType entryType;
/**
* Definition data key.
*/
private MetaAndData key;
/**
* Payload of the data entry.
* Definition data value.
*/
private Object[] payload;
private MetaAndData value;
/**
* The Headers of data.
*/
private Headers headers;

public DataEntry(Long timestamp,
String queueName,
String shardingKey,
EntryType entryType,
MetaAndData key,
MetaAndData value) {
this(timestamp, queueName, shardingKey, entryType, key, value, new DataHeaders());
}

public DataEntry(Long timestamp,
String queueName,
String shardingKey,
EntryType entryType,
MetaAndData key,
MetaAndData value,
Iterable<Header> headers) {
this.timestamp = timestamp;
this.queueName = queueName;
this.shardingKey = shardingKey;
this.entryType = entryType;
this.key = key;
this.value = value;
if (headers instanceof DataHeaders) {
this.headers = (DataHeaders)headers;
} else {
this.headers = new DataHeaders(headers);
}
}

public Long getTimestamp() {
return timestamp;
Expand All @@ -104,20 +112,28 @@ public void setQueueName(String queueName) {
this.queueName = queueName;
}

public Schema getSchema() {
return schema;
public MetaAndData getKey() {
return key;
}

public void setKey(MetaAndData meta) {
this.key = meta;
}

public MetaAndData getValue() {
return value;
}

public void setSchema(Schema schema) {
this.schema = schema;
public void setValue(MetaAndData value) {
this.value = value;
}

public Object[] getPayload() {
return payload;
public Headers getHeaders() {
return headers;
}

public void setPayload(Object[] payload) {
this.payload = payload;
public void setHeaders(Headers headers) {
this.headers = headers;
}

public String getShardingKey() {
Expand All @@ -128,34 +144,48 @@ public void setShardingKey(String shardingKey) {
this.shardingKey = shardingKey;
}

@Override public String toString() {
@Override
public String toString() {
return "DataEntry{" +
"timestamp=" + timestamp +
", entryType=" + entryType +
", queueName='" + queueName + '\'' +
", shardingKey='" + shardingKey + '\'' +
", schema=" + schema +
", payload=" + Arrays.toString(payload) +
"queueName='" + this.queueName + '\'' +
", shardingKey='" + this.shardingKey +
", entryType='" + this.entryType + '\'' +
", key='" + this.key + '\'' +
", value='" + this.value + '\'' +
", timestamp='" + timestamp + '\'' +
", headers'=" + headers + '\'' +
'}';
}

@Override public boolean equals(Object o) {
if (this == o)
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
if (!(o instanceof DataEntry))
}
if (o == null || getClass() != o.getClass()) {
return false;
DataEntry entry = (DataEntry) o;
return Objects.equals(timestamp, entry.timestamp) &&
entryType == entry.entryType &&
Objects.equals(queueName, entry.queueName) &&
Objects.equals(shardingKey, entry.shardingKey) &&
Objects.equals(schema, entry.schema) &&
Arrays.equals(payload, entry.payload);
}

@Override public int hashCode() {
int result = Objects.hash(timestamp, entryType, queueName, shardingKey, schema);
result = 31 * result + Arrays.hashCode(payload);
}

DataEntry that = (DataEntry)o;

return Objects.equals(this.shardingKey, that.shardingKey)
&& Objects.equals(this.queueName, that.queueName)
&& Objects.equals(this.entryType, that.entryType)
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.value, that.value)
&& Objects.equals(this.timestamp, that.timestamp)
&& Objects.equals(this.headers, that.headers);
}

@Override
public int hashCode() {
int result = this.queueName != null ? this.queueName.hashCode() : 0;
result = 31 * result + (this.shardingKey != null ? this.shardingKey.hashCode() : 0);
result = 31 * result + (this.entryType != null ? entryType.hashCode() : 0);
result = 31 * result + (this.key != null ? this.key.hashCode() : 0);
result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
result = 31 * result + (this.timestamp != null ? this.timestamp.hashCode() : 0);
result = 31 * result + this.headers.hashCode();
return result;
}
}
Loading