Skip to content

Commit

Permalink
UnclosableOutputStream passes through BufferWritableOutputStream (#2415)
Browse files Browse the repository at this point in the history
UnclosableOutputStream passes through BufferWritableOutputStream
  • Loading branch information
carterkozak authored Nov 20, 2024
1 parent a29bb3a commit 1c95b69
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 74 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2415.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: UnclosableOutputStream passes through BufferWritableOutputStream
links:
- https://github.com/palantir/conjure-java/pull/2415
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void serialize(BinaryResponseBody value, HttpServerExchange exchange) thr
// understood by clients. try-with-resource ends up calling OutputStream.close before the
// exception handler is invoked, which tells the server (and then client) that the response bytes
// have been fully sent successfully.
value.write(new UnclosableOutputStream(exchange.getOutputStream()));
value.write(UnclosableOutputStreams.wrap(exchange.getOutputStream()));
} finally {
Tracer.fastCompleteSpan(SerializeBinaryTagTranslator.INSTANCE, SerializeBinaryTagTranslator.INSTANCE);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.conjure.java.undertow.runtime;

import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeIoException;
import io.undertow.io.BufferWritableOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Helper stream used in {@link ConjureBodySerDe} to make error propagation easier. */
final class UnclosableOutputStreams {

/**
* Wraps the provided {@link OutputStream} such that {@link OutputStream#close()} is a no-op.
* This implementation ensures that streams like {@code UndertowOutputStream} are wrapped in a
* way that preserves the {@link BufferWritableOutputStream} interface.
*/
static OutputStream wrap(OutputStream delegate) {
if (delegate instanceof BufferWritableOutputStream) {
return new UnclosableBufferWritableOutputStream(delegate, (BufferWritableOutputStream) delegate);
} else {
return new UnclosableOutputStream(delegate);
}
}

private static class UnclosableOutputStream extends OutputStream {

private final OutputStream delegate;
private boolean closeCalled;

UnclosableOutputStream(OutputStream delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "Delegate is required");
}

@Override
public final void write(int value) throws IOException {
assertOpen();
delegate.write(value);
}

@Override
public final void write(byte[] buffer) throws IOException {
assertOpen();
delegate.write(buffer);
}

@Override
public final void write(byte[] buffer, int off, int len) throws IOException {
assertOpen();
delegate.write(buffer, off, len);
}

@Override
public final void flush() throws IOException {
delegate.flush();
}

@Override
public final void close() {
closeCalled = true;
}

/** Asserts {@link #close()} has not been called. */
protected final void assertOpen() throws IOException {
if (closeCalled) {
throw new SafeIoException("Stream is closed");
}
}

@Override
public final String toString() {
return getClass().getSimpleName() + '{' + delegate + '}';
}
}

private static final class UnclosableBufferWritableOutputStream extends UnclosableOutputStream
implements BufferWritableOutputStream {
private final BufferWritableOutputStream bufferWritable;

UnclosableBufferWritableOutputStream(OutputStream outputStream, BufferWritableOutputStream bufferWritable) {
super(outputStream);
this.bufferWritable = bufferWritable;
}

@Override
public void write(ByteBuffer[] buffers) throws IOException {
assertOpen();
bufferWritable.write(buffers);
}

@Override
public void write(ByteBuffer byteBuffer) throws IOException {
assertOpen();
bufferWritable.write(byteBuffer);
}

@Override
public void transferFrom(FileChannel source) throws IOException {
assertOpen();
bufferWritable.transferFrom(source);
}
}

private UnclosableOutputStreams() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package com.palantir.conjure.java.undertow.runtime;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.undertow.io.BufferWritableOutputStream;
import io.undertow.io.UndertowOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

Expand All @@ -28,7 +32,7 @@ class UnclosableOutputStreamTest {
@Test
void testClosure() throws IOException {
OutputStream delegate = Mockito.mock(OutputStream.class);
UnclosableOutputStream stream = new UnclosableOutputStream(delegate);
OutputStream stream = UnclosableOutputStreams.wrap(delegate);
stream.write(1);
Mockito.verify(delegate).write(1);
// Close the unclosable stream
Expand All @@ -42,4 +46,26 @@ void testClosure() throws IOException {
Mockito.verify(delegate).flush();
Mockito.verifyNoMoreInteractions(delegate);
}

@Test
void testBufferWriteableClosure() throws IOException {
OutputStream delegate = Mockito.mock(UndertowOutputStream.class);
OutputStream stream = UnclosableOutputStreams.wrap(delegate);
stream.write(1);
Mockito.verify(delegate).write(1);
// Close the unclosable stream
stream.close();
// Closure mustn't be passed through
Mockito.verify(delegate, Mockito.never()).close();
// Writes to the wrapper must fail because that "view" is closed
assertThat(stream)
.isInstanceOfSatisfying(BufferWritableOutputStream.class, bufferWritable -> assertThatThrownBy(
() -> bufferWritable.write(ByteBuffer.wrap(new byte[1])))
.isInstanceOf(IOException.class));

// Flush may still be passed through
stream.flush();
Mockito.verify(delegate).flush();
Mockito.verifyNoMoreInteractions(delegate);
}
}

0 comments on commit 1c95b69

Please sign in to comment.