
Concurrent schema parsing and unmarshalling causes nil pointer dereference #481

kfcss opened this issue Dec 10, 2024 · 4 comments

kfcss commented Dec 10, 2024

Hi there.

It seems there is a concurrency bug that can cause nil pointer dereferences.

This issue first presents the stack trace, then a test case that can (sometimes) reproduce it, and finally a best guess at the root cause. As a bonus, a stack trace from an older version of the library is also included.

The stack trace

This is the stack trace I get running against v2.27.0.

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x40 pc=0x102fbf314]

goroutine 11352 [running]:
github.com/hamba/avro/v2.(*Field).String(...)
	external/com_github_hamba_avro_v2/schema.go:845
github.com/hamba/avro/v2.(*RecordSchema).String(0x14021ec9c70)
	external/com_github_hamba_avro_v2/schema.go:642 +0x84
github.com/hamba/avro/v2.(*fingerprinter).Fingerprint(0x14021ec9cc0, {0x1033b87a0?, 0x14021ec9c70?})
	external/com_github_hamba_avro_v2/schema.go:291 +0xc4
github.com/hamba/avro/v2.(*RecordSchema).Fingerprint(0x2a5a0?)
	external/com_github_hamba_avro_v2/schema.go:691 +0x3c
github.com/hamba/avro/v2.(*cacheFingerprinter).CacheFingerprint(0x14021ec9cf0, {0x1033be080?, 0x14021ec9c70?}, 0x102fa49d4?)
	external/com_github_hamba_avro_v2/schema.go:336 +0x2a8
github.com/hamba/avro/v2.(*RecordSchema).CacheFingerprint(0x140055e58b8?)
	external/com_github_hamba_avro_v2/schema.go:701 +0x54
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be180, 0x14030677910}, {0x1033c26e0, 0x140000f6e10})
	external/com_github_hamba_avro_v2/codec.go:142 +0x22c
github.com/hamba/avro/v2.decoderOfNullableUnion(0x140055e5df8, {0x1033be140?, 0x14034956840}, {0x1033c25a0, 0x140000f6db0})
	external/com_github_hamba_avro_v2/codec_union.go:183 +0x138
github.com/hamba/avro/v2.createDecoderOfUnion(0x140055e5df8, 0x14034956840, {0x1033c25a0, 0x140000f6db0})
	external/com_github_hamba_avro_v2/codec_union.go:30 +0x1f0
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be140, 0x14034956840}, {0x1033c25a0, 0x140000f6db0})
	external/com_github_hamba_avro_v2/codec.go:154 +0x3f0
github.com/hamba/avro/v2.decoderOfStruct(0x140055e5df8, {0x1033be080?, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
	external/com_github_hamba_avro_v2/codec_record.go:107 +0x534
github.com/hamba/avro/v2.createDecoderOfRecord(0x140055e5df8, {0x1033be080, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
	external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be080, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
	external/com_github_hamba_avro_v2/codec.go:139 +0x528
github.com/hamba/avro/v2.(*frozenConfig).DecoderOf(0x140001ba090, {0x1033be080, 0x1400866bba0}, {0x1033c25a0, 0x1400103cf30})
	external/com_github_hamba_avro_v2/codec.go:75 +0xcc
github.com/hamba/avro/v2.(*Reader).ReadVal(0x1401cc5e540, {0x1033be080, 0x1400866bba0}, {0x1032ed6a0, 0x1400bcc2a80})
	external/com_github_hamba_avro_v2/codec.go:45 +0xd4
github.com/hamba/avro/v2.(*frozenConfig).Unmarshal(0x140001ba090, {0x1033be080, 0x1400866bba0}, {0x14000492380?, 0x0?, 0x0?}, {0x1032ed6a0, 0x1400bcc2a80})
	external/com_github_hamba_avro_v2/config.go:163 +0x98
github.com/hamba/avro/v2.Unmarshal(...)
	external/com_github_hamba_avro_v2/decoder.go:48
<redacted>.Test_Concurrent.func1()
	<redacted>/avrohandler_test.go:81 +0xe8
created by <redacted>.Test_Concurrent in goroutine 7
	<redacted>/avrohandler_test.go:74 +0x6e8

The test case

I have created a test case that can (sometimes) trigger the failure, so it needs to be run multiple times. Since our build system is Bazel, I run it with runs_per_test=20; with plain Go tooling, go test -race -count=20 should be roughly equivalent. Note that genSchema draws from only 1000 random namespaces, so across 20000 goroutines many of them parse schemas with identical full names concurrently, which appears to be what makes collisions in the shared schema cache likely.

package debezium

import (
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"testing"

	"github.com/hamba/avro/v2"
	"github.com/stretchr/testify/require"
)

func Test_Concurrent(t *testing.T) {
	kvs := DbzEnvelope[Payload]{
		Before: &Payload{
			Tenant:       "foo",
			Store:        "bar",
			PartitionKey: "baz",
			SortKey:      "qaz",
			Version:      "faz",
			UpdatedAt:    "2024-01-01T00:00:00Z",
			Tags:         &[]*string{toPtr("tag1"), toPtr("tag2")},
			Data:         toPtr("data"),
			Encoding:     toPtr("encoding"),
			Schema:       toPtr(int64(1)),
			LookupKeys:   &[]*string{toPtr("key1"), toPtr("key2")},
		},
		After: &Payload{
			Tenant:       "foo",
			Store:        "bar",
			PartitionKey: "baz",
			SortKey:      "qaz",
			Version:      "faz",
			UpdatedAt:    "2024-01-01T00:00:00Z",
			Tags:         &[]*string{toPtr("tag1"), toPtr("tag2")},
			Data:         toPtr("data"),
			Encoding:     toPtr("encoding"),
			Schema:       toPtr(int64(1)),
			LookupKeys:   &[]*string{toPtr("key1"), toPtr("key2")},
		},
		Source: DbzSource{
			Version:   "test",
			Connector: "ba",
			Name:      "f",
			TsMs:      43,
			TsUs:      nil,
			TsNs:      nil,
			Snapshot:  nil,
			Db:        "asdf",
			Sequence:  nil,
			Schema:    "f",
			Table:     "f",
			TxID:      nil,
			Lsn:       nil,
			Xmin:      nil,
		},
		Op:          "read",
		TsMs:        nil,
		TsUs:        nil,
		TsNs:        nil,
		Transaction: nil,
	}

	schema, err := avro.Parse(genSchema())
	require.NoError(t, err)

	msg, err := avro.Marshal(schema, kvs)
	require.NoError(t, err)

	wg := sync.WaitGroup{}
	for i := 0; i < 20000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			schema, err := avro.Parse(genSchema())
			require.NoError(t, err)

			var kvsEnvelope DbzEnvelope[Payload]
			err = avro.Unmarshal(schema, msg, &kvsEnvelope)
			require.NoError(t, err)
		}()
	}
	wg.Wait()
}

func toPtr[T any](x T) *T {
	return &x
}

func genSchema() string {
	return strings.ReplaceAll(_schema, "%s", strconv.Itoa(rand.Intn(1000)))
}

const _schema = `
{
  "type": "record",
  "name": "Envelope",
  "namespace": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "tenant",
              "type": "string"
            },
            {
              "name": "store",
              "type": "string"
            },
            {
              "name": "partition_key",
              "type": "string"
            },
            {
              "name": "sort_key",
              "type": "string"
            },
            {
              "name": "version",
              "type": "string"
            },
            {
              "name": "updated_at",
              "type": {
                "type": "string",
                "connect.version": 1,
                "connect.name": "io.debezium.time.ZonedTimestamp"
              }
            },
            {
              "name": "tags",
              "type": [
                "null",
                {
                  "type": "array",
                  "items": [
                    "null",
                    "string"
                  ]
                }
              ],
              "default": null
            },
            {
              "name": "data",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "encoding",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "schema",
              "type": [
                "null",
                "long"
              ],
              "default": null
            },
            {
              "name": "lookup_keys",
              "type": [
                "null",
                {
                  "type": "array",
                  "items": [
                    "null",
                    "string"
                  ]
                }
              ],
              "default": null
            }
          ],
          "connect.name": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "string",
                "connect.version": 1,
                "connect.parameters": {
                  "allowed": "true,last,false,incremental"
                },
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              },
              "null"
            ],
            "default": "false"
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "sequence",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ts_us",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "ts_ns",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "name": "txId",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "lsn",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "xmin",
            "type": [
              "null",
              "long"
            ],
            "default": null
          }
        ],
        "connect.name": "io.debezium.connector.postgresql.Source"
      }
    },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "block",
          "namespace": "event",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ],
          "connect.version": 1,
          "connect.name": "event.block"
        }
      ],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "ts_us",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "ts_ns",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ],
  "connect.version": 2,
  "connect.name": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs.Envelope"
}`

type Payload struct {
	Tenant       string     `avro:"tenant"`
	Store        string     `avro:"store"`
	PartitionKey string     `avro:"partition_key"`
	SortKey      string     `avro:"sort_key"`
	Version      string     `avro:"version"`
	UpdatedAt    string     `avro:"updated_at"`
	Tags         *[]*string `avro:"tags"`
	Data         *string    `avro:"data"`
	Encoding     *string    `avro:"encoding"`
	Schema       *int64     `avro:"schema"`
	LookupKeys   *[]*string `avro:"lookup_keys"`
}

type DbzOpType string

type DbzSource struct {
	Version   string  `avro:"version"`
	Connector string  `avro:"connector"`
	Name      string  `avro:"name"`
	TsMs      int64   `avro:"ts_ms"`
	TsUs      *int64  `avro:"ts_us"`
	TsNs      *int64  `avro:"ts_ns"`
	Snapshot  *string `avro:"snapshot"`
	Db        string  `avro:"db"`
	Sequence  *string `avro:"sequence"`
	Schema    string  `avro:"schema"`
	Table     string  `avro:"table"`
	TxID      *int64  `avro:"txId"`
	Lsn       *int64  `avro:"lsn"`
	Xmin      *int64  `avro:"xmin"`
}

type DbzBlock struct {
	ID                  string `avro:"id"`
	TotalOrder          int64  `avro:"total_order"`
	DataCollectionOrder int64  `avro:"data_collection_order"`
}

type DbzEnvelope[T any] struct {
	Before      *T        `avro:"before"`
	After       *T        `avro:"after"`
	Source      DbzSource `avro:"source"`
	Op          DbzOpType `avro:"op"`
	TsMs        *int64    `avro:"ts_ms"`
	TsUs        *int64    `avro:"ts_us"`
	TsNs        *int64    `avro:"ts_ns"`
	Transaction *DbzBlock `avro:"transaction"`
}

The root cause

I think it is caused by adding a RecordSchema to the schema cache before it is fully parsed.

Let me try to elaborate.

  1. First, a slice of fields containing only nils is allocated at

     fields := make([]*Field, len(r.Fields))

  2. A record is constructed at avro/schema_parse.go, lines 254 to 262 in 45d131b:

     case Record:
     	rec, err = NewRecordSchema(r.Name, r.Namespace, fields,
     		WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props),
     	)
     case Error:
     	rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields,
     		WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props),
     	)
     }

  3. The record is added to the cache at

     cache.Add(rec.FullName(), ref)

  4. but the fields are not populated until the loop at avro/schema_parse.go, lines 277 to 283 in 45d131b:

     for i, f := range r.Fields {
     	field, err := parseField(rec.namespace, f, seen, cache)
     	if err != nil {
     		return nil, err
     	}
     	fields[i] = field
     }

Between steps 3 and 4, another goroutine can resolve the record from the cache while its fields slice still contains nils. Anything that then walks the fields, such as Field.String during fingerprinting on the decode path, dereferences a nil *Field, producing the stack trace above.

I think the solution would be to parse the fields before adding the record to the cache. To make the ordering hazard concrete, a deliberately racy toy model is sketched below.
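
The following is a minimal toy model of the hazard, under stated simplifications: the field and record types here are stand-ins rather than the library's, and only the publish-then-populate ordering mirrors schema_parse.go. Running it with go run -race flags the race, and it panics intermittently just like the real code:

package main

import (
	"fmt"
	"sync"
)

type field struct{ name string }

type record struct{ fields []*field }

// String walks the fields, like RecordSchema.String does while fingerprinting.
func (r *record) String() string {
	s := ""
	for _, f := range r.fields {
		s += f.name // panics if f is still nil
	}
	return s
}

func main() {
	var cache sync.Map
	var wg sync.WaitGroup
	wg.Add(2)

	// "Parser": mirrors the ordering above - the record is cached (step 3)
	// before its fields slice is populated (step 4).
	go func() {
		defer wg.Done()
		fields := make([]*field, 1)      // step 1: all nils
		rec := &record{fields: fields}   // step 2
		cache.Store("kvs.Envelope", rec) // step 3: published too early
		fields[0] = &field{name: "op"}   // step 4: populated afterwards
	}()

	// "Decoder": resolves the name from the cache and stringifies the record.
	go func() {
		defer wg.Done()
		if v, ok := cache.Load("kvs.Envelope"); ok {
			fmt.Println(v.(*record).String()) // may hit a nil *field
		}
	}()

	wg.Wait()
}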

Bonus stack trace against v2.8.1

The bug was originally found on an older version (v2.8.1). The stack trace below is from a run against that version:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x8 pc=0x1043a81cc]

goroutine 11148 [running]:
github.com/hamba/avro/v2.(*Field).Name(...)
	external/com_github_hamba_avro_v2/schema.go:600
github.com/hamba/avro/v2.decoderOfStruct(0x1400019a580, {0x1047a8348?, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
	external/com_github_hamba_avro_v2/codec_record.go:62 +0x11c
github.com/hamba/avro/v2.createDecoderOfRecord(0x1400019a580, {0x1047a8348, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
	external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8348, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
	external/com_github_hamba_avro_v2/codec.go:96 +0x2e8
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8310, 0x14034796f60}, {0x1047ad2a0, 0x140003bfef0})
	external/com_github_hamba_avro_v2/codec.go:99 +0x4dc
github.com/hamba/avro/v2.decoderOfPtrUnion(0x1400019a580, {0x1047a8230?, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
	external/com_github_hamba_avro_v2/codec_union.go:161 +0xbc
github.com/hamba/avro/v2.createDecoderOfUnion(0x1400019a580, {0x1047a8230, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
	external/com_github_hamba_avro_v2/codec_union.go:26 +0xb8
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8230, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
	external/com_github_hamba_avro_v2/codec.go:111 +0x278
github.com/hamba/avro/v2.decoderOfStruct(0x1400019a580, {0x1047a8348?, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
	external/com_github_hamba_avro_v2/codec_record.go:80 +0x1cc
github.com/hamba/avro/v2.createDecoderOfRecord(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
	external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
	external/com_github_hamba_avro_v2/codec.go:96 +0x2e8
github.com/hamba/avro/v2.(*frozenConfig).DecoderOf(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad160, 0x1400092fc20})
	external/com_github_hamba_avro_v2/codec.go:74 +0xb8
github.com/hamba/avro/v2.(*Reader).ReadVal(0x1400169a8c0, {0x1047a8348, 0x14002143550}, {0x1046d9380, 0x1400236b7a0})
	external/com_github_hamba_avro_v2/codec.go:44 +0xd4
github.com/hamba/avro/v2.(*frozenConfig).Unmarshal(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x140003fe1c0?, 0x0?, 0x0?}, {0x1046d9380, 0x1400236b7a0})
	external/com_github_hamba_avro_v2/config.go:143 +0x58
github.com/hamba/avro/v2.Unmarshal(...)
	external/com_github_hamba_avro_v2/decoder.go:49
<redacted>.Test_Concurrent.func1()
	<redacted>/avrohandler_test.go:81 +0xf8
created by <redacted>.Test_Concurrent in goroutine 9
	<redacted>/avrohandler_test.go:74 +0x6f8
nrwiersma (Member) commented

The issue with your solution is that it does not account for circular schemas.
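
For readers following along, the constraint can be seen with the Avro specification's canonical recursive schema, LongList: parsing the next field requires "LongList" to already be resolvable by name, which is why the record is added to the cache before its fields are parsed. A small self-contained example against the library's public API:

package main

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

// The Avro spec's recursive LongList schema: "next" refers back to
// "LongList" by name, so field parsing must be able to look the record
// up in the cache before the record itself is fully parsed.
const recursive = `{
  "type": "record",
  "name": "LongList",
  "fields": [
    {"name": "value", "type": "long"},
    {"name": "next", "type": ["null", "LongList"], "default": null}
  ]
}`

func main() {
	schema, err := avro.Parse(recursive)
	if err != nil {
		panic(err)
	}
	fmt.Println(schema.Type()) // record
}

(The schema in this issue relies on the same named-reference mechanism: the after field refers to Value by name.)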

kfcss (Author) commented Dec 11, 2024

Then maybe we could have an internal cache that is used while parsing.
When parsing is done, everything from the internal cache is added to the final cache.
That way, one in-progress parse call cannot expose not-finished-parsing content to a different parse call.

kfcss (Author) commented Dec 16, 2024

@nrwiersma any updates? What do you think of the local-cache-while-parsing solution?

nrwiersma (Member) commented

I have not had time to dig into this properly yet, end of year time pressures and all.

The local cache seems like a good option, but I would look in the direction of a wrapper around the SchemaCache, so as not to burden the rest of the parser with this. A rough sketch of that direction follows.
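
To sketch what such a wrapper might look like (hypothetical names throughout; only SchemaCache's Add and Get are the library's actual API), additions are buffered in a per-Parse overlay and published only after parsing succeeds:

// Hypothetical sketch; scopedCache is not part of the library.
package schemacache

import "github.com/hamba/avro/v2"

type scopedCache struct {
	parent  *avro.SchemaCache      // shared cache: fully parsed schemas only
	pending map[string]avro.Schema // this Parse call's in-progress schemas
}

func newScopedCache(parent *avro.SchemaCache) *scopedCache {
	return &scopedCache{parent: parent, pending: map[string]avro.Schema{}}
}

// Add records the schema locally, so a half-built RecordSchema is never
// visible to concurrent Parse or Unmarshal calls.
func (c *scopedCache) Add(name string, schema avro.Schema) {
	c.pending[name] = schema
}

// Get prefers the local overlay, which is what lets a circular schema
// resolve a reference to itself mid-parse.
func (c *scopedCache) Get(name string) avro.Schema {
	if s, ok := c.pending[name]; ok {
		return s
	}
	return c.parent.Get(name)
}

// commit publishes everything at once, after parsing has succeeded and
// every record's fields slice is fully populated.
func (c *scopedCache) commit() {
	for name, s := range c.pending {
		c.parent.Add(name, s)
	}
}

Within one Parse call, Get prefers the overlay, so circular schemas still resolve; other goroutines only ever see schemas that have been committed.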
