Skip to content

Commit

Permalink
Removed all function type classes
Browse files Browse the repository at this point in the history
They were redundent and not actually needed so they were removed in
favor of other classes.

Other misc updates were also implemented like proper shutdown of the
manager and some changes to testing that still need to be finished.
  • Loading branch information
ShadowLordAlpha committed Sep 15, 2017
1 parent 836a588 commit 275c890
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 131 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
apply plugin: 'java'
apply plugin: 'eclipse'


// Repositories, where to find dependencies
repositories {
jcenter()
Expand Down
23 changes: 17 additions & 6 deletions src/main/java/com/shadowcs/themis/EventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class EventManager {
public class EventManager implements AutoCloseable {

private ExecutorService eService;
private LoadingCache<Class<?>, Collection<Consumer<?>>> lCache;
Expand All @@ -31,14 +32,17 @@ public <V> EventManager removeListener(Consumer<V> listener, Class<V> klass) {

// TODO: Submit, returns a future
public <T> Future<T> submit(T event) {

return eService.submit(() -> event(event), event);
}

// TODO: Execute, returns nothing
public <T> void execute(T event) {

eService.execute(() -> event(event));

// For some reason calling execute on the pool actually causes an error sometimes
// so better to submit the task and just not bother returning the future as its basically
// the same thing in this context
submit(event);
}

// TODO: Invoke, blocking run on current thread
Expand All @@ -56,7 +60,14 @@ private <T> void event(T event) {
});
}

public static void main(String[] args) {

@Override
public void close() {
eService.shutdown();
try {
eService.awaitTermination(30, TimeUnit.SECONDS);
} catch(InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
*/
package com.shadowcs.themis.util.concurrent;

import com.shadowcs.themis.util.function.Procedure;

/**
* An object that creates new threads on demand with a procedure that is run at some point before the first task takes
* place.
Expand All @@ -30,17 +28,17 @@
*/
public class ProcedureThreadFactory extends DefaultThreadFactory {

private Procedure procedure;
private Runnable procedure;

public ProcedureThreadFactory(Procedure procedure) {
public ProcedureThreadFactory(Runnable procedure) {
super();
this.procedure = procedure;
}

public Thread newThread(Runnable r) {
return super.newThread(() -> {
if(procedure != null) {
procedure.invoke();
procedure.run();
}
r.run();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import com.shadowcs.themis.util.function.Procedure;
import java.util.concurrent.atomic.AtomicInteger;

/**
* An object that takes already started and provided threads and allows them to be used as part of a thread pool without
Expand All @@ -36,16 +36,17 @@
* @author Josh "ShadowLordAlpha"
*
*/
public class ProvidedThreadFactory implements ThreadFactory {
public class ProvidedThreadFactory implements ThreadFactory, AutoCloseable {

private AtomicInteger count = new AtomicInteger();
private Queue<ThreadWork> queue = new LinkedBlockingQueue<ThreadWork>();

public ProvidedThreadFactory() {

}

// just a cleaner cede
public void cede() throws InterruptedException {
public void cede() {
cede(null); // does nothing
}

Expand All @@ -54,23 +55,35 @@ public void cede() throws InterruptedException {
* after the thread is first added to the thread waiting queue. All this is for is making sure that when a
* ProvidedThreadFactory has successfully hijacked a thread another thread is able to start. A Procedure is always
* started in a throw away thread. This means that the procedure may go on forever without affecting the provided
* thread as it is running on a newly created thread.
* thread as it is running on a newly created thread. If the provided thread is interrupted it is not recycled. Instead the
* exception is caught and the method is returned. It is assumed that the interrupt is expected behavior. If not simply recalling
* cede on the thread (though a loop in the calling method) will basically work the same.
*
* @param procedure
* @throws InterruptedException
*/
public void cede(Procedure procedure) throws InterruptedException {
public void cede(Runnable procedure) {
count.incrementAndGet();
ThreadWork work = new ThreadWork();
if(queue.offer(work)) {
if(procedure != null) {
new Thread(() -> procedure.invoke()).run();
new Thread(procedure, "procedure-cede").start(); // TODO: probably not name the thread
}

do {
work.await();
try {
work.await();
} catch(InterruptedException e) {
// Thread interrupted so we simply leave
// This is expected to be proper behavior
// e.printStackTrace();
count.decrementAndGet();
return;
}
} while(queue.offer(work));
}

count.decrementAndGet();
throw new RuntimeException("Failed to cede/recycle thread!");
}

Expand All @@ -80,6 +93,16 @@ public Thread newThread(Runnable r) {

return null;
}

@Override
public void close() {
Runnable work = () -> Thread.currentThread().interrupt();

// I might not need count but it works well for now
for(int i = 0; i < count.get(); i++) {
newThread(work).start();
}
}

private class ThreadWork {

Expand Down
57 changes: 0 additions & 57 deletions src/main/java/com/shadowcs/themis/util/function/Procedure.java

This file was deleted.

This file was deleted.

27 changes: 0 additions & 27 deletions src/main/java/com/shadowcs/themis/util/function/package-info.java

This file was deleted.

8 changes: 1 addition & 7 deletions src/test/java/com/shadowcs/themis/test/CucumberStepDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.shadowcs.themis.util.concurrent.ProcedureThreadFactory;
import com.shadowcs.themis.util.concurrent.ProvidedThreadFactory;
import com.shadowcs.themis.util.concurrent.SimpleThreadFactory;
import cucumber.api.PendingException;
import cucumber.api.java.en.Given;
import cucumber.api.java.en.Then;

Expand All @@ -35,12 +34,7 @@ public void createThreadFactory(ThreadFactoryType tfType) throws Throwable {
tFactory = new ProvidedThreadFactory();
Semaphore sema = new Semaphore(0);
new Thread(() -> {
try {
((ProvidedThreadFactory) tFactory).cede(() -> sema.release(1));
} catch(InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
((ProvidedThreadFactory) tFactory).cede(() -> sema.release(1));
}).start();

// YES, you do have to wait for the factory to be ready!
Expand Down

0 comments on commit 275c890

Please sign in to comment.