-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8ae6b5b
commit fd8ed4b
Showing
6 changed files
with
120 additions
and
2 deletions.
There are no files selected for viewing
33 changes: 33 additions & 0 deletions
33
langchain4j-reactor/src/main/java/dev/langchain4j/reactor/TokenStreamToFluxAdapter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package dev.langchain4j.reactor; | ||
|
||
import dev.langchain4j.service.TokenStream; | ||
import dev.langchain4j.spi.services.TokenStreamAdapter; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Sinks; | ||
|
||
import java.lang.reflect.ParameterizedType; | ||
import java.lang.reflect.Type; | ||
|
||
public class TokenStreamToFluxAdapter implements TokenStreamAdapter { | ||
|
||
@Override | ||
public boolean canAdaptTokenStreamTo(Type type) { | ||
if (type instanceof ParameterizedType parameterizedType) { | ||
if (parameterizedType.getRawType() == Flux.class) { | ||
Type[] typeArguments = parameterizedType.getActualTypeArguments(); | ||
return typeArguments.length == 1 && typeArguments[0] == String.class; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public Object adapt(TokenStream tokenStream) { | ||
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); | ||
tokenStream.onNext(sink::tryEmitNext) | ||
.onComplete(aiMessageResponse -> sink.tryEmitComplete()) | ||
.onError(sink::tryEmitError) | ||
.start(); | ||
return sink.asFlux(); | ||
} | ||
} |
1 change: 1 addition & 0 deletions
1
...ctor/src/main/resources/META-INF/services/dev.langchain4j.spi.services.TokenStreamAdapter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
dev.langchain4j.reactor.TokenStreamToFluxAdapter |
39 changes: 39 additions & 0 deletions
39
langchain4j-reactor/src/test/java/dev/langchain4j/reactor/AiServiceWithFluxTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package dev.langchain4j.reactor; | ||
|
||
import dev.langchain4j.model.chat.StreamingChatLanguageModel; | ||
import dev.langchain4j.model.chat.mock.StreamingChatModelMock; | ||
import dev.langchain4j.service.AiServices; | ||
import org.junit.jupiter.api.Test; | ||
import reactor.core.publisher.Flux; | ||
import reactor.test.StepVerifier; | ||
|
||
import java.util.List; | ||
|
||
public class AiServiceWithFluxTest { | ||
|
||
interface Assistant { | ||
|
||
Flux<String> stream(String userMessage); | ||
} | ||
|
||
@Test | ||
void should_stream() { | ||
|
||
// given | ||
List<String> tokens = List.of("The", " capital", " of", " Germany", " is", " Berlin", "."); | ||
|
||
StreamingChatLanguageModel model = StreamingChatModelMock.thatAlwaysStreams(tokens); | ||
|
||
Assistant assistant = AiServices.builder(Assistant.class) | ||
.streamingChatLanguageModel(model) | ||
.build(); | ||
|
||
// when | ||
Flux<String> flux = assistant.stream("What is the capital of Germany?"); | ||
|
||
// then | ||
StepVerifier.create(flux) | ||
.expectNextSequence(tokens) | ||
.verifyComplete(); | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
langchain4j-reactor/src/test/java/dev/langchain4j/reactor/TokenStreamToFluxAdapterTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package dev.langchain4j.reactor; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import reactor.core.publisher.Flux; | ||
|
||
import java.lang.reflect.Type; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
class TokenStreamToFluxAdapterTest { | ||
|
||
interface Assistant { | ||
|
||
Flux<String> fluxOfString(); | ||
|
||
Flux flux(); | ||
|
||
Flux<Object> fluxOfObject(); | ||
} | ||
|
||
@Test | ||
void test_canAdapt() { | ||
|
||
TokenStreamToFluxAdapter adapter = new TokenStreamToFluxAdapter(); | ||
|
||
assertThat(adapter.canAdaptTokenStreamTo(getReturnTypeOfMethod("fluxOfString"))).isTrue(); | ||
|
||
assertThat(adapter.canAdaptTokenStreamTo(getReturnTypeOfMethod("flux"))).isFalse(); | ||
assertThat(adapter.canAdaptTokenStreamTo(getReturnTypeOfMethod("fluxOfObject"))).isFalse(); | ||
} | ||
|
||
private static Type getReturnTypeOfMethod(String methodName) { | ||
try { | ||
return Assistant.class.getDeclaredMethod(methodName).getGenericReturnType(); | ||
} catch (NoSuchMethodException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters