diff --git a/pkg/pb/statsinfo/statsinfo.pb.go b/pkg/pb/statsinfo/statsinfo.pb.go index b339a2e7d68d6..9155da7c7c05c 100644 --- a/pkg/pb/statsinfo/statsinfo.pb.go +++ b/pkg/pb/statsinfo/statsinfo.pb.go @@ -417,6 +417,8 @@ type StatsInfoKey struct { DatabaseID uint64 `protobuf:"varint,1,opt,name=DatabaseID,proto3" json:"DatabaseID,omitempty"` TableID uint64 `protobuf:"varint,2,opt,name=TableID,proto3" json:"TableID,omitempty"` AccId uint32 `protobuf:"varint,3,opt,name=AccId,proto3" json:"AccId,omitempty"` + TableName string `protobuf:"bytes,4,opt,name=TableName,proto3" json:"TableName,omitempty"` + DbName string `protobuf:"bytes,5,opt,name=DbName,proto3" json:"DbName,omitempty"` } func (m *StatsInfoKey) Reset() { *m = StatsInfoKey{} } @@ -473,6 +475,20 @@ func (m *StatsInfoKey) GetAccId() uint32 { return 0 } +func (m *StatsInfoKey) GetTableName() string { + if m != nil { + return m.TableName + } + return "" +} + +func (m *StatsInfoKey) GetDbName() string { + if m != nil { + return m.DbName + } + return "" +} + type StatsInfoKeys struct { Keys []StatsInfoKey `protobuf:"bytes,1,rep,name=Keys,proto3" json:"Keys"` } @@ -535,62 +551,63 @@ func init() { func init() { proto.RegisterFile("statsinfo.proto", fileDescriptor_a3f8e561c9795adb) } var fileDescriptor_a3f8e561c9795adb = []byte{ - // 877 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xcf, 0x6f, 0xe3, 0x44, - 0x18, 0x8d, 0x63, 0x37, 0x8d, 0x3f, 0xa7, 0xed, 0x6a, 0x54, 0x75, 0x47, 0x15, 0x32, 0x26, 0x80, - 0x64, 0x56, 0x6c, 0x22, 0xc2, 0x65, 0x59, 0x7e, 0x48, 0xed, 0x16, 0xd8, 0x68, 0x37, 0x5d, 0x34, - 0x29, 0x7b, 0x00, 0x09, 0x69, 0x92, 0x4e, 0x52, 0x53, 0xc7, 0xb6, 0x6c, 0xa7, 0x24, 0xfd, 0x2b, - 0xb8, 0x73, 0xe1, 0xcf, 0xd9, 0xe3, 0x1e, 0xf7, 0x84, 0x50, 0x7b, 0xe0, 0xdf, 0x40, 0xf3, 0x8d, - 0x9d, 0x4c, 0x8b, 0x55, 0xd4, 0x53, 0xe6, 0x7d, 0x7e, 0xef, 0xcd, 0x7c, 0x93, 0x37, 0x33, 0xb0, - 0x93, 0xe5, 0x3c, 0xcf, 0x82, 0x68, 0x12, 0x77, 0x92, 0x34, 0xce, 0x63, 0x62, 0xaf, 0x0a, 0xfb, - 0x8f, 0xa7, 0x41, 0x7e, 0x36, 0x1f, 0x75, 0xc6, 0xf1, 0xac, 0x3b, 0x8d, 0xa7, 0x71, 0x17, 0x19, - 0xa3, 0xf9, 0x04, 0x11, 0x02, 0x1c, 0x29, 0x65, 0xfb, 0x1f, 0x03, 0x9c, 0xe1, 0xd9, 0x7c, 0x32, - 0x09, 0xc5, 0x73, 0xc1, 0x13, 0xf2, 0x08, 0xac, 0x97, 0x62, 0x92, 0x53, 0xc3, 0x33, 0x7c, 0xa7, - 0xb7, 0xd7, 0x59, 0xcf, 0xa4, 0xb1, 0x18, 0x72, 0xc8, 0xa7, 0xb0, 0xc1, 0x82, 0xe9, 0x59, 0x4e, - 0xeb, 0x77, 0x92, 0x15, 0x89, 0x3c, 0x00, 0xf3, 0x85, 0x58, 0x52, 0xd3, 0x33, 0x7c, 0x83, 0xc9, - 0x21, 0xd9, 0x85, 0x8d, 0xd7, 0x3c, 0x9c, 0x0b, 0x6a, 0x61, 0x4d, 0x01, 0xb2, 0x07, 0x8d, 0xe7, - 0x02, 0x6d, 0x37, 0x3c, 0xc3, 0x37, 0x59, 0x81, 0xc8, 0x36, 0xd4, 0x87, 0x97, 0xb4, 0x81, 0xb5, - 0xfa, 0xf0, 0x52, 0xaa, 0x8f, 0xe7, 0x61, 0x98, 0xd1, 0x4d, 0x2c, 0x29, 0x40, 0x28, 0x6c, 0x32, - 0x71, 0x21, 0xd2, 0x4c, 0xd0, 0xa6, 0x67, 0xf8, 0x4d, 0x56, 0xc2, 0xf6, 0xbb, 0x3a, 0xb4, 0x8a, - 0x65, 0x31, 0x1e, 0x4d, 0x05, 0x79, 0x0f, 0xec, 0x7e, 0x36, 0xcc, 0xd3, 0x93, 0x65, 0x22, 0xb0, - 0xdf, 0x26, 0x5b, 0x17, 0x8a, 0xe9, 0xea, 0xab, 0xe9, 0x1e, 0x81, 0x75, 0x92, 0x0a, 0x81, 0xeb, - 0xbf, 0x63, 0x63, 0x24, 0x47, 0xb6, 0x3a, 0x08, 0xa2, 0xa2, 0x2d, 0x39, 0xc4, 0x0a, 0x5f, 0x60, - 0x47, 0xb2, 0xc2, 0x17, 0x84, 0x80, 0x35, 0x08, 0xa2, 0x8c, 0x36, 0x3c, 0xd3, 0x6f, 0x31, 0x1c, - 0x63, 0x8d, 0x2f, 0x64, 0x47, 0xaa, 0xc6, 0x17, 0x58, 0x63, 0xf1, 0x6f, 0x19, 0x6d, 0x7a, 0xa6, - 0x6f, 0x32, 0x1c, 0xaf, 0x5b, 0xb7, 0xb1, 0x58, 0xb4, 0xbe, 0x07, 0x8d, 0x01, 0x5f, 0xbc, 0x14, - 0x11, 0x05, 0xb5, 0x71, 0x0a, 0x49, 0xf6, 0x77, 0x21, 0x9f, 0x66, 0xd4, 0xf1, 0x4c, 0xbf, 0xc9, - 0x14, 0x90, 0x1b, 0xf5, 0xea, 0x42, 0xa4, 0x21, 0x4f, 0x68, 0x0b, 0x57, 0x55, 0x42, 0xf9, 0xe5, - 0xc7, 0x28, 0x98, 0xc4, 0xe9, 0x8c, 0x6e, 0xa9, 0x2f, 0x05, 0x94, 0x33, 0x30, 0x91, 0xcd, 0xc3, - 0x9c, 0x6e, 0x7b, 0xa6, 0x6f, 0xb0, 0x02, 0xb5, 0xff, 0xb0, 0xc1, 0x1e, 0xca, 0xfd, 0xe8, 0x47, - 0x93, 0x98, 0x3c, 0x81, 0xc6, 0xf1, 0xe9, 0xc5, 0x80, 0x27, 0xd4, 0xf0, 0x4c, 0xdf, 0xe9, 0x79, - 0xfa, 0x5e, 0x95, 0xac, 0x8e, 0xa2, 0x7c, 0x1b, 0xe5, 0xe9, 0x92, 0x15, 0x7c, 0x72, 0x00, 0xf6, - 0x20, 0x88, 0x5e, 0xf3, 0x50, 0x8a, 0xeb, 0x28, 0xfe, 0xb0, 0x52, 0xbc, 0x62, 0x29, 0xfd, 0x5a, - 0x85, 0x16, 0x7c, 0x51, 0x58, 0x98, 0x77, 0x59, 0x94, 0xac, 0xd2, 0xa2, 0xc4, 0xe4, 0x7b, 0x70, - 0x8e, 0x78, 0xce, 0x65, 0x0a, 0xa4, 0x89, 0x85, 0x26, 0x1f, 0x57, 0x9a, 0x68, 0x3c, 0x65, 0xa3, - 0x2b, 0xc9, 0x11, 0x80, 0xfc, 0x67, 0x9e, 0x45, 0xb9, 0xf4, 0xd9, 0x40, 0x9f, 0x8f, 0xaa, 0x37, - 0x63, 0x45, 0x53, 0x36, 0x9a, 0x8e, 0x7c, 0x09, 0x9b, 0xc3, 0xe0, 0x12, 0x97, 0xd2, 0x40, 0x8b, - 0x0f, 0x2a, 0x2d, 0x0a, 0x8e, 0xd2, 0x97, 0x0a, 0x32, 0x84, 0x1d, 0x3d, 0xf3, 0xd2, 0x64, 0x13, - 0x4d, 0x3e, 0xa9, 0x36, 0xb9, 0xc9, 0x55, 0x66, 0xb7, 0x1d, 0x88, 0x07, 0xce, 0x61, 0x18, 0x8f, - 0xcf, 0x8f, 0xe7, 0xb3, 0x91, 0x48, 0xf1, 0x9c, 0x99, 0x4c, 0x2f, 0x91, 0x1e, 0xec, 0x1e, 0x8c, - 0xc7, 0xf3, 0x94, 0xe7, 0xe2, 0xd5, 0xe8, 0x57, 0x31, 0xce, 0x0b, 0xaa, 0x8d, 0xd4, 0xca, 0x6f, - 0xa4, 0x03, 0xe4, 0x20, 0x49, 0xd2, 0x78, 0x71, 0x43, 0xa1, 0xa2, 0x5c, 0xf1, 0x85, 0xec, 0x43, - 0xf3, 0x84, 0x8f, 0x42, 0xf1, 0x2c, 0xca, 0xa9, 0x83, 0x39, 0x5d, 0x61, 0x79, 0xb4, 0x71, 0x7c, - 0xcc, 0x67, 0x02, 0xe3, 0x6d, 0xb3, 0x75, 0x81, 0xb8, 0x00, 0x27, 0xc1, 0x4c, 0x0c, 0xc5, 0x38, - 0x8e, 0x4e, 0x31, 0xe3, 0x26, 0xd3, 0x2a, 0xfb, 0x5f, 0x80, 0xa3, 0xa5, 0x53, 0x9e, 0xdd, 0x73, - 0xb1, 0xc4, 0x1b, 0xc2, 0x66, 0x72, 0x28, 0x4f, 0xd4, 0x05, 0x5e, 0x5c, 0x75, 0x75, 0x71, 0x21, - 0x78, 0x5a, 0x7f, 0x62, 0xec, 0x7f, 0x05, 0xdb, 0x37, 0xb3, 0x79, 0x6f, 0xf5, 0x8d, 0x58, 0xde, - 0x4b, 0xfd, 0x0d, 0x3c, 0xb8, 0x9d, 0xc7, 0xff, 0xd3, 0x5b, 0xba, 0xfe, 0x6b, 0xd8, 0xb9, 0x95, - 0xc3, 0x7b, 0xc9, 0x9f, 0x42, 0x4b, 0xcf, 0xe0, 0xbd, 0xb4, 0x3f, 0xc3, 0x6e, 0x55, 0xf4, 0x2a, - 0x3c, 0x1e, 0xeb, 0x1e, 0x4e, 0xef, 0xe1, 0x7f, 0xef, 0x61, 0x74, 0xd0, 0xcc, 0xdb, 0xbf, 0x40, - 0x6b, 0x95, 0x70, 0xf9, 0xec, 0xb8, 0x00, 0x72, 0x9f, 0x46, 0x3c, 0x13, 0xfd, 0x23, 0xf4, 0xb6, - 0x98, 0x56, 0x91, 0xf7, 0x1f, 0x66, 0xa5, 0x7f, 0x54, 0x2c, 0xb4, 0x84, 0xb2, 0x81, 0x83, 0xf1, - 0xb8, 0x7f, 0x8a, 0x8f, 0xc0, 0x16, 0x53, 0xa0, 0x7d, 0x08, 0x5b, 0xba, 0x7f, 0x46, 0x3e, 0x03, - 0x4b, 0xfe, 0x16, 0xd7, 0xdf, 0xc3, 0xaa, 0x93, 0xf6, 0x42, 0x2c, 0x0f, 0xad, 0x37, 0x7f, 0xbd, - 0x5f, 0x63, 0x48, 0x3d, 0xfc, 0xe1, 0xcd, 0x95, 0x6b, 0xbc, 0xbd, 0x72, 0x8d, 0xbf, 0xaf, 0xdc, - 0xda, 0xef, 0xd7, 0x6e, 0xed, 0xcf, 0x6b, 0xd7, 0x78, 0x7b, 0xed, 0xd6, 0xde, 0x5d, 0xbb, 0xb5, - 0x9f, 0x7a, 0xda, 0x9b, 0x3e, 0xe3, 0x79, 0x1a, 0x2c, 0xe2, 0x34, 0x98, 0x06, 0x51, 0x09, 0x22, - 0xd1, 0x4d, 0xce, 0xa7, 0xdd, 0x64, 0xd4, 0x5d, 0x4d, 0x35, 0x6a, 0xe0, 0xfb, 0xfe, 0xf9, 0xbf, - 0x01, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x21, 0xa2, 0x09, 0x2c, 0x08, 0x00, 0x00, + // 894 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0xcf, 0x6f, 0xe3, 0x44, + 0x18, 0xad, 0x63, 0x27, 0x8d, 0x3f, 0xa7, 0xed, 0x6a, 0x54, 0x75, 0x47, 0x15, 0x32, 0x26, 0x80, + 0x64, 0x56, 0x6c, 0x22, 0xc2, 0x65, 0x59, 0x7e, 0x48, 0xed, 0x06, 0xd8, 0x68, 0x37, 0x5d, 0x34, + 0x29, 0x7b, 0x80, 0xd3, 0x38, 0x9d, 0xa4, 0xa6, 0x8e, 0x6d, 0xd9, 0x4e, 0x49, 0xfa, 0x57, 0x70, + 0xe0, 0xc6, 0x85, 0x3f, 0x67, 0x8f, 0x7b, 0xdc, 0x13, 0x42, 0xed, 0x81, 0x7f, 0x03, 0xcd, 0x37, + 0x76, 0xe2, 0x14, 0xab, 0xa8, 0xa7, 0xce, 0xfb, 0xe6, 0xbd, 0x37, 0xf3, 0x4d, 0xdf, 0x4c, 0x0c, + 0x7b, 0x69, 0xc6, 0xb3, 0xd4, 0x0f, 0x27, 0x51, 0x27, 0x4e, 0xa2, 0x2c, 0x22, 0xe6, 0xaa, 0x70, + 0xf8, 0x78, 0xea, 0x67, 0xe7, 0x73, 0xaf, 0x33, 0x8e, 0x66, 0xdd, 0x69, 0x34, 0x8d, 0xba, 0xc8, + 0xf0, 0xe6, 0x13, 0x44, 0x08, 0x70, 0xa4, 0x94, 0xed, 0x7f, 0x34, 0xb0, 0x46, 0xe7, 0xf3, 0xc9, + 0x24, 0x10, 0xcf, 0x05, 0x8f, 0xc9, 0x23, 0x30, 0x5e, 0x8a, 0x49, 0x46, 0x35, 0x47, 0x73, 0xad, + 0xde, 0x41, 0x67, 0xbd, 0x52, 0x89, 0xc5, 0x90, 0x43, 0x3e, 0x85, 0x3a, 0xf3, 0xa7, 0xe7, 0x19, + 0xad, 0xdd, 0x49, 0x56, 0x24, 0xf2, 0x00, 0xf4, 0x17, 0x62, 0x49, 0x75, 0x47, 0x73, 0x35, 0x26, + 0x87, 0x64, 0x1f, 0xea, 0xaf, 0x79, 0x30, 0x17, 0xd4, 0xc0, 0x9a, 0x02, 0xe4, 0x00, 0x1a, 0xcf, + 0x05, 0xda, 0xd6, 0x1d, 0xcd, 0xd5, 0x59, 0x8e, 0xc8, 0x2e, 0xd4, 0x46, 0x57, 0xb4, 0x81, 0xb5, + 0xda, 0xe8, 0x4a, 0xaa, 0x4f, 0xe6, 0x41, 0x90, 0xd2, 0x6d, 0x2c, 0x29, 0x40, 0x28, 0x6c, 0x33, + 0x71, 0x29, 0x92, 0x54, 0xd0, 0xa6, 0xa3, 0xb9, 0x4d, 0x56, 0xc0, 0xf6, 0xbb, 0x1a, 0xb4, 0xf2, + 0x6d, 0x31, 0x1e, 0x4e, 0x05, 0x79, 0x0f, 0xcc, 0x41, 0x3a, 0xca, 0x92, 0xd3, 0x65, 0x2c, 0xb0, + 0xdf, 0x26, 0x5b, 0x17, 0xf2, 0xe5, 0x6a, 0xab, 0xe5, 0x1e, 0x81, 0x71, 0x9a, 0x08, 0x81, 0xfb, + 0xbf, 0xe3, 0x60, 0x24, 0x47, 0xb6, 0x3a, 0xf4, 0xc3, 0xbc, 0x2d, 0x39, 0xc4, 0x0a, 0x5f, 0x60, + 0x47, 0xb2, 0xc2, 0x17, 0x84, 0x80, 0x31, 0xf4, 0xc3, 0x94, 0x36, 0x1c, 0xdd, 0x6d, 0x31, 0x1c, + 0x63, 0x8d, 0x2f, 0x64, 0x47, 0xaa, 0xc6, 0x17, 0x58, 0x63, 0xd1, 0xaf, 0x29, 0x6d, 0x3a, 0xba, + 0xab, 0x33, 0x1c, 0xaf, 0x5b, 0x37, 0xb1, 0x98, 0xb7, 0x7e, 0x00, 0x8d, 0x21, 0x5f, 0xbc, 0x14, + 0x21, 0x05, 0x75, 0x70, 0x0a, 0x49, 0xf6, 0x77, 0x01, 0x9f, 0xa6, 0xd4, 0x72, 0x74, 0xb7, 0xc9, + 0x14, 0x90, 0x07, 0xf5, 0xea, 0x52, 0x24, 0x01, 0x8f, 0x69, 0x0b, 0x77, 0x55, 0x40, 0x39, 0xf3, + 0x63, 0xe8, 0x4f, 0xa2, 0x64, 0x46, 0x77, 0xd4, 0x4c, 0x0e, 0xe5, 0x0a, 0x4c, 0xa4, 0xf3, 0x20, + 0xa3, 0xbb, 0x8e, 0xee, 0x6a, 0x2c, 0x47, 0xed, 0x3f, 0x4c, 0x30, 0x47, 0xf2, 0x3c, 0x06, 0xe1, + 0x24, 0x22, 0x4f, 0xa0, 0x71, 0x72, 0x76, 0x39, 0xe4, 0x31, 0xd5, 0x1c, 0xdd, 0xb5, 0x7a, 0x4e, + 0xf9, 0xac, 0x0a, 0x56, 0x47, 0x51, 0xbe, 0x0d, 0xb3, 0x64, 0xc9, 0x72, 0x3e, 0x39, 0x02, 0x73, + 0xe8, 0x87, 0xaf, 0x79, 0x20, 0xc5, 0x35, 0x14, 0x7f, 0x58, 0x29, 0x5e, 0xb1, 0x94, 0x7e, 0xad, + 0x42, 0x0b, 0xbe, 0xc8, 0x2d, 0xf4, 0xbb, 0x2c, 0x0a, 0x56, 0x61, 0x51, 0x60, 0xf2, 0x3d, 0x58, + 0x7d, 0x9e, 0x71, 0x99, 0x02, 0x69, 0x62, 0xa0, 0xc9, 0xc7, 0x95, 0x26, 0x25, 0x9e, 0xb2, 0x29, + 0x2b, 0x49, 0x1f, 0x40, 0xfe, 0x67, 0x9e, 0x85, 0x99, 0xf4, 0xa9, 0xa3, 0xcf, 0x47, 0xd5, 0x87, + 0xb1, 0xa2, 0x29, 0x9b, 0x92, 0x8e, 0x7c, 0x09, 0xdb, 0x23, 0xff, 0x0a, 0xb7, 0xd2, 0x40, 0x8b, + 0x0f, 0x2a, 0x2d, 0x72, 0x8e, 0xd2, 0x17, 0x0a, 0x32, 0x82, 0xbd, 0x72, 0xe6, 0xa5, 0xc9, 0x36, + 0x9a, 0x7c, 0x52, 0x6d, 0xb2, 0xc9, 0x55, 0x66, 0xb7, 0x1d, 0x88, 0x03, 0xd6, 0x71, 0x10, 0x8d, + 0x2f, 0x4e, 0xe6, 0x33, 0x4f, 0x24, 0x78, 0xcf, 0x74, 0x56, 0x2e, 0x91, 0x1e, 0xec, 0x1f, 0x8d, + 0xc7, 0xf3, 0x84, 0x67, 0xe2, 0x95, 0xf7, 0x8b, 0x18, 0x67, 0x39, 0xd5, 0x44, 0x6a, 0xe5, 0x1c, + 0xe9, 0x00, 0x39, 0x8a, 0xe3, 0x24, 0x5a, 0x6c, 0x28, 0x54, 0x94, 0x2b, 0x66, 0xc8, 0x21, 0x34, + 0x4f, 0xb9, 0x17, 0x88, 0x67, 0x61, 0x46, 0x2d, 0xcc, 0xe9, 0x0a, 0xcb, 0xab, 0x8d, 0xe3, 0x13, + 0x3e, 0x13, 0x18, 0x6f, 0x93, 0xad, 0x0b, 0xc4, 0x06, 0x38, 0xf5, 0x67, 0x62, 0x24, 0xc6, 0x51, + 0x78, 0x86, 0x19, 0xd7, 0x59, 0xa9, 0x72, 0xf8, 0x05, 0x58, 0xa5, 0x74, 0xca, 0xbb, 0x7b, 0x21, + 0x96, 0xf8, 0x42, 0x98, 0x4c, 0x0e, 0xe5, 0x8d, 0xba, 0xc4, 0x87, 0xab, 0xa6, 0x1e, 0x2e, 0x04, + 0x4f, 0x6b, 0x4f, 0xb4, 0xc3, 0xaf, 0x60, 0x77, 0x33, 0x9b, 0xf7, 0x56, 0x6f, 0xc4, 0xf2, 0x5e, + 0xea, 0x6f, 0xe0, 0xc1, 0xed, 0x3c, 0xfe, 0x9f, 0xde, 0x28, 0xeb, 0xbf, 0x86, 0xbd, 0x5b, 0x39, + 0xbc, 0x97, 0xfc, 0x29, 0xb4, 0xca, 0x19, 0xbc, 0x97, 0xf6, 0x67, 0xd8, 0xaf, 0x8a, 0x5e, 0x85, + 0xc7, 0xe3, 0xb2, 0x87, 0xd5, 0x7b, 0xf8, 0xdf, 0x77, 0x18, 0x1d, 0x4a, 0xe6, 0xed, 0xdf, 0x35, + 0x68, 0xad, 0x22, 0x2e, 0x7f, 0x77, 0x6c, 0x00, 0x79, 0x50, 0x1e, 0x4f, 0xc5, 0xa0, 0x8f, 0xe6, + 0x06, 0x2b, 0x55, 0xe4, 0x03, 0x88, 0x61, 0x19, 0xf4, 0xf3, 0x9d, 0x16, 0x50, 0x76, 0x70, 0x34, + 0x1e, 0x0f, 0xce, 0xf0, 0x57, 0x60, 0x87, 0x29, 0xb0, 0x99, 0x36, 0xe3, 0x76, 0xda, 0x0e, 0xa0, + 0xd1, 0xf7, 0x70, 0xaa, 0x8e, 0x53, 0x39, 0x6a, 0x1f, 0xc3, 0x4e, 0x79, 0x57, 0x29, 0xf9, 0x0c, + 0x0c, 0xf9, 0x37, 0x7f, 0x35, 0x1f, 0x56, 0x5d, 0xd0, 0x17, 0x62, 0x79, 0x6c, 0xbc, 0xf9, 0xeb, + 0xfd, 0x2d, 0x86, 0xd4, 0xe3, 0x1f, 0xde, 0x5c, 0xdb, 0xda, 0xdb, 0x6b, 0x5b, 0xfb, 0xfb, 0xda, + 0xde, 0xfa, 0xed, 0xc6, 0xde, 0xfa, 0xf3, 0xc6, 0xd6, 0xde, 0xde, 0xd8, 0x5b, 0xef, 0x6e, 0xec, + 0xad, 0x9f, 0x7a, 0xa5, 0x4f, 0x81, 0x19, 0xcf, 0x12, 0x7f, 0x11, 0x25, 0xfe, 0xd4, 0x0f, 0x0b, + 0x10, 0x8a, 0x6e, 0x7c, 0x31, 0xed, 0xc6, 0x5e, 0x77, 0xb5, 0x94, 0xd7, 0xc0, 0xcf, 0x82, 0xcf, + 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x9c, 0xb8, 0x34, 0x7e, 0x63, 0x08, 0x00, 0x00, } func (m *ShuffleHeap) Marshal() (dAtA []byte, err error) { @@ -1042,6 +1059,20 @@ func (m *StatsInfoKey) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.DbName) > 0 { + i -= len(m.DbName) + copy(dAtA[i:], m.DbName) + i = encodeVarintStatsinfo(dAtA, i, uint64(len(m.DbName))) + i-- + dAtA[i] = 0x2a + } + if len(m.TableName) > 0 { + i -= len(m.TableName) + copy(dAtA[i:], m.TableName) + i = encodeVarintStatsinfo(dAtA, i, uint64(len(m.TableName))) + i-- + dAtA[i] = 0x22 + } if m.AccId != 0 { i = encodeVarintStatsinfo(dAtA, i, uint64(m.AccId)) i-- @@ -1313,6 +1344,14 @@ func (m *StatsInfoKey) ProtoSize() (n int) { if m.AccId != 0 { n += 1 + sovStatsinfo(uint64(m.AccId)) } + l = len(m.TableName) + if l > 0 { + n += 1 + l + sovStatsinfo(uint64(l)) + } + l = len(m.DbName) + if l > 0 { + n += 1 + l + sovStatsinfo(uint64(l)) + } return n } @@ -3127,6 +3166,70 @@ func (m *StatsInfoKey) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TableName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStatsinfo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthStatsinfo + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStatsinfo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TableName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DbName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStatsinfo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthStatsinfo + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStatsinfo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DbName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStatsinfo(dAtA[iNdEx:]) diff --git a/pkg/vm/engine/disttae/db.go b/pkg/vm/engine/disttae/db.go index 664256a8d4092..fc75838d15e99 100644 --- a/pkg/vm/engine/disttae/db.go +++ b/pkg/vm/engine/disttae/db.go @@ -33,7 +33,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl" "github.com/matrixorigin/matrixone/pkg/util/fault" - "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" @@ -359,7 +358,7 @@ func (e *Engine) init(ctx context.Context) error { e.catalog.Store(newcache) // clear all tables in global stats. - e.globalStats.clearTables() + //e.globalStats.clearTables() return nil } @@ -651,19 +650,21 @@ func (e *Engine) GetOrCreateLatestPart( func (e *Engine) LazyLoadLatestCkp( ctx context.Context, - tblHandler engine.Relation) (*logtailreplay.Partition, error) { + tableID uint64, + tableName string, + dbID uint64, + dbName string) (*logtailreplay.Partition, error) { var ( - ok bool - tbl *txnTable + //ok bool + //tbl *txnTable ) + //if tbl, ok = tblHandler.(*txnTable); !ok { + // delegate := tblHandler.(*txnTableDelegate) + // tbl = delegate.origin + //} - if tbl, ok = tblHandler.(*txnTable); !ok { - delegate := tblHandler.(*txnTableDelegate) - tbl = delegate.origin - } - - part := e.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId) + part := e.GetOrCreateLatestPart(dbID, tableID) cache := e.GetLatestCatalogCache() if err := part.ConsumeCheckpoints( @@ -673,10 +674,10 @@ func (e *Engine) LazyLoadLatestCkp( ctx, e.service, checkpoint, - tbl.tableId, - tbl.tableName, - tbl.db.databaseId, - tbl.db.databaseName, + tableID, + tableName, + dbID, + dbName, e.mp, e.fs) if err != nil { @@ -690,7 +691,15 @@ func (e *Engine) LazyLoadLatestCkp( } }() for _, entry := range entries { - if err = consumeEntry(ctx, tbl.primarySeqnum, e, cache, state, entry, false); err != nil { + //the primarySeqnum is not used in the consume of checkpoint. + if err = consumeEntry( + ctx, + -1, + e, + cache, + state, + entry, + false); err != nil { return err } } diff --git a/pkg/vm/engine/disttae/engine.go b/pkg/vm/engine/disttae/engine.go index 8e44d18436817..c8c53e7eaef29 100644 --- a/pkg/vm/engine/disttae/engine.go +++ b/pkg/vm/engine/disttae/engine.go @@ -777,7 +777,7 @@ func (e *Engine) cleanMemoryTableWithTable(dbId, tblId uint64) { // When removing the PartitionState, you need to remove the tid in globalStats, // When re-subscribing, globalStats will wait for the PartitionState to be consumed before updating the object state. - e.globalStats.RemoveTid(tblId) + //e.globalStats.RemoveTid(tblId) logutil.Debugf("clean memory table of tbl[dbId: %d, tblId: %d]", dbId, tblId) } diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index ca958473b2a69..d13af075072b2 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -94,6 +94,7 @@ const ( Subscribed Unsubscribing Unsubscribed + SubRspTableNotExist FakeLogtailServerAddress = "fake address for ut" ) @@ -368,33 +369,30 @@ func (c *PushClient) validLogTailMustApplied(snapshotTS timestamp.Timestamp) { } func (c *PushClient) skipSubscribeIf( - ctx context.Context, tbl *txnTable) (bool, *logtailreplay.PartitionState) { + ctx context.Context, + tableID uint64, + dbID uint64) (bool, *logtailreplay.PartitionState) { //if table has been subscribed, return quickly. - if ps, ok := c.isSubscribed(tbl.db.databaseId, tbl.tableId); ok { + if ps, ok := c.isSubscribed(dbID, tableID); ok { return true, ps } - - // no need to subscribe a view - // for issue #19192 - if strings.ToUpper(tbl.relKind) == "V" { - return true, nil - } - return false, nil } func (c *PushClient) toSubscribeTable( ctx context.Context, - tbl *txnTable) (ps *logtailreplay.PartitionState, err error) { + tableID uint64, + tableName string, + dbID uint64, + dbName string, +) (ps *logtailreplay.PartitionState, err error) { var skip bool - if skip, ps = c.skipSubscribeIf(ctx, tbl); skip { + if skip, ps = c.skipSubscribeIf(ctx, tableID, dbID); skip { return ps, nil } - tableId := tbl.tableId - - state, err := c.toSubIfUnsubscribed(ctx, tbl.db.databaseId, tableId) + state, err := c.toSubIfUnsubscribed(ctx, dbID, tableID) if err != nil { return nil, err } @@ -407,19 +405,26 @@ func (c *PushClient) toSubscribeTable( case Subscribing: //wait for the next possible state: subscribed or unsubscribed or unsubscribing or Subscribing - state, err = c.waitUntilSubscribingChanged(ctx, tbl.db.databaseId, tableId) + state, err = c.waitUntilSubscribingChanged(ctx, dbID, tableID) if err != nil { return nil, err } case SubRspReceived: - state, ps, err = c.loadAndConsumeLatestCkp(ctx, tableId, tbl) + state, ps, err = c.loadAndConsumeLatestCkp(ctx, tableID, tableName, dbID, dbName) if err != nil { return nil, err } + case SubRspTableNotExist: + c.subscribed.clearTable(dbID, tableID) + return nil, moerr.NewInternalErrorf( + ctx, + "%s to subcribe table:%d failed, since table is not exist", + logTag, + tableID) case Unsubscribing: //need to wait for unsubscribe succeed for making the subscribe and unsubscribe execute in order, // otherwise the partition state will leak log tails. - state, err = c.waitUntilUnsubscribingChanged(ctx, tbl.db.databaseId, tableId) + state, err = c.waitUntilUnsubscribingChanged(ctx, dbID, tableID) if err != nil { return nil, err } @@ -427,7 +432,7 @@ func (c *PushClient) toSubscribeTable( case Subscribed: //if table has been subscribed, return the ps. logutil.Infof("%s subscribe tbl[db: %d, tbl: %d, %s] succeed", - logTag, tbl.db.databaseId, tbl.tableId, tbl.tableName) + logTag, dbID, tableID, tableName) return case Unsubscribed: @@ -629,6 +634,17 @@ func (c *PushClient) receiveOneLogtail(ctx context.Context, e *Engine) error { logutil.Errorf("%s dispatch unsubscribe response failed, err: %s", logTag, err) return err } + } else if errRsp := resp.response.GetError(); errRsp != nil { + status := errRsp.GetStatus() + if uint16(status.GetCode()) == moerr.OkExpectedEOB { + c.subscribed.setTableSubNotExist( + errRsp.GetTable().GetDbId(), + errRsp.GetTable().GetTbId()) + } + logutil.Errorf("%s subsribe table:%d failed, err:%s", + logTag, + errRsp.GetTable().GetTbId(), + status.GetMessage()) } return nil } @@ -999,10 +1015,10 @@ func (c *PushClient) doGCUnusedTable(ctx context.Context) { // never unsubscribe the mo_databases, mo_tables, mo_columns. continue } - if !c.eng.safeToUnsubscribe(k) { - logutil.Infof("%s table [%d-%d] is not safe to unsubscribe", logTag, v.DBID, k) - continue - } + //if !c.eng.safeToUnsubscribe(k) { + // logutil.Infof("%s table [%d-%d] is not safe to unsubscribe", logTag, v.DBID, k) + // continue + //} if !v.LatestTime.After(shouldClean) { if v.SubState != Subscribed { continue @@ -1178,21 +1194,23 @@ func (s *subscribedTable) isSubscribed(dbId, tblId uint64) bool { // consumeLatestCkp consume the latest checkpoint of the table if not consumed, and return the latest partition state. func (c *PushClient) loadAndConsumeLatestCkp( ctx context.Context, - tableId uint64, - tbl *txnTable, + tableID uint64, + tableName string, + dbID uint64, + dbName string, ) (SubscribeState, *logtailreplay.PartitionState, error) { c.subscribed.mutex.Lock() defer c.subscribed.mutex.Unlock() - v, exist := c.subscribed.m[tableId] + v, exist := c.subscribed.m[tableID] if exist && (v.SubState == SubRspReceived || v.SubState == Subscribed) { - part, err := c.eng.LazyLoadLatestCkp(ctx, tbl) + part, err := c.eng.LazyLoadLatestCkp(ctx, tableID, tableName, dbID, dbName) if err != nil { return InvalidSubState, nil, err } //update latest time - c.subscribed.m[tableId] = SubTableStatus{ - DBID: tbl.db.databaseId, + c.subscribed.m[tableID] = SubTableStatus{ + DBID: dbID, SubState: Subscribed, LatestTime: time.Now(), } @@ -1203,13 +1221,13 @@ func (c *PushClient) loadAndConsumeLatestCkp( if !c.subscriber.ready() { return Unsubscribed, nil, moerr.NewInternalError(ctx, "log tail subscriber is not ready") } - c.subscribed.m[tableId] = SubTableStatus{ - DBID: tbl.db.databaseId, + c.subscribed.m[tableID] = SubTableStatus{ + DBID: dbID, SubState: Subscribing, } - if err := c.subscribeTable(ctx, api.TableID{DbId: tbl.db.databaseId, TbId: tableId}); err != nil { + if err := c.subscribeTable(ctx, api.TableID{DbId: dbID, TbId: tableID}); err != nil { //restore the table status. - delete(c.subscribed.m, tableId) + delete(c.subscribed.m, tableID) return Unsubscribed, nil, err } return Subscribing, nil, nil @@ -1325,6 +1343,24 @@ func (c *PushClient) Disconnect() error { return c.subscriber.logTailClient.Close() } +func (s *subscribedTable) setTableSubNotExist(dbId, tblId uint64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.m[tblId] = SubTableStatus{ + DBID: dbId, + SubState: SubRspTableNotExist, + LatestTime: time.Now(), + } + logutil.Infof("%s subscribe tbl[db: %d, tbl: %d] response failed, since table is not exist", + logTag, dbId, tblId) +} + +func (s *subscribedTable) clearTable(dbId, tblId uint64) { + s.mutex.Lock() + defer s.mutex.Unlock() + delete(s.m, tblId) +} + func (s *subscribedTable) setTableSubscribed(dbId, tblId uint64) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 7585213c7a358..7715b3697578e 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -150,7 +150,7 @@ func WithUpdateWorkerFactor(f int) GlobalStatsOption { } // WithStatsUpdater set the update function to update stats info. -func WithStatsUpdater(f func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { +func WithStatsUpdater(f func(context.Context, *logtailreplay.PartitionState, pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { return func(s *GlobalStats) { s.statsUpdater = f } @@ -226,7 +226,7 @@ type GlobalStats struct { // statsUpdate is the function which updates the stats info. // If it is nil, set it to doUpdate. - statsUpdater func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool + statsUpdater func(context.Context, *logtailreplay.PartitionState, pb.StatsInfoKey, *pb.StatsInfo) bool // for test only currently. approxObjectNumUpdater func() int64 @@ -671,7 +671,7 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) { func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) { statser := statistic.StatsInfoFromContext(wrapKey.Ctx) crs := new(perfcounter.CounterSet) - + //logutil.Infof("xxxx updateTableStats,start to update table stats, table ID: %d", wrapKey.Key.TableID) if !gs.shouldUpdate(wrapKey.Key) { return } @@ -689,24 +689,43 @@ func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) { gs.mu.cond.Broadcast() } - // wait until the table's logtail has been updated. - logtailUpdated, err := gs.waitLogtailUpdated(wrapKey.Key.TableID) + // Get the latest partition state of the table. + //Notice that for snapshot read, subscribing the table maybe failed since the invalid table id, + //We should handle this case in next PR if needed. + ps, err := gs.engine.pClient.toSubscribeTable( + wrapKey.Ctx, + wrapKey.Key.TableID, + wrapKey.Key.TableName, + wrapKey.Key.DatabaseID, + wrapKey.Key.DbName) if err != nil { - logutil.Errorf("wait logtail updated error: %s, table ID: %d", err, wrapKey.Key.TableID) - broadcastWithoutUpdate() - return - } - if !logtailUpdated { - logutil.Warnf("logtail not updated, table ID: %d", wrapKey.Key.TableID) + logutil.Errorf( + "updateTableStats:Failed to subsrcribe table:%d, err:%s", + wrapKey.Key.TableID, + err) broadcastWithoutUpdate() return } + //logutil.Infof("xxxx updateTableStats,subscribe table success, table ID: %d", wrapKey.Key.TableID) + + // wait until the table's logtail has been updated. + //logtailUpdated, err := gs.waitLogtailUpdated(wrapKey.Key.TableID) + //if err != nil { + // logutil.Errorf("wait logtail updated error: %s, table ID: %d", err, wrapKey.Key.TableID) + // broadcastWithoutUpdate() + // return + //} + //if !logtailUpdated { + // logutil.Warnf("logtail not updated, table ID: %d", wrapKey.Key.TableID) + // broadcastWithoutUpdate() + // return + //} stats := plan2.NewStatsInfo() newCtx := perfcounter.AttachS3RequestKey(wrapKey.Ctx, crs) if gs.statsUpdater != nil { - updated = gs.statsUpdater(newCtx, wrapKey.Key, stats) + updated = gs.statsUpdater(newCtx, ps, wrapKey.Key, stats) } statser.AddBuildPlanStatsS3Request(statistic.S3Request{ List: crs.FileService.S3.List.Load(), @@ -720,9 +739,11 @@ func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) { gs.mu.Lock() defer gs.mu.Unlock() if updated { + logutil.Infof("xxxx updateTableStats,update table stats success, table ID: %d", wrapKey.Key.TableID) gs.mu.statsInfoMap[wrapKey.Key] = stats gs.broadcastStats(wrapKey.Key) } else if _, ok := gs.mu.statsInfoMap[wrapKey.Key]; !ok { + logutil.Infof("xxxx updateTableStats,update table stats failed, table ID: %d", wrapKey.Key.TableID) gs.mu.statsInfoMap[wrapKey.Key] = nil } @@ -730,7 +751,7 @@ func (gs *GlobalStats) updateTableStats(wrapKey pb.StatsInfoKeyWithContext) { gs.mu.cond.Broadcast() } -func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { +func (gs *GlobalStats) doUpdate(ctx context.Context, ps *logtailreplay.PartitionState, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { table := gs.engine.GetLatestCatalogCache().GetTableById(key.AccId, key.DatabaseID, key.TableID) // table or its definition is nil, means that the table is created but not committed yet. if table == nil || table.TableDef == nil { @@ -738,8 +759,8 @@ func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats return false } - partitionState := gs.engine.GetOrCreateLatestPart(key.DatabaseID, key.TableID).Snapshot() - approxObjectNum := int64(partitionState.ApproxDataObjectsNum()) + //partitionState := gs.engine.GetOrCreateLatestPart(key.DatabaseID, key.TableID).Snapshot() + approxObjectNum := int64(ps.ApproxDataObjectsNum()) if gs.approxObjectNumUpdater == nil && approxObjectNum == 0 { // There are no objects flushed yet. return false @@ -749,7 +770,7 @@ func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats now := timestamp.Timestamp{PhysicalTime: time.Now().UnixNano()} req := newUpdateStatsRequest( table.TableDef, - partitionState, + ps, gs.engine.fs, types.TimestampToTS(now), approxObjectNum, diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index 354c52cd645ef..d782cae6cc820 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -34,6 +34,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" ) func TestGetStats(t *testing.T) { @@ -42,7 +43,11 @@ func TestGetStats(t *testing.T) { defer cancel() gs := NewGlobalStats(ctx, nil, nil, WithUpdateWorkerFactor(4), - WithStatsUpdater(func(_ context.Context, key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { + WithStatsUpdater(func( + _ context.Context, + ps *logtailreplay.PartitionState, + key statsinfo.StatsInfoKey, + info *statsinfo.StatsInfo) bool { info.BlockNumber = 20 return true }), @@ -163,7 +168,8 @@ func TestUpdateStats(t *testing.T) { TableID: 1001, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(ctx, k, stats) + ps := logtailreplay.NewPartitionState("", true, 1001) + updated := e.globalStats.doUpdate(ctx, ps, k, stats) assert.False(t, updated) }) }) @@ -182,7 +188,8 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(ctx, k, stats) + ps := logtailreplay.NewPartitionState("", true, tid) + updated := e.globalStats.doUpdate(ctx, ps, k, stats) assert.False(t, updated) }) }) @@ -201,7 +208,8 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(ctx, k, stats) + ps := logtailreplay.NewPartitionState("", true, tid) + updated := e.globalStats.doUpdate(ctx, ps, k, stats) assert.True(t, updated) }, WithApproxObjectNumUpdater(func() int64 { return 10 diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 9652b3171a424..ea618411ffb9c 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -161,18 +161,18 @@ func (tbl *txnTable) Stats(ctx context.Context, sync bool) (*pb.StatsInfo, error logutil.Errorf("failed to get partition state of table %d: %v", tbl.tableId, err) return nil, err } - if !tbl.db.op.IsSnapOp() { - return tbl.getEngine().Stats(ctx, pb.StatsInfoKey{ - AccId: tbl.accountId, - DatabaseID: tbl.db.databaseId, - TableID: tbl.tableId, - }, sync), nil - } - info, err := tbl.stats(ctx) - if err != nil { - return nil, err - } - return info, nil + //logutil.Infof("xxxx Stats start, table:%s, tableID:%d, txn:%s, sync:%v", + // tbl.tableName, tbl.tableId, tbl.db.op.Txn().DebugString(), sync) + return tbl.getEngine().Stats(ctx, pb.StatsInfoKey{ + AccId: tbl.accountId, + DatabaseID: tbl.db.databaseId, + TableID: tbl.tableId, + TableName: tbl.tableName, + DbName: tbl.db.databaseName, + }, sync), nil + //logutil.Infof("xxxx Stats end, table:%s, tableID:%d, txn:%s, stats:%p", + // tbl.tableName, tbl.tableId, tbl.db.op.Txn().DebugString(), stats) + //} } func (tbl *txnTable) stats(ctx context.Context) (*pb.StatsInfo, error) { @@ -1964,9 +1964,9 @@ func (tbl *txnTable) getPartitionState( func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.PartitionState, err error) { eng := tbl.eng.(*Engine) var createdInTxn bool - defer func() { - eng.globalStats.notifyLogtailUpdate(tbl.tableId, err == nil && !createdInTxn) - }() + //defer func() { + // eng.globalStats.notifyLogtailUpdate(tbl.tableId, err == nil && !createdInTxn) + //}() createdInTxn, err = tbl.isCreatedInTxn(ctx) if err != nil { @@ -1976,7 +1976,18 @@ func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.Part return } - ps, err = eng.PushClient().toSubscribeTable(ctx, tbl) + // no need to subscribe a view + // for issue #19192 + if strings.ToUpper(tbl.relKind) == "V" { + return + } + + ps, err = eng.PushClient().toSubscribeTable( + ctx, + tbl.tableId, + tbl.tableName, + tbl.db.databaseId, + tbl.db.databaseName) return ps, err } @@ -2184,12 +2195,12 @@ func (tbl *txnTable) primaryKeysMayBeChanged( return false, moerr.NewInternalErrorNoCtx("primary key modification is not allowed in snapshot transaction") } - - //snap, err := tbl.getPartitionState(ctx) - //if err != nil { - // return false, err - //} - part, err := tbl.eng.(*Engine).LazyLoadLatestCkp(ctx, tbl) + part, err := tbl.eng.(*Engine).LazyLoadLatestCkp( + ctx, + tbl.tableId, + tbl.tableName, + tbl.db.databaseId, + tbl.db.databaseName) if err != nil { return false, err } diff --git a/pkg/vm/engine/tae/logtail/service/server.go b/pkg/vm/engine/tae/logtail/service/server.go index 848b50d113e9c..dac1835f499a8 100644 --- a/pkg/vm/engine/tae/logtail/service/server.go +++ b/pkg/vm/engine/tae/logtail/service/server.go @@ -16,6 +16,7 @@ package service import ( "context" + "fmt" "sync/atomic" "time" @@ -529,8 +530,15 @@ func (s *LogtailServer) getSubLogtailPhase( closeCB() } s.logger.Error("fail to fetch table total logtail", zap.Error(subErr), zap.Any("table", table)) + + subErrCode, ok := moerr.GetMoErrCode(subErr) + if !ok { + subErrCode = moerr.ErrInternal + } + subErrMsg := fmt.Sprintf("fail to fetch table total logtail:%s", subErr.Error()) + if err := sub.session.SendErrorResponse( - sendCtx, table, moerr.ErrInternal, "fail to fetch table total logtail", + sendCtx, table, subErrCode, subErrMsg, ); err != nil { err = moerr.AttachCause(sendCtx, err) s.logger.Error("fail to send error response", zap.Error(err)) diff --git a/pkg/vm/engine/test/partition_state_test.go b/pkg/vm/engine/test/partition_state_test.go index 9f626ef4b6aaf..c90008189b643 100644 --- a/pkg/vm/engine/test/partition_state_test.go +++ b/pkg/vm/engine/test/partition_state_test.go @@ -270,7 +270,11 @@ func Test_Bug_CheckpointInsertObjectOverwrittenMergeDeletedObject(t *testing.T) engineTbl, err := engineDB.Relation(ctx, tableName, nil) require.Nil(t, err) - _, err = disttaeEngine.Engine.LazyLoadLatestCkp(ctx, engineTbl) + _, err = disttaeEngine.Engine.LazyLoadLatestCkp(ctx, + engineTbl.GetTableID(ctx), + engineTbl.GetTableName(), + engineTbl.GetDBID(ctx), + databaseName) require.Nil(t, err) stats, err := disttaeEngine.GetPartitionStateStats(ctx, database.GetID(), rel.ID()) @@ -500,7 +504,11 @@ func Test_EmptyObjectStats(t *testing.T) { engineTbl, err := engineDB.Relation(p.Ctx, tableName, nil) require.Nil(t, err) - _, err = disttaeEngine.Engine.LazyLoadLatestCkp(p.Ctx, engineTbl) + _, err = disttaeEngine.Engine.LazyLoadLatestCkp(p.Ctx, + engineTbl.GetTableID(p.Ctx), + engineTbl.GetTableName(), + engineTbl.GetDBID(p.Ctx), + databaseName) require.Nil(t, err) stats, err := disttaeEngine.GetPartitionStateStats(p.Ctx, database.GetID(), rel.ID()) diff --git a/proto/statsinfo.proto b/proto/statsinfo.proto index e7c64e08a1b9a..6803ce9beb67d 100644 --- a/proto/statsinfo.proto +++ b/proto/statsinfo.proto @@ -71,6 +71,8 @@ message StatsInfoKey { uint64 DatabaseID = 1; uint64 TableID = 2; uint32 AccId = 3; + string TableName = 4; + string DbName = 5; } message StatsInfoKeys {