Skip to content

Commit c3edd76

Browse files
committed
Add flush method to writer
1 parent b2b17ac commit c3edd76

File tree

2 files changed

+99
-6
lines changed

2 files changed

+99
-6
lines changed

writer.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,33 @@ func (w *Writer) spawn(f func()) {
548548
}()
549549
}
550550

551+
// Flush writes all currently buffered messages to the kafka cluster. This will
552+
// block until all messages in the batch has been written to kafka, or until the
553+
// context is canceled.
554+
func (w *Writer) Flush(ctx context.Context) error {
555+
w.mutex.Lock()
556+
557+
// flush all writers
558+
for _, writer := range w.writers {
559+
writer.flush()
560+
}
561+
562+
w.mutex.Unlock()
563+
done := make(chan struct{})
564+
565+
go func() {
566+
w.group.Wait()
567+
close(done)
568+
}()
569+
570+
select {
571+
case <-done:
572+
return nil
573+
case <-ctx.Done():
574+
return ctx.Err()
575+
}
576+
}
577+
551578
// Close flushes pending writes, and waits for all writes to complete before
552579
// returning. Calling Close also prevents new writes from being submitted to
553580
// the writer, further calls to WriteMessages and the like will fail with
@@ -1184,7 +1211,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
11841211
batch.complete(err)
11851212
}
11861213

1187-
func (ptw *partitionWriter) close() {
1214+
func (ptw *partitionWriter) flush() {
11881215
ptw.mutex.Lock()
11891216
defer ptw.mutex.Unlock()
11901217

@@ -1194,7 +1221,10 @@ func (ptw *partitionWriter) close() {
11941221
ptw.currBatch = nil
11951222
batch.trigger()
11961223
}
1224+
}
11971225

1226+
func (ptw *partitionWriter) close() {
1227+
ptw.flush()
11981228
ptw.queue.Close()
11991229
}
12001230

writer_test.go

+68-5
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ func TestWriter(t *testing.T) {
105105
scenario: "closing a writer right after creating it returns promptly with no error",
106106
function: testWriterClose,
107107
},
108-
109108
{
110109
scenario: "writing 1 message through a writer using round-robin balancing produces 1 message to the first partition",
111110
function: testWriterRoundRobin1,
@@ -130,6 +129,10 @@ func TestWriter(t *testing.T) {
130129
scenario: "writing a batch of messages",
131130
function: testWriterBatchSize,
132131
},
132+
{
133+
scenario: "writing and flushing a batch of messages",
134+
function: testWriterBatchSize,
135+
},
133136

134137
{
135138
scenario: "writing messages with a small batch byte size",
@@ -450,7 +453,7 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e
450453
}
451454
}
452455

453-
func testWriterBatchBytes(t *testing.T) {
456+
func tetsWriterFlush(t *testing.T) {
454457
topic := makeTopic()
455458
createTopic(t, topic, 1)
456459
defer deleteTopic(t, topic)
@@ -461,9 +464,11 @@ func testWriterBatchBytes(t *testing.T) {
461464
}
462465

463466
w := newTestWriter(WriterConfig{
464-
Topic: topic,
465-
BatchBytes: 50,
466-
BatchTimeout: math.MaxInt32 * time.Second,
467+
Topic: topic,
468+
// Set the batch timeout to a large value to avoid the timeout
469+
BatchSize: 1000,
470+
BatchBytes: 1000000,
471+
BatchTimeout: 1000 * time.Second,
467472
Balancer: &RoundRobin{},
468473
})
469474
defer w.Close()
@@ -480,6 +485,11 @@ func testWriterBatchBytes(t *testing.T) {
480485
return
481486
}
482487

488+
if err := w.Flush(ctx); err != nil {
489+
t.Errorf("flush error %v", err)
490+
return
491+
}
492+
483493
if w.Stats().Writes != 2 {
484494
t.Error("didn't create expected batches")
485495
return
@@ -503,6 +513,59 @@ func testWriterBatchBytes(t *testing.T) {
503513
}
504514
}
505515

516+
func testWriterBatchBytes(t *testing.T) {
517+
topic := makeTopic()
518+
createTopic(t, topic, 1)
519+
defer deleteTopic(t, topic)
520+
521+
offset, err := readOffset(topic, 0)
522+
if err != nil {
523+
t.Fatal(err)
524+
}
525+
526+
w := newTestWriter(WriterConfig{
527+
Topic: topic,
528+
BatchBytes: 50,
529+
BatchTimeout: math.MaxInt32 * time.Second,
530+
Balancer: &RoundRobin{},
531+
})
532+
defer w.Close()
533+
534+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
535+
defer cancel()
536+
if err := w.WriteMessages(ctx, []Message{
537+
{Value: []byte("M0")},
538+
{Value: []byte("M1")},
539+
{Value: []byte("M2")},
540+
{Value: []byte("M3")},
541+
}...); err != nil {
542+
t.Error(err)
543+
return
544+
}
545+
546+
if w.Stats().Writes != 1 {
547+
t.Error("didn't create expected batches")
548+
return
549+
}
550+
msgs, err := readPartition(topic, 0, offset)
551+
if err != nil {
552+
t.Error("error reading partition", err)
553+
return
554+
}
555+
556+
if len(msgs) != 4 {
557+
t.Error("bad messages in partition", msgs)
558+
return
559+
}
560+
561+
for i, m := range msgs {
562+
if string(m.Value) == "M"+strconv.Itoa(i) {
563+
continue
564+
}
565+
t.Error("bad messages in partition", string(m.Value))
566+
}
567+
}
568+
506569
func testWriterBatchSize(t *testing.T) {
507570
topic := makeTopic()
508571
createTopic(t, topic, 1)

0 commit comments

Comments
 (0)