diff --git a/client.go b/client/client.go
similarity index 85%
rename from client.go
rename to client/client.go
index 712f5c1..caf479b 100644
--- a/client.go
+++ b/client/client.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package greptime
+package client
 
 import (
 	"context"
@@ -22,6 +22,7 @@ import (
 
 	"github.com/GreptimeTeam/greptimedb-ingester-go/config"
 	"github.com/GreptimeTeam/greptimedb-ingester-go/insert"
+	"github.com/GreptimeTeam/greptimedb-ingester-go/table"
 )
 
 // Client helps to Insert/Query data Into/From GreptimeDB. A Client is safe for concurrent
@@ -34,7 +35,7 @@ type Client struct {
 }
 
 // NewClient helps to create the greptimedb client, which will be responsible Write/Read data To/From GreptimeDB
-func NewClient(cfg *config.Config) (*Client, error) {
+func New(cfg *config.Config) (*Client, error) {
 	conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...)
 	if err != nil {
 		return nil, err
@@ -48,8 +49,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
 	}, nil
 }
 
-// Insert helps to insert multiple rows of multiple tables into greptimedb
-func (c *Client) Insert(ctx context.Context, req insert.InsertsRequest) (*greptimepb.GreptimeResponse, error) {
+func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*greptimepb.GreptimeResponse, error) {
+	req := insert.RowInsertsRequest{}
+	req.AddTable(tables...)
 	request, err := req.Build(c.cfg)
 	if err != nil {
 		return nil, err
diff --git a/client_test.go b/client/client_test.go
similarity index 98%
rename from client_test.go
rename to client/client_test.go
index 3bddfcf..3abef5d 100644
--- a/client_test.go
+++ b/client/client_test.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package greptime
+package client
 
 import (
 	"testing"
diff --git a/stream_client.go b/client/stream/stream_client.go
similarity index 100%
rename from stream_client.go
rename to client/stream/stream_client.go
diff --git a/stream_client_test.go b/client/stream/stream_client_test.go
similarity index 100%
rename from stream_client_test.go
rename to client/stream/stream_client_test.go
diff --git a/error/error.go b/error/error.go
index 2727264..33ad331 100644
--- a/error/error.go
+++ b/error/error.go
@@ -19,18 +19,12 @@ import (
 )
 
 var (
-	ErrEmptyDatabase        = errors.New("name of database should not be empty")
-	ErrEmptyTable           = errors.New("name of table should not be be empty")
-	ErrEmptyInserts         = errors.New("at least one insert is required in InsertsRequest")
+	ErrEmptyDatabaseName    = errors.New("name of database should not be empty")
+	ErrEmptyTableName       = errors.New("name of table should not be be empty")
+	ErrEmptyColumnName      = errors.New("name of column should not be be empty")
+	ErrEmptyTables          = errors.New("please add at least one record before sending insert request")
 	ErrEmptyTimestamp       = errors.New("timestamp should not be empty")
-	ErrEmptyQuery           = errors.New("query should not be empty, assign Sql, InstantPromql or RangePromql")
-	ErrEmptyKey             = errors.New("key should not be empty")
-	ErrEmptySql             = errors.New("sql is required in querying")
-	ErrEmptyPromql          = errors.New("promql is required in promql querying")
-	ErrEmptyStep            = errors.New("step is required in range promql")
-	ErrEmptyRange           = errors.New("start and end is required in range promql")
 	ErrInvalidTimePrecision = errors.New("precision of timestamp is not valid")
-	ErrNoSeriesInMetric     = errors.New("empty series in Metric")
 
 	ErrColumnNotSet = errors.New("column not set, please call AddColumn first")
 )
diff --git a/insert/header.go b/insert/header.go
index 52cddb5..e52aa8f 100644
--- a/insert/header.go
+++ b/insert/header.go
@@ -18,8 +18,8 @@ import (
 	greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
 
 	"github.com/GreptimeTeam/greptimedb-ingester-go/config"
-	gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
-	gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
+	err "github.com/GreptimeTeam/greptimedb-ingester-go/error"
+	"github.com/GreptimeTeam/greptimedb-ingester-go/util"
 )
 
 type reqHeader struct {
@@ -27,12 +27,12 @@ type reqHeader struct {
 }
 
 func (h *reqHeader) build(cfg *config.Config) (*greptimepb.RequestHeader, error) {
-	if gutil.IsEmptyString(h.database) {
+	if util.IsEmptyString(h.database) {
 		h.database = cfg.Database
 	}
 
-	if gutil.IsEmptyString(h.database) {
-		return nil, gerr.ErrEmptyDatabase
+	if util.IsEmptyString(h.database) {
+		return nil, err.ErrEmptyDatabaseName
 	}
 
 	header := &greptimepb.RequestHeader{
@@ -57,7 +57,7 @@ func (h RespHeader) IsRateLimited() bool {
 }
 
 func (h RespHeader) IsNil() bool {
-	return h.Code == 0 && gutil.IsEmptyString(h.Msg)
+	return h.Code == 0 && util.IsEmptyString(h.Msg)
 }
 
 type getRespHeader interface {
diff --git a/insert/header_test.go b/insert/header_test.go
index 55fad90..4d3b8b7 100644
--- a/insert/header_test.go
+++ b/insert/header_test.go
@@ -28,7 +28,7 @@ func TestHeaderBuild(t *testing.T) {
 	h := &reqHeader{}
 
 	gh, err := h.build(&config.Config{})
-	assert.ErrorIs(t, err, gerr.ErrEmptyDatabase)
+	assert.ErrorIs(t, err, gerr.ErrEmptyDatabaseName)
 	assert.Nil(t, gh)
 
 	gh, err = h.build(&config.Config{Database: "database"})
diff --git a/insert/insert.go b/insert/insert.go
index 6abe75d..cfec2fb 100644
--- a/insert/insert.go
+++ b/insert/insert.go
@@ -15,101 +15,64 @@
 package insert
 
 import (
-	greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
+	gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
 
 	"github.com/GreptimeTeam/greptimedb-ingester-go/config"
 	gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
 	"github.com/GreptimeTeam/greptimedb-ingester-go/table"
-	gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
 )
 
-type InsertsRequest struct {
-	header  reqHeader
-	inserts []InsertRequest
+type RowInsertsRequest struct {
+	header reqHeader
+	tables []*table.Table
 }
 
-// WithDatabase helps to specify different database from the default one.
-func (r *InsertsRequest) WithDatabase(database string) *InsertsRequest {
+func (r *RowInsertsRequest) IsTablesEmpty() bool {
+	return r.tables == nil || len(r.tables) == 0
+}
+
+func (r *RowInsertsRequest) WithDatabase(database string) *RowInsertsRequest {
 	r.header = reqHeader{
 		database: database,
 	}
 	return r
 }
 
-// Append will include one insert into this InsertsRequest
-func (r *InsertsRequest) Append(insert InsertRequest) *InsertsRequest {
-	if r.inserts == nil {
-		r.inserts = make([]InsertRequest, 0)
+func (r *RowInsertsRequest) AddTable(tables ...*table.Table) *RowInsertsRequest {
+	if r.tables == nil {
+		r.tables = make([]*table.Table, 0)
 	}
 
-	r.inserts = append(r.inserts, insert)
-
+	r.tables = append(r.tables, tables...)
 	return r
 }
 
-func (r InsertsRequest) Build(cfg *config.Config) (*greptimepb.GreptimeRequest, error) {
+func (r RowInsertsRequest) Build(cfg *config.Config) (*gpb.GreptimeRequest, error) {
 	header, err := r.header.build(cfg)
 	if err != nil {
 		return nil, err
 	}
 
-	if len(r.inserts) == 0 {
-		return nil, gerr.ErrEmptyInserts
+	if r.IsTablesEmpty() {
+		return nil, gerr.ErrEmptyTables
 	}
 
-	reqs := make([]*greptimepb.InsertRequest, 0, len(r.inserts))
-	for _, insert := range r.inserts {
-		req, err := insert.build()
-		if err != nil {
-			return nil, err
+	reqs := make([]*gpb.RowInsertRequest, 0, len(r.tables))
+	for _, tbl := range r.tables {
+		req := &gpb.RowInsertRequest{
+			TableName: tbl.Schema.Name,
+			Rows:      tbl.Rows,
 		}
 		reqs = append(reqs, req)
 	}
 
-	req := greptimepb.GreptimeRequest_Inserts{
-		Inserts: &greptimepb.InsertRequests{Inserts: reqs},
+	req := gpb.GreptimeRequest_RowInserts{
+		RowInserts: &gpb.RowInsertRequests{Inserts: reqs},
 	}
 
-	return &greptimepb.GreptimeRequest{
+	return &gpb.GreptimeRequest{
 		Header:  header,
 		Request: &req,
 	}, nil
 
 }
-
-// InsertRequest insert metric to specified table. You can also specify the database in header.
-type InsertRequest struct {
-	table  string
-	metric table.Metric
-}
-
-func (r *InsertRequest) WithTable(table string) *InsertRequest {
-	r.table = table
-	return r
-}
-
-func (r *InsertRequest) WithMetric(metric table.Metric) *InsertRequest {
-	r.metric = metric
-	return r
-}
-
-func (r *InsertRequest) RowCount() uint32 {
-	return uint32(len(r.metric.GetSeries()))
-}
-
-func (r *InsertRequest) build() (*greptimepb.InsertRequest, error) {
-	if gutil.IsEmptyString(r.table) {
-		return nil, gerr.ErrEmptyTable
-	}
-
-	columns, err := r.metric.IntoGreptimeColumn()
-	if err != nil {
-		return nil, err
-	}
-
-	return &greptimepb.InsertRequest{
-		TableName: r.table,
-		Columns:   columns,
-		RowCount:  r.RowCount(),
-	}, nil
-}
diff --git a/table/metric.go b/table/metric.go
deleted file mode 100644
index e078edb..0000000
--- a/table/metric.go
+++ /dev/null
@@ -1,294 +0,0 @@
-// Copyright 2024 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package table
-
-import (
-	"fmt"
-	"math"
-	"time"
-
-	greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
-
-	gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
-	gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
-)
-
-// Metric represents multiple rows of data, and also Metric can specify
-// the timestamp column name and precision
-type Metric struct {
-	timestampAlias     string
-	timestampPrecision time.Duration
-	// orders and columns SHOULD NOT contain timestampAlias key
-	orders  []string
-	columns map[string]column
-
-	series []Series
-}
-
-// GetTagsAndFields get all column names from metric, except timestamp column
-func (m *Metric) GetTagsAndFields() []string {
-	dst := make([]string, len(m.orders))
-	copy(dst, m.orders)
-	return dst
-}
-
-// GetSeries gets all data from metric
-func (m *Metric) GetSeries() []Series {
-	return m.series
-}
-
-// SetTimePrecision set precision for Metric. Valid durations include:
-//   - time.Nanosecond
-//   - time.Microsecond
-//   - time.Millisecond
-//   - time.Second.
-//
-// # Pay attention
-//
-//   - once the precision has been set, it can not be changed
-//   - insert will fail if precision does not match with the existing precision of the schema in greptimedb
-func (m *Metric) SetTimePrecision(precision time.Duration) error {
-	if !gutil.IsValidPrecision(precision) {
-		return gerr.ErrInvalidTimePrecision
-	}
-	m.timestampPrecision = precision
-	return nil
-}
-
-// SetTimestampAlias helps to specify the timestamp column name, default is ts.
-func (m *Metric) SetTimestampAlias(alias string) error {
-	alias, err := gutil.ToColumnName(alias)
-	if err != nil {
-		return err
-	}
-	m.timestampAlias = alias
-	return nil
-}
-
-// GetTimestampAlias get the timestamp column name, default is ts.
-func (m *Metric) GetTimestampAlias() string {
-	if len(m.timestampAlias) == 0 {
-		return "ts"
-	}
-	return m.timestampAlias
-}
-
-// AddSeries add one row to Metric.
-//
-// # Pay Attention
-//
-//   - different row can have different fields, Metric will union all the columns,
-//     leave empty value of one row if the column is not specified in this row
-//   - same column name MUST have same schema, which means Tag,Field,Timestamp and
-//     data type MUST BE the same of the same column name in different rows
-func (m *Metric) AddSeries(s Series) error {
-	if m.columns == nil {
-		m.columns = map[string]column{}
-	}
-
-	if m.orders == nil {
-		m.orders = []string{}
-	}
-
-	if m.series == nil {
-		m.series = []Series{}
-	}
-
-	for _, key := range s.orders {
-		sCol := s.columns[key]
-		if mCol, seen := m.columns[key]; seen {
-			if err := checkColumnEquality(key, mCol, sCol); err != nil {
-				return err
-			}
-		} else {
-			m.orders = append(m.orders, key)
-			m.columns[key] = sCol
-		}
-	}
-
-	m.series = append(m.series, s)
-	return nil
-}
-
-func (m *Metric) IntoGreptimeColumn() ([]*greptimepb.Column, error) {
-	if len(m.series) == 0 {
-		return nil, gerr.ErrNoSeriesInMetric
-	}
-
-	result, err := m.intoDataColumns()
-	if err != nil {
-		return nil, err
-	}
-
-	tsColumn, err := m.intoTimestampColumn()
-	if err != nil {
-		return nil, err
-	}
-
-	return append(result, tsColumn), nil
-}
-
-// nullMaskByteSize helps to calculate how many bytes needed in Mask.shrink
-func (m *Metric) nullMaskByteSize() int {
-	return int(math.Ceil(float64(len(m.series)) / 8.0))
-}
-
-// intoDataColumns does not contain timestamp semantic column
-func (m *Metric) intoDataColumns() ([]*greptimepb.Column, error) {
-	nullMasks := map[string]*gutil.Mask{}
-	mappedCols := map[string]*greptimepb.Column{}
-	for name, col := range m.columns {
-		column := greptimepb.Column{
-			ColumnName:   name,
-			SemanticType: col.semantic,
-			Datatype:     col.typ,
-			Values:       &greptimepb.Column_Values{},
-			NullMask:     nil,
-		}
-		mappedCols[name] = &column
-	}
-
-	for rowIdx, s := range m.series {
-		for name, col := range mappedCols {
-			if val, exist := s.vals[name]; exist {
-				if err := setColumn(col, val); err != nil {
-					return nil, err
-				}
-			} else {
-				nullMask, exist := nullMasks[name]
-				if !exist {
-					nullMask = &gutil.Mask{}
-					nullMasks[name] = nullMask
-				}
-				nullMask.Set(uint(rowIdx))
-			}
-		}
-	}
-
-	if len(nullMasks) > 0 {
-		if err := setNullMask(mappedCols, nullMasks, m.nullMaskByteSize()); err != nil {
-			return nil, err
-		}
-	}
-
-	result := make([]*greptimepb.Column, 0, len(mappedCols))
-	for _, key := range m.orders {
-		result = append(result, mappedCols[key])
-	}
-
-	return result, nil
-}
-
-func (m *Metric) intoTimestampColumn() (*greptimepb.Column, error) {
-	datatype, err := gutil.PrecisionToDataType(m.timestampPrecision)
-	if err != nil {
-		return nil, err
-	}
-	tsColumn := &greptimepb.Column{
-		ColumnName:   m.GetTimestampAlias(),
-		SemanticType: greptimepb.SemanticType_TIMESTAMP,
-		Datatype:     datatype,
-		Values:       &greptimepb.Column_Values{},
-		NullMask:     nil,
-	}
-	nullMask := gutil.Mask{}
-	for _, s := range m.series {
-		switch datatype {
-		case greptimepb.ColumnDataType_TIMESTAMP_SECOND:
-			if err := setColumn(tsColumn, s.timestamp.Unix()); err != nil {
-				return nil, err
-			}
-		case greptimepb.ColumnDataType_TIMESTAMP_MICROSECOND:
-			if err := setColumn(tsColumn, s.timestamp.UnixMicro()); err != nil {
-				return nil, err
-			}
-		case greptimepb.ColumnDataType_TIMESTAMP_NANOSECOND:
-			if err := setColumn(tsColumn, s.timestamp.UnixNano()); err != nil {
-				return nil, err
-			}
-		default: // greptimepb.ColumnDataType_TIMESTAMP_MILLISECOND
-			if err := setColumn(tsColumn, s.timestamp.UnixMilli()); err != nil {
-				return nil, err
-			}
-		}
-	}
-
-	if b, err := nullMask.Shrink(m.nullMaskByteSize()); err != nil {
-		return nil, err
-	} else {
-		tsColumn.NullMask = b
-	}
-
-	return tsColumn, nil
-}
-
-func setColumn(col *greptimepb.Column, val any) error {
-	switch col.Datatype {
-	case greptimepb.ColumnDataType_INT8:
-		col.Values.I8Values = append(col.Values.I8Values, int32(val.(int8)))
-	case greptimepb.ColumnDataType_INT16:
-		col.Values.I16Values = append(col.Values.I16Values, int32(val.(int16)))
-	case greptimepb.ColumnDataType_INT32:
-		col.Values.I32Values = append(col.Values.I32Values, val.(int32))
-	case greptimepb.ColumnDataType_INT64:
-		col.Values.I64Values = append(col.Values.I64Values, val.(int64))
-	case greptimepb.ColumnDataType_UINT8:
-		col.Values.U8Values = append(col.Values.U8Values, uint32(val.(uint8)))
-	case greptimepb.ColumnDataType_UINT16:
-		col.Values.U16Values = append(col.Values.U16Values, uint32(val.(uint16)))
-	case greptimepb.ColumnDataType_UINT32:
-		col.Values.U32Values = append(col.Values.U32Values, val.(uint32))
-	case greptimepb.ColumnDataType_UINT64:
-		col.Values.U64Values = append(col.Values.U64Values, val.(uint64))
-	case greptimepb.ColumnDataType_FLOAT32:
-		col.Values.F32Values = append(col.Values.F32Values, val.(float32))
-	case greptimepb.ColumnDataType_FLOAT64:
-		col.Values.F64Values = append(col.Values.F64Values, val.(float64))
-	case greptimepb.ColumnDataType_BOOLEAN:
-		col.Values.BoolValues = append(col.Values.BoolValues, val.(bool))
-	case greptimepb.ColumnDataType_STRING:
-		col.Values.StringValues = append(col.Values.StringValues, val.(string))
-	case greptimepb.ColumnDataType_BINARY:
-		col.Values.BinaryValues = append(col.Values.BinaryValues, val.([]byte))
-	case greptimepb.ColumnDataType_TIMESTAMP_SECOND:
-		col.Values.TimestampSecondValues = append(col.Values.TimestampSecondValues, val.(int64))
-	case greptimepb.ColumnDataType_TIMESTAMP_MILLISECOND:
-		col.Values.TimestampMillisecondValues = append(col.Values.TimestampMillisecondValues, val.(int64))
-	case greptimepb.ColumnDataType_TIMESTAMP_MICROSECOND:
-		col.Values.TimestampMicrosecondValues = append(col.Values.TimestampMicrosecondValues, val.(int64))
-	case greptimepb.ColumnDataType_TIMESTAMP_NANOSECOND:
-		col.Values.TimestampNanosecondValues = append(col.Values.TimestampNanosecondValues, val.(int64))
-	default:
-		return fmt.Errorf("unknown column data type: %v", col.Datatype)
-	}
-	return nil
-}
-
-func setNullMask(cols map[string]*greptimepb.Column, masks map[string]*gutil.Mask, size int) error {
-	for name, mask := range masks {
-		b, err := mask.Shrink(size)
-		if err != nil {
-			return err
-		}
-
-		col, exist := cols[name]
-		if !exist {
-			return fmt.Errorf("'%s' column not found when set null mask", name)
-		}
-		col.NullMask = b
-	}
-
-	return nil
-}
diff --git a/table/schema/schema.go b/table/schema/schema.go
index b8feba1..1be23ce 100644
--- a/table/schema/schema.go
+++ b/table/schema/schema.go
@@ -15,7 +15,7 @@ func New(name string) *Schema {
 	return &Schema{Name: name}
 }
 
-func (s *Schema) IsZero() bool {
+func (s *Schema) IsEmpty() bool {
 	return s.Columns == nil || len(s.Columns) == 0
 }
 
diff --git a/table/series.go b/table/series.go
deleted file mode 100644
index 5854fb1..0000000
--- a/table/series.go
+++ /dev/null
@@ -1,343 +0,0 @@
-// Copyright 2024 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package table
-
-import (
-	"fmt"
-	"time"
-
-	greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
-
-	gutil "github.com/GreptimeTeam/greptimedb-ingester-go/util"
-)
-
-type column struct {
-	typ      greptimepb.ColumnDataType
-	semantic greptimepb.SemanticType
-}
-
-func checkColumnEquality(key string, col1, col2 column) error {
-	if col1.typ != col2.typ {
-		return fmt.Errorf("the type of '%s' does not match: '%v' and '%v'", key, col1.typ, col2.typ)
-	}
-	if col1.semantic != col2.semantic {
-		return fmt.Errorf("tag and field MUST NOT contain same key: %q", key)
-	}
-
-	return nil
-}
-
-// Series represents one row of data you want to insert into GreptimeDB.
-//   - Tag fields are the index columns, which helps you to query data efficiently
-//   - Field fields are the value columns, which are used for value
-//   - Timestamp field is the timestamp column, which is required
-//
-// you do not need to create schema in advance, it will be created based on Series.
-// But once the schema is created, [Client] has no ability to alert it.
-type Series struct {
-	orders  []string
-	columns map[string]column
-	vals    map[string]any
-
-	timestamp time.Time // required for inserting
-}
-
-// GetTagsAndFields get all column names from metric, except timestamp column
-func (s *Series) GetTagsAndFields() []string {
-	dst := make([]string, len(s.orders))
-	copy(dst, s.orders)
-	return dst
-}
-
-// Get helps to get value of specified column. The second return value
-// indicates if the key was present in Series
-func (s *Series) Get(key string) (any, bool) {
-	val, exist := s.vals[key]
-	return val, exist
-}
-
-// GetUint helps to get uint64 type of the specified key. It can retrieve the following type:
-//   - uint64
-//   - uint32
-//   - uint16
-//   - uint8
-//   - uint
-//
-// if you want uint32 instead of uint64, you can do it like:
-//
-//	if v, ok := s.GetUint(key); ok {
-//		val := uint32(v)
-//	}
-func (s *Series) GetUint(key string) (uint64, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return 0, exist
-	}
-
-	switch t := val.(type) {
-	case uint8:
-		return uint64(t), true
-	case uint16:
-		return uint64(t), true
-	case uint32:
-		return uint64(t), true
-	case uint64:
-		return t, true
-	case uint:
-		return uint64(t), true
-	default:
-		return 0, false
-	}
-}
-
-// GetInt helps to get int64 type of the specified key. It can retrieve the following type:
-//   - int64
-//   - int32
-//   - int16
-//   - int8
-//   - int
-//
-// if you want int32 instead of int64, you can do it like:
-//
-//	if v, ok := s.GetInt(key); ok {
-//		val := int32(v)
-//	}
-func (s *Series) GetInt(key string) (int64, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return 0, exist
-	}
-
-	switch t := val.(type) {
-	case int8:
-		return int64(t), true
-	case int16:
-		return int64(t), true
-	case int32:
-		return int64(t), true
-	case int64:
-		return t, true
-	case int:
-		return int64(t), true
-	default:
-		return 0, false
-	}
-}
-
-// GetFloat helps to get float64 type of the specified key. It can retrieve the following type:
-//   - float64
-//   - float32
-//
-// if you want float32 instead of float64, you can do it like:
-//
-//	if v, ok := s.GetFloat(key); ok {
-//		val := float32(v)
-//	}
-func (s *Series) GetFloat(key string) (float64, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return 0, exist
-	}
-
-	switch t := val.(type) {
-	case float32:
-		return float64(t), true
-	case float64:
-		return t, true
-	default:
-		return 0, false
-	}
-}
-
-func (s *Series) GetBool(key string) (bool, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return false, exist
-	}
-
-	v, ok := val.(bool)
-	return v, ok
-}
-
-func (s *Series) GetString(key string) (string, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return "", exist
-	}
-
-	v, ok := val.(string)
-	return v, ok
-}
-
-func (s *Series) GetBytes(key string) ([]byte, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return nil, exist
-	}
-
-	v, ok := val.([]byte)
-	return v, ok
-}
-
-func (s *Series) GetTimestamp(key string) (time.Time, bool) {
-	val, exist := s.Get(key)
-	if !exist {
-		return time.Time{}, exist
-	}
-
-	v, ok := val.(time.Time)
-	return v, ok
-}
-
-func (s *Series) add(name string, val any, semantic greptimepb.SemanticType) error {
-	key, err := gutil.ToColumnName(name)
-	if err != nil {
-		return err
-	}
-
-	if s.columns == nil {
-		s.columns = map[string]column{}
-	}
-
-	v, err := gutil.Convert(val)
-	if err != nil {
-		return fmt.Errorf("add tag err: %w", err)
-	}
-
-	newCol := column{
-		typ:      v.Type,
-		semantic: semantic,
-	}
-	if col, seen := s.columns[key]; seen {
-		if err := checkColumnEquality(key, col, newCol); err != nil {
-			return err
-		}
-	}
-	s.columns[key] = newCol
-	s.orders = append(s.orders, key)
-
-	if s.vals == nil {
-		s.vals = map[string]any{}
-	}
-	s.vals[key] = v.Val
-
-	return nil
-}
-
-// AddTag prepare tag column, and old value will be replaced if same tag is set.
-// the length of key CAN NOT be longer than 100.
-// If you want to constrain the column type, you can directly use like:
-//   - [Series.AddFloatTag]
-//   - [Series.AddIntTag]
-//   - ...
-func (s *Series) AddTag(key string, val any) error {
-	return s.add(key, val, greptimepb.SemanticType_TAG)
-}
-
-// AddFloatTag helps to constrain the key to be float64 type, if you want to
-// add float32 tag instead of float64, you can do it like:
-//
-//	var i float32 = 1.0
-//	return s.AddFloatTag("memory", float64(i))
-func (s *Series) AddFloatTag(key string, val float64) error {
-	return s.AddTag(key, val)
-}
-
-// AddIntTag helps to constrain the key to be int64 type, if you want to
-// add int32 tag instead of int64, you can do it like:
-//
-//	var i int32 = 1
-//	return s.AddIntTag("account", int64(i))
-func (s *Series) AddIntTag(key string, val int64) error {
-	return s.AddTag(key, val)
-}
-
-// AddUintTag helps to constrain the key to be uint64 type, if you want to
-// add uint32 tag instead of uint64, you can do it like:
-//
-//	var i uint32 = 1
-//	return s.AddUintTag("account", uint64(i))
-func (s *Series) AddUintTag(key string, val uint64) error {
-	return s.AddTag(key, val)
-}
-
-// AddBoolTag helps to constrain the key to be bool type
-func (s *Series) AddBoolTag(key string, val bool) error {
-	return s.AddTag(key, val)
-}
-
-// AddStringTag helps to constrain the key to be string type
-func (s *Series) AddStringTag(key string, val string) error {
-	return s.AddTag(key, val)
-}
-
-// AddBytesTag helps to constrain the key to be []byte type
-func (s *Series) AddBytesTag(key string, val []byte) error {
-	return s.AddTag(key, val)
-}
-
-// AddField prepare field column, and old value will be replaced if same field is set.
-// the length of key CAN NOT be longer than 100
-func (s *Series) AddField(key string, val any) error {
-	return s.add(key, val, greptimepb.SemanticType_FIELD)
-}
-
-// AddFloatField helps to constrain the key to be float64 type, if you want to
-// add float32 tag instead of float64, you can do it like:
-//
-//	var i float32 = 1.0
-//	return s.AddFloatField("memory", float64(i))
-func (s *Series) AddFloatField(key string, val float64) error {
-	return s.AddField(key, val)
-}
-
-// AddIntField helps to constrain the key to be int64 type, if you want to
-// add int32 tag instead of int64, you can do it like:
-//
-//	var i int32 = 1
-//	return s.AddIntField("account", int64(i))
-func (s *Series) AddIntField(key string, val int64) error {
-	return s.AddField(key, val)
-}
-
-// AddUintField helps to constrain the key to be uint64 type, if you want to
-// add uint32 tag instead of uint64, you can do it like:
-//
-//	var i uint32 = 1
-//	return s.AddUintField("account", uint64(i))
-func (s *Series) AddUintField(key string, val uint64) error {
-	return s.AddField(key, val)
-}
-
-// AddBoolField helps to constrain the key to be bool type
-func (s *Series) AddBoolField(key string, val bool) error {
-	return s.AddField(key, val)
-}
-
-// AddStringField helps to constrain the key to be string type
-func (s *Series) AddStringField(key string, val string) error {
-	return s.AddField(key, val)
-}
-
-// AddBytesField helps to constrain the key to be []byte type
-func (s *Series) AddBytesField(key string, val []byte) error {
-	return s.AddField(key, val)
-}
-
-// SetTimestamp is required
-func (s *Series) SetTimestamp(t time.Time) error {
-	s.timestamp = t
-	return nil
-}
diff --git a/table/table.go b/table/table.go
index 4c8b45e..6a9272f 100644
--- a/table/table.go
+++ b/table/table.go
@@ -12,7 +12,7 @@ import (
 
 type Table struct {
 	Schema schema.Schema
-	rows   gpb.Rows
+	Rows   *gpb.Rows
 }
 
 func New(schema schema.Schema) *Table {
@@ -23,7 +23,7 @@ func New(schema schema.Schema) *Table {
 
 	return &Table{
 		Schema: schema,
-		rows: gpb.Rows{
+		Rows: &gpb.Rows{
 			Schema: colSchema,
 			Rows:   make([]*gpb.Row, 0),
 		},
@@ -31,16 +31,16 @@ func New(schema schema.Schema) *Table {
 }
 
 func (t *Table) addRow(row *gpb.Row) {
-	if t.rows.Rows == nil {
-		t.rows.Rows = make([]*gpb.Row, 0)
+	if t.Rows.Rows == nil {
+		t.Rows.Rows = make([]*gpb.Row, 0)
 	}
 
-	t.rows.Rows = append(t.rows.Rows, row)
+	t.Rows.Rows = append(t.Rows.Rows, row)
 }
 
 // AddRow will check if the input matches the schema
 func (t *Table) AddRow(inputs ...any) error {
-	if t.Schema.IsZero() {
+	if t.Schema.IsEmpty() {
 		return err.ErrColumnNotSet
 	}
 
@@ -65,3 +65,7 @@ func (t *Table) AddRow(inputs ...any) error {
 
 	return nil
 }
+
+func (t *Table) IsEmpty() bool {
+	return t.Schema.IsEmpty() || t.Rows.Rows == nil || len(t.Rows.Rows) == 0
+}
diff --git a/util/util.go b/util/util.go
index 890c2be..a337241 100644
--- a/util/util.go
+++ b/util/util.go
@@ -25,15 +25,6 @@ import (
 	gerr "github.com/GreptimeTeam/greptimedb-ingester-go/error"
 )
 
-type Value struct {
-	Val  any
-	Type greptimepb.ColumnDataType
-}
-
-func newValue(val any, typ greptimepb.ColumnDataType) *Value {
-	return &Value{val, typ}
-}
-
 func IsValidPrecision(t time.Duration) bool {
 	return t == time.Second ||
 		t == time.Millisecond ||
@@ -65,10 +56,8 @@ func IsEmptyString(s string) bool {
 }
 
 func ToColumnName(s string) (string, error) {
-	s = strings.TrimSpace(s)
-
-	if len(s) == 0 {
-		return "", gerr.ErrEmptyKey
+	if IsEmptyString(s) {
+		return "", gerr.ErrEmptyColumnName
 	}
 
 	if len(s) >= 100 {