Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jan 25, 2018
2 parents 8363cef + e10c2be commit c9344db
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 42 deletions.
46 changes: 38 additions & 8 deletions .gitignore
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,22 +1,52 @@
# Package Files #
### How to update
# This is copied from OpenHFT/.gitignore
# update the original and run OpenHFT/update_gitignore.sh

### Compiled class file
*.class

### Package Files
*.jar
*.war
*.ear

# IntelliJ
### Log file
*.log

### IntelliJ
*.iml
*.ipr
*.iws
.idea
.attach_pid*

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
### Virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# Eclipse
### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties

### Eclipse template
*.pydevproject
.metadata
.gradle
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.classpath
.project
.settings/
.loadpath

# maven
target

.attach_pid*
### Queue files
*.cq4t
*.cq4
2 changes: 1 addition & 1 deletion microbenchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.15.39-SNAPSHOT</version>
<version>1.15.56</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>chronicle-wire</artifactId>
<version>1.10.10-SNAPSHOT</version>
<version>1.10.14-SNAPSHOT</version>
<name>OpenHFT/Chronicle-Wire</name>
<description>Chronicle-Wire</description>
<packaging>bundle</packaging>
Expand All @@ -49,7 +49,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.15.45-SNAPSHOT</version>
<version>1.15.56</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/net/openhft/chronicle/wire/AbstractWire.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public long writeHeaderOfUnknownLength(final int safeLength, final long timeout,
* this would be done much in the same way that we store the index, containing both the cycle and seq number ),
* or to put it another way, this LongValue will store the end of the lastPosition in the same place that the index stores it’s cycle number.
* <p>
* When ever we store the lastPostion, we should also store the seqAndLastPosition.
* When ever we store the lastPosition, we should also store the seqAndLastPosition.
* <p>
* so to get the the lastPosition and the sequence number of the approximate end of the queue, first we read the lastPosition,
* then we read the seqAndLastPosition, if the last bits of the lastPosition, match the higher bits of seqAndLastPosition
Expand Down Expand Up @@ -428,7 +428,7 @@ private long tryWriteHeader0(int length, int safeLength) {
throw new IllegalArgumentException();
long pos = bytes.writePosition();

final int value = Wires.addMaskedPidToHeader(NOT_COMPLETE | length);
final int value = Wires.addMaskedTidToHeader(NOT_COMPLETE | length);
if (bytes.compareAndSwapInt(pos, 0, value)) {

int maxlen = length == UNKNOWN_LENGTH ? safeLength : length;
Expand All @@ -451,7 +451,7 @@ private long writeHeader0(int length, int safeLength, long timeout, TimeUnit tim

// System.out.println(Thread.currentThread()+" wh0 pos: "+pos+" hdr "+(int) headerNumber);
try {
final int value = Wires.addMaskedPidToHeader(NOT_COMPLETE | length);
final int value = Wires.addMaskedTidToHeader(NOT_COMPLETE | length);
for (; ; ) {
if (bytes.compareAndSwapInt(pos, 0, value)) {

Expand Down Expand Up @@ -516,7 +516,7 @@ public void updateHeader(final long position, final boolean metaData) throws Str
long pos = bytes.writePosition();
int actualLength = Maths.toUInt31(pos - position - 4);

int expectedHeader = Wires.addMaskedPidToHeader(NOT_COMPLETE | UNKNOWN_LENGTH);
int expectedHeader = Wires.addMaskedTidToHeader(NOT_COMPLETE | UNKNOWN_LENGTH);
int header = actualLength;
if (metaData) header |= META_DATA;
if (header == UNKNOWN_LENGTH)
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/openhft/chronicle/wire/TextWire.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public DocumentContext readingDocument() {
return readContext;
}


protected void initReadContext() {
if (readContext == null)
readContext = new TextReadDocumentContext(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ private void invoke(Object o, @NotNull Method m, Object[] args) throws IllegalAc
*/
public boolean readOne() {
for (; ; ) {

try (DocumentContext context = in.readingDocument()) {
if (!context.isPresent())
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ else if (VanillaMethodReader.LOGGER.isDebugEnabled())

@NotNull
private static String errorMsg(CharSequence s, MessageHistory history, long sourceIndex) {
return "Unknown method-name='" + s + "' from " + history.lastSourceId() + " at " + Long.toHexString(sourceIndex) + " ~ " + (int) sourceIndex;

final String identifierType = s.length() != 0 && Character.isDigit(s.charAt(0)) ? "@MethodId" : "method-name";
return "Unknown " + identifierType + "='" + s + "' from " + history.lastSourceId() + " at " +
Long.toHexString(sourceIndex) + " ~ " + (int) sourceIndex;
}

public boolean ignoreDefaults() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void parseOne(@NotNull WireIn wireIn, O out) {
lastParslet = parslet;
}

private int parseInt(CharSequence sb) {
private static int parseInt(CharSequence sb) {
int acc = 0;
for (int i = 0; i < sb.length(); ++i) {
char ch = sb.charAt(i);
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/net/openhft/chronicle/wire/Wires.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.openhft.chronicle.wire;

import net.openhft.affinity.Affinity;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.VanillaBytes;
Expand Down Expand Up @@ -64,9 +65,9 @@ public enum Wires {
public static final int META_DATA = 1 << 30;
public static final int UNKNOWN_LENGTH = 0x0;
public static final int MAX_LENGTH = (1 << 30) - 1;
private static final boolean ENCODE_PID_IN_HEADER = Boolean.getBoolean("wire.encodePidInHeader");
private static final int PID_MASK = 0b01111111_11111111_11111111_11111111;
private static final int INVERSE_PID_MASK = ~PID_MASK;
private static final boolean ENCODE_TID_IN_HEADER = Boolean.getBoolean("wire.encodeTidInHeader");
private static final int TID_MASK = 0b00111111_11111111_11111111_11111111;
private static final int INVERSE_TID_MASK = ~TID_MASK;

// value to use when the message is not ready and of an unknown length
public static final int NOT_COMPLETE_UNKNOWN_LENGTH = NOT_COMPLETE | UNKNOWN_LENGTH;
Expand Down Expand Up @@ -189,8 +190,8 @@ public static StringBuilder acquireStringBuilder() {
}

public static int lengthOf(int len) {
if (isNotComplete(len) && ENCODE_PID_IN_HEADER) {
return Wires.removeMaskedPidFromHeader(len) & LENGTH_MASK;
if (isNotComplete(len) && ENCODE_TID_IN_HEADER) {
return Wires.removeMaskedTidFromHeader(len) & LENGTH_MASK;
}
final int len0 = len & LENGTH_MASK;
// if (len0 > 1 << 20)
Expand Down Expand Up @@ -391,16 +392,16 @@ public static void reset(@NotNull Object o) {
wm.reset(o);
}

public static int addMaskedPidToHeader(final int header) {
return ENCODE_PID_IN_HEADER ? header | (PID_MASK & OS.getProcessId()) : header;
public static int addMaskedTidToHeader(final int header) {
return ENCODE_TID_IN_HEADER ? header | (TID_MASK & Affinity.getThreadId()) : header;
}

public static int removeMaskedPidFromHeader(final int header) {
return header & INVERSE_PID_MASK;
public static int removeMaskedTidFromHeader(final int header) {
return header & INVERSE_TID_MASK;
}

public static int extractPidFromHeader(final int header) {
return header & PID_MASK;
public static int extractTidFromHeader(final int header) {
return header & TID_MASK;
}

static final ClassLocal<Function<String, Marshallable>> MARSHALLABLE_FUNCTION = ClassLocal.withInitial(tClass -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MethodId;
import net.openhft.chronicle.bytes.MethodReader;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class VanillaWireParserTest {

@Test
public void shouldDetermineMethodNamesFromMethodIds() {
final BinaryWire wire = new BinaryWire(Bytes.allocateElasticDirect());
final Speaker speaker =
wire.methodWriterBuilder(Speaker.class).useMethodIds(true).build();
speaker.say("hello");

final MethodReader reader = new VanillaMethodReaderBuilder(wire).build(impl());
assertTrue(reader.readOne());
}

interface Speaker {
@MethodId(7)
void say(final String message);
}

interface Listener {
void hear(final String message);
}

private static Listener impl() {
return m -> {};
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package net.openhft.chronicle.wire.monitoring;

import net.openhft.affinity.Affinity;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.NativeBytes;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
Expand All @@ -21,16 +21,18 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;

@RunWith(Parameterized.class)
public final class StoreWritingProcessInHeaderTest {

private final NativeBytes<Void> bytes;
private final Wire wire;
private final WireType wireType;

public StoreWritingProcessInHeaderTest(final String name, final WireType wireType) {
bytes = Bytes.allocateElasticDirect();
wire = wireType.apply(bytes);
this.wireType = wireType;
}

@Parameterized.Parameters(name = "{0}")
Expand All @@ -40,24 +42,39 @@ public static Object[][] parameters() {

@Test
public void shouldEncodePid() {
final int pid = OS.getProcessId();
final int headerWithPid = Wires.addMaskedPidToHeader(Wires.NOT_COMPLETE_UNKNOWN_LENGTH);
final int tid = Affinity.getThreadId();
final int headerWithTid = Wires.addMaskedTidToHeader(Wires.NOT_COMPLETE_UNKNOWN_LENGTH);

assertThat(Wires.isNotComplete(headerWithPid), is(true));
assertThat(headerWithPid, is(not(Wires.NOT_COMPLETE_UNKNOWN_LENGTH)));
assertThat(Wires.extractPidFromHeader(headerWithPid), is(pid));
assertThat(Wires.removeMaskedPidFromHeader(headerWithPid), is(Wires.NOT_COMPLETE_UNKNOWN_LENGTH));
assertThat(Wires.isNotComplete(headerWithTid), is(true));
assertThat(headerWithTid, is(not(Wires.NOT_COMPLETE_UNKNOWN_LENGTH)));
assertThat(Wires.extractTidFromHeader(headerWithTid), is(tid));
assertThat(Wires.removeMaskedTidFromHeader(headerWithTid), is(Wires.NOT_COMPLETE_UNKNOWN_LENGTH));
}

@Test
public void shouldStoreWritingProcessIdInHeader() throws TimeoutException, EOFException {
final long position = wire.writeHeaderOfUnknownLength(1, TimeUnit.SECONDS, null, null);

final int header = wire.bytes().readVolatileInt(position);
assertThat(Wires.isNotComplete(header), is(true));
assertThat(header, is(Wires.addMaskedPidToHeader(Wires.NOT_COMPLETE_UNKNOWN_LENGTH)));
assertThat(Wires.removeMaskedPidFromHeader(header), is(Wires.NOT_COMPLETE_UNKNOWN_LENGTH));
assertThat(Wires.extractPidFromHeader(header), is(OS.getProcessId()));
assertThat(header, is(Wires.addMaskedTidToHeader(Wires.NOT_COMPLETE_UNKNOWN_LENGTH)));
assertThat(Wires.removeMaskedTidFromHeader(header), is(Wires.NOT_COMPLETE_UNKNOWN_LENGTH));
assertThat(Wires.extractTidFromHeader(header), is(Affinity.getThreadId()));
}

@Test
public void shouldWorkWithMetaDataEntries() throws TimeoutException, EOFException {
assumeTrue(wireType != WireType.READ_ANY);

final long position = wire.writeHeaderOfUnknownLength(1, TimeUnit.SECONDS, null, null);
final int header = wire.bytes().readVolatileInt(position);
// simulate meta-data indicator in header
wire.bytes().writeInt(position, header | Wires.META_DATA);
final int updatedHeader = wire.bytes().readVolatileInt(position);
assertThat(Wires.isNotComplete(updatedHeader), is(true));
assertThat(updatedHeader, is(Wires.addMaskedTidToHeader(Wires.NOT_COMPLETE_UNKNOWN_LENGTH | Wires.META_DATA)));
assertThat(Wires.removeMaskedTidFromHeader(updatedHeader), is(Wires.NOT_COMPLETE_UNKNOWN_LENGTH | Wires.META_DATA));
assertThat(Wires.extractTidFromHeader(updatedHeader), is(Affinity.getThreadId()));
assertThat(Wires.isNotComplete(updatedHeader), is(true));
}

@After
Expand All @@ -67,12 +84,12 @@ public void tearDown() {

@BeforeClass
public static void enableFeature() {
System.setProperty("wire.encodePidInHeader", Boolean.TRUE.toString());
System.setProperty("wire.encodeTidInHeader", Boolean.TRUE.toString());
}

@AfterClass
public static void disableFeature() {
System.setProperty("wire.encodePidInHeader", Boolean.FALSE.toString());
System.setProperty("wire.encodeTidInHeader", Boolean.FALSE.toString());
}

private static Object[][] toParams(final WireType[] values) {
Expand Down

0 comments on commit c9344db

Please sign in to comment.