Skip to content

Commit

Permalink
Merge pull request #6 from Nuvindu/reactive
Browse files Browse the repository at this point in the history
Apply Asynchronous Behavior to the Pipe Package
  • Loading branch information
Nuvindu authored Aug 16, 2022
2 parents e766530 + 9002bda commit f9ae01b
Show file tree
Hide file tree
Showing 23 changed files with 585 additions and 118 deletions.
4 changes: 2 additions & 2 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
org = "nuvindu"
name = "pipe"
version = "0.1.2"
version = "0.1.3"
authors = ["Nuvindu"]
keywords = ["pipe"]
repository = "https://github.com/Nuvindu/module-pipe"
license = ["Apache-2.0"]
distribution = "2201.0.1"

[[platform.java11.dependency]]
path = "../native/build/libs/pipe-native-0.1.2.jar"
path = "../native/build/libs/pipe-native-0.1.3-SNAPSHOT.jar"
17 changes: 15 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,27 @@ modules = [
{org = "ballerina", packageName = "test", moduleName = "test"}
]

[[package]]
org = "ballerina"
name = "time"
version = "2.2.2"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
modules = [
{org = "ballerina", packageName = "time", moduleName = "time"}
]

[[package]]
org = "nuvindu"
name = "pipe"
version = "0.1.2"
version = "0.1.3"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
{org = "ballerina", name = "test"}
{org = "ballerina", name = "test"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "nuvindu", packageName = "pipe", moduleName = "pipe"}
Expand Down
Binary file added ballerina/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 15 additions & 12 deletions ballerina/pipe.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ import ballerina/jballerina.java;

# Consists of APIs to exchange events concurrently.
public isolated class Pipe {
private handle javaPipeObject;
private handle nativePipeObject;

# Creates a new `pipe:Pipe` instance.
#
# + 'limit - The maximum number of entries that are held in the pipe at once
public isolated function init(int 'limit) {
self.javaPipeObject = newPipe('limit);
# + timer - The timer that used to keep track of time to notify the timeouts in APIs
public isolated function init(int 'limit, Timer? timer = ()) {
if timer is Timer {
self.nativePipeObject = newPipeWithTimer('limit, timer);
} else {
self.nativePipeObject = newPipe('limit);
}
}

# Produces events into the pipe.
Expand Down Expand Up @@ -63,26 +68,24 @@ public isolated class Pipe {
# + return - Return `()`, if the pipe is successfully closed. Otherwise returns a `pipe:Error`
public isolated function immediateClose() returns Error? {
lock {
check immediateClose(self.javaPipeObject);
check immediateClose(self.nativePipeObject);
}
}

# Closes the pipe gracefully. Waits for some grace period until all the events in the pipe is consumed.
#
# + timeout - The maximum grace period to wait until the pipe is empty. The default timeout is thirty seconds
# + return - Return `()`, if the pipe is successfully closed. Otherwise returns a `pipe:Error`
public isolated function gracefulClose(decimal timeout = 30) returns Error? {
lock {
check gracefulClose(self.javaPipeObject, timeout);
}
}
public isolated function gracefulClose(decimal timeout = 30) returns Error? = @java:Method {
'class: "org.nuvindu.pipe.Pipe"
} external;

# Checks whether the pipe is closed.
#
# + return - Returns `true`, if the pipe is closed. Otherwise returns `false`
public isolated function isClosed() returns boolean {
lock {
return isClosed(self.javaPipeObject);
return isClosed(self.nativePipeObject);
}
}
}
Expand All @@ -91,11 +94,11 @@ isolated function newPipe(int 'limit) returns handle = @java:Constructor {
'class: "org.nuvindu.pipe.Pipe"
} external;

isolated function immediateClose(handle pipe) returns Error? = @java:Method {
isolated function newPipeWithTimer(int 'limit, Timer timer) returns handle = @java:Constructor {
'class: "org.nuvindu.pipe.Pipe"
} external;

isolated function gracefulClose(handle pipe, decimal timeout) returns Error? = @java:Method {
isolated function immediateClose(handle pipe) returns Error? = @java:Method {
'class: "org.nuvindu.pipe.Pipe"
} external;

Expand Down
45 changes: 44 additions & 1 deletion ballerina/tests/errors_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// under the License.

import ballerina/test;
import ballerina/time;

@test:Config {
groups: ["errors"]
Expand Down Expand Up @@ -67,16 +68,19 @@ function testImmediateClosingOfClosedPipe() returns error? {
}

@test:Config {
groups: ["errors","close"]
groups: ["errors", "close"]
}
function testGracefulClosingOfClosedPipe() returns error? {
Pipe pipe = new (1);
string expectedValue = "Closing of a closed pipe is not allowed.";
time:Utc currentUtc = time:utcNow();
check pipe.gracefulClose();
Error? gracefulCloseResult = pipe.gracefulClose();
test:assertTrue(gracefulCloseResult is Error);
string actualValueValue = (<error>gracefulCloseResult).message();
test:assertEquals(actualValueValue, expectedValue);
int val = time:utcNow()[0] - currentUtc[0];
test:assertTrue(val < 30);
}

@test:Config {
Expand All @@ -93,3 +97,42 @@ function testClosingOfClosedStreamInPipe() returns error? {
string actualValue = (<error>close).message();
test:assertEquals(actualValue, expectedValue);
}

@test:Config {
groups: ["main_apis"]
}
function testErrorsInPipesWithTimer() returns error? {
Timer timeKeeper = new();
Pipe timerPipe = new (1, timeKeeper);
Pipe timerPipe2 = new (1, timeKeeper);
Pipe timerPipe3 = new(5, timeKeeper);
string expectedError = "Operation has timed out.";

worker A {
Error? produce = timerPipe.produce("data1", timeout = 1);
test:assertTrue(produce !is Error);
Error? result1 = timerPipe.produce("data2", timeout = 1);
test:assertTrue(result1 is Error);
string actualValue1 = (<error>result1).message();
test:assertEquals(actualValue1, expectedError);
}

@strand {
thread: "any"
}
worker B {
string|Error result2 = timerPipe2.consume(timeout = 1);
test:assertTrue(result2 is Error);
string actualValue2 = (<error>result2).message();
test:assertEquals(actualValue2, expectedError);
}

worker C {
string expectedValue = "pipe_test";
Error? produce = timerPipe3.produce(expectedValue, timeout = 2);
test:assertTrue(produce !is Error);
string|Error actualValue3 = timerPipe3.consume(5);
test:assertTrue(actualValue3 is Error);
test:assertEquals(actualValue3, expectedValue);
}
}
98 changes: 95 additions & 3 deletions ballerina/tests/pipe_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
// specific language governing permissions and limitations
// under the License.

import ballerina/test;
import ballerina/lang.runtime;
import ballerina/test;
import ballerina/time;

@test:Config {
groups: ["main_apis"]
Expand All @@ -25,7 +26,7 @@ function testPipe() returns error? {
check pipe.produce("pipe_test", timeout = 2);
string actualValue = check pipe.consume(5);
string expectedValue = "pipe_test";
test:assertEquals(expectedValue, actualValue);
test:assertEquals(actualValue, expectedValue);
}

@test:Config {
Expand Down Expand Up @@ -86,7 +87,7 @@ function testImmediateClose() returns error? {
function testGracefulClose() returns error? {
Pipe pipe = new(5);
check pipe.produce("1", timeout = 5);
check pipe.gracefulClose();
check pipe.gracefulClose(timeout = 5);
string expectedValue = "Events cannot be produced to a closed pipe.";
Error? actualValue = pipe.produce("1", timeout = 5);
test:assertTrue(actualValue is Error);
Expand All @@ -99,11 +100,14 @@ function testGracefulClose() returns error? {
function testIsClosedInPipe() returns error? {
Pipe pipe = new(5);
test:assertTrue(!pipe.isClosed());
time:Utc currentUtc = time:utcNow();
check pipe.gracefulClose();
test:assertTrue(pipe.isClosed());
Pipe newPipe = new(5);
check newPipe.immediateClose();
test:assertTrue(pipe.isClosed());
int val = time:utcNow()[0] - currentUtc[0];
test:assertTrue(val < 30);
}

@test:Config {
Expand All @@ -112,6 +116,7 @@ function testIsClosedInPipe() returns error? {
function testWaitingInGracefulClose() returns error? {
Pipe pipe = new(5);
int expectedValue = 1;
time:Utc currentUtc = time:utcNow();
check pipe.produce(expectedValue, timeout = 5.00111);
worker A {
runtime:sleep(5);
Expand All @@ -132,6 +137,8 @@ function testWaitingInGracefulClose() returns error? {
worker B {
Error? close = pipe.gracefulClose();
test:assertTrue(close !is Error);
int val = time:utcNow()[0] - currentUtc[0];
test:assertTrue(val < 30);
}
}

Expand Down Expand Up @@ -186,3 +193,88 @@ function testWaitingInProduce() returns error? {
test:assertEquals(actualValue, expectedValue);
}
}

@test:Config {
groups: ["main_apis"]
}
function testConcurrencyInPipe() returns error? {
Pipe pipe = new(1);
int expectedValue = 3;
int workerCount = 0;
worker A {
runtime:sleep(1);
Error? produce = pipe.produce(expectedValue, 5);
test:assertTrue(produce !is Error);
workerCount+=1;
}

@strand {
thread: "any"
}
worker B {
int|Error actualValue = pipe.consume(6);
test:assertTrue(actualValue !is Error);
test:assertEquals(actualValue, expectedValue);
workerCount+=1;
}

worker C {
runtime:sleep(1);
int|Error actualValue = pipe.consume(2);
test:assertTrue(actualValue is Error);
workerCount+=1;
}

worker D {
runtime:sleep(1);
int|Error actualValue = pipe.consume(2);
test:assertTrue(actualValue is Error);
workerCount+=1;
}

worker E {
runtime:sleep(8);
test:assertEquals(workerCount, 4);
}
}

@test:Config {
groups: ["main_apis"]
}
function testPipesWithTimer() returns error? {
Timer timeKeeper = new();

Pipe timerPipe = new(5, timeKeeper);
Pipe timerPipe2 = new(5, timeKeeper);
Pipe timerPipe3 = new(5, timeKeeper);
string expectedValue = "pipe_test";

worker A {
runtime:sleep(1);
Error? produce = timerPipe.produce("pipe_test", timeout = 2);
test:assertTrue(produce !is Error);
string|Error actualValue1 = timerPipe.consume(5);
test:assertTrue(actualValue1 is Error);
test:assertEquals(actualValue1, expectedValue);
}

@strand {
thread: "any"
}
worker B {
Error? produce = timerPipe2.produce("pipe_test", timeout = 2);
test:assertTrue(produce !is Error);
string|Error actualValue2 = timerPipe2.consume(5);
test:assertTrue(actualValue2 is Error);
test:assertEquals(actualValue2, expectedValue);
}

worker C {
runtime:sleep(1);
Error? produce = timerPipe3.produce("pipe_test", timeout = 2);
test:assertTrue(produce !is Error);
string|Error actualValue3 = timerPipe3.consume(5);
test:assertTrue(actualValue3 is Error);
test:assertEquals(actualValue3, expectedValue);
}
}
31 changes: 31 additions & 0 deletions ballerina/timer.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2022, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/jballerina.java;

# Represents Ballerina Timer class to schedule tasks with a timeout.
public distinct class Timer {
private handle nativeTimerObject;

public function init() {
self.nativeTimerObject = getTimer();
}
}

public isolated function getTimer() returns handle = @java:Constructor {
'class: "java.util.Timer"
} external;

1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ subprojects {
/* Standard libraries */
ballerinaStdLibs "io.ballerina.stdlib:io-ballerina:${project.stdlibIoVersion}"
ballerinaStdLibs "io.ballerina.stdlib:log-ballerina:${project.stdlibLogVersion}"
ballerinaStdLibs "io.ballerina.stdlib:time-ballerina:${project.stdlibTimeVersion}"
ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}"
ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}"
}
Expand Down
2 changes: 1 addition & 1 deletion examples/covid_report/tests/report_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
// specific language governing permissions and limitations
// under the License.

import nuvindu/pipe;
import ballerina/test;
import nuvindu/pipe;

@test:Config {
groups: ["examples", "covid_report"]
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ballerinaGradlePluginVersion=0.14.1
#stdlib dependencies
stdlibIoVersion=1.2.1
stdlibLogVersion=2.2.1
stdlibTimeVersion=2.2.2

# Ballerinax Observer
observeVersion=1.0.3
Expand Down
4 changes: 4 additions & 0 deletions load_tests/pipe_test/src/.devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"image": "ballerina/ballerina-devcontainer:2201.1.1",
"extensions": ["WSO2.ballerina"],
}
Loading

0 comments on commit f9ae01b

Please sign in to comment.