Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
triump2020 committed Feb 24, 2025
1 parent 9b89c47 commit 0233af3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
35 changes: 35 additions & 0 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ func (c *PushClient) toSubscribeTable(
if err != nil {
return nil, err
}
case SubRspTableNotExist:
c.subscribed.clearTable(dbID, tableID)
return nil, moerr.NewInternalErrorf(
ctx,
"to subcribe table:%d failed, since table is not exist",
tableID)
case Unsubscribing:
//need to wait for unsubscribe succeed for making the subscribe and unsubscribe execute in order,
// otherwise the partition state will leak log tails.
Expand Down Expand Up @@ -627,6 +633,17 @@ func (c *PushClient) receiveOneLogtail(ctx context.Context, e *Engine) error {
logutil.Errorf("%s dispatch unsubscribe response failed, err: %s", logTag, err)
return err
}
} else if errRsp := resp.response.GetError(); errRsp != nil {
status := errRsp.GetStatus()
if uint16(status.GetCode()) == moerr.OkExpectedEOB {
c.subscribed.setTableSubNotExist(
errRsp.GetTable().GetDbId(),
errRsp.GetTable().GetTbId())
}
logutil.Errorf("%s subsribe table:%d failed, err:%s",
logTag,
errRsp.GetTable().GetTbId(),
status.GetMessage())
}
return nil
}
Expand Down Expand Up @@ -1325,6 +1342,24 @@ func (c *PushClient) Disconnect() error {
return c.subscriber.logTailClient.Close()
}

func (s *subscribedTable) setTableSubNotExist(dbId, tblId uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.m[tblId] = SubTableStatus{
DBID: dbId,
SubState: SubRspTableNotExist,
LatestTime: time.Now(),
}
logutil.Infof("%s subscribe tbl[db: %d, tbl: %d] response failed, since table is not exist",
logTag, dbId, tblId)
}

func (s *subscribedTable) clearTable(dbId, tblId uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
delete(s.m, tblId)
}

func (s *subscribedTable) setTableSubscribed(dbId, tblId uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand Down
10 changes: 8 additions & 2 deletions pkg/vm/engine/disttae/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) {
func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) {
statser := statistic.StatsInfoFromContext(wrapKey.Ctx)
crs := new(perfcounter.CounterSet)

//logutil.Infof("xxxx updateTableStats,start to update table stats, table ID: %d", wrapKey.Key.TableID)
if !gs.shouldUpdate(wrapKey.Key) {
return
}
Expand Down Expand Up @@ -699,10 +699,14 @@ func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) {
wrapKey.Key.DatabaseID,
wrapKey.Key.DbName)
if err != nil {
logutil.Errorf("wait logtail updated error: %s, table ID: %d", err, wrapKey.Key.TableID)
logutil.Errorf(
"updateTableStats:Failed to subsrcribe table:%d, err:%s",
wrapKey.Key.TableID,
err)
broadcastWithoutUpdate()
return
}
//logutil.Infof("xxxx updateTableStats,subscribe table success, table ID: %d", wrapKey.Key.TableID)

// wait until the table's logtail has been updated.
//logtailUpdated, err := gs.waitLogtailUpdated(wrapKey.Key.TableID)
Expand Down Expand Up @@ -735,9 +739,11 @@ func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) {
gs.mu.Lock()
defer gs.mu.Unlock()
if updated {
logutil.Infof("xxxx updateTableStats,update table stats success, table ID: %d", wrapKey.Key.TableID)
gs.mu.statsInfoMap[wrapKey.Key] = stats
gs.broadcastStats(wrapKey.Key)
} else if _, ok := gs.mu.statsInfoMap[wrapKey.Key]; !ok {
logutil.Infof("xxxx updateTableStats,update table stats failed, table ID: %d", wrapKey.Key.TableID)
gs.mu.statsInfoMap[wrapKey.Key] = nil
}

Expand Down
20 changes: 12 additions & 8 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,24 @@ func (tbl *txnTable) PrefetchAllMeta(ctx context.Context) bool {
}

func (tbl *txnTable) Stats(ctx context.Context, sync bool) (*pb.StatsInfo, error) {
//_, err := tbl.getPartitionState(ctx)
//if err != nil {
// logutil.Errorf("failed to get partition state of table %d: %v", tbl.tableId, err)
// return nil, err
//}
_, err := tbl.getPartitionState(ctx)
if err != nil {
logutil.Errorf("failed to get partition state of table %d: %v", tbl.tableId, err)
return nil, err
}
if !tbl.db.op.IsSnapOp() {
return tbl.getEngine().Stats(ctx, pb.StatsInfoKey{
//logutil.Infof("xxxx Stats start, table:%s, tableID:%d, txn:%s, sync:%v",
// tbl.tableName, tbl.tableId, tbl.db.op.Txn().DebugString(), sync)
stats := tbl.getEngine().Stats(ctx, pb.StatsInfoKey{
AccId: tbl.accountId,
DatabaseID: tbl.db.databaseId,
TableID: tbl.tableId,
TableName: tbl.tableName,
DbName: tbl.db.databaseName,
}, sync), nil
}, sync)
//logutil.Infof("xxxx Stats end, table:%s, tableID:%d, txn:%s, stats:%p",
// tbl.tableName, tbl.tableId, tbl.db.op.Txn().DebugString(), stats)
return stats, nil
}
info, err := tbl.stats(ctx)
if err != nil {
Expand Down Expand Up @@ -1990,7 +1995,6 @@ func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.Part
tbl.tableName,
tbl.db.databaseId,
tbl.db.databaseName)

return ps, err
}

Expand Down

0 comments on commit 0233af3

Please sign in to comment.