Skip to content

Commit

Permalink
fix: 修复ddc_blob references字段过大导致慢查询问题 #2181
Browse files Browse the repository at this point in the history
* fix: 修复ddc_blob references字段过大导致慢查询问题 #2181

* fix: 增加ExpiredDdcRefCleanupJob测试 #2181

* fix: 增加ExpiredDdcRefCleanupJob测试 #2181

* fix: 增加DdcBlobCleanupJob测试 #2181

* fix: 增加ref对应的blob数量超过限制日志输出 #2181
  • Loading branch information
cnlkl authored Nov 15, 2024
1 parent 8527c4f commit 1b264d0
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,10 @@ data class TDdcBlob(
* ref类型引用 ref/{bucket}/{key}
* blob类型引用 blob/{blobId}
*/
var references: Set<String> = emptySet()
@Deprecated("性能原因不在使用,关联关系存在TDdcReferenceBlob中")
var references: Set<String> = emptySet(),
/**
* 被引用的次数,计数为0时将被清理
*/
var refCount: Long = 0L,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.tencent.bkrepo.ddc.model

import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.index.CompoundIndexes
import org.springframework.data.mongodb.core.mapping.Document


@Document("ddc_blob_ref")
@CompoundIndexes(
CompoundIndex(
name = "projectId_repoName_blobId_ref_idx",
def = "{'projectId': 1, 'repoName': 1, 'blobId': 1, 'ref': 1}",
unique = true,
background = true
),
CompoundIndex(
name = "projectId_repoName_ref_blobId_idx",
def = "{'projectId': 1, 'repoName': 1, 'ref': 1, 'blobId': 1}",
unique = true,
background = true
),
)
data class TDdcBlobRef(
var id: String? = null,
var projectId: String,
var repoName: String,
/**
* blob blake3 hash
*/
var blobId: String,
/**
* 引用了该blob的ref或blob,ref的inline blob中直接或间接引用的所有blob都会关联到ref
* ref类型引用 ref/{bucket}/{key}
* blob类型引用 blob/{blobId}
*/
var ref: String,
)
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ data class Blob(
val size: Long,
val blobId: ContentHash,
val contentId: ContentHash,
val references: Set<String> = emptySet(),
val sha1: String? = null,
) {
companion object {
Expand All @@ -52,7 +51,6 @@ data class Blob(
size = size,
blobId = ContentHash.fromHex(blobId),
contentId = ContentHash.fromHex(contentId),
references = references,
sha1 = sha1
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.tencent.bkrepo.ddc.repository

import com.tencent.bkrepo.common.mongo.dao.simple.SimpleMongoDao
import com.tencent.bkrepo.ddc.model.TDdcBlobRef
import com.tencent.bkrepo.ddc.utils.DdcUtils.buildRef
import org.springframework.dao.DuplicateKeyException
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.stereotype.Repository

@Repository
class BlobRefRepository : SimpleMongoDao<TDdcBlobRef>() {
fun addRefToBlob(projectId: String, repoName: String, bucket: String, refKey: String, blobIds: Set<String>) {
if (blobIds.size > DEFAULT_BLOB_SIZE_LIMIT) {
val ref = "$projectId/$repoName/${buildRef(bucket, refKey)}"
logger.error("blobs of ref[$ref] exceed size limit, size[${blobIds.size}]]")
}
blobIds.forEach {
try {
insert(
TDdcBlobRef(
id = null,
projectId = projectId,
repoName = repoName,
blobId = it,
ref = buildRef(bucket, refKey)
)
)
} catch (ignore: DuplicateKeyException) {
}
}
}

fun removeRefFromBlob(projectId: String, repoName: String, bucket: String, refKey: String): List<TDdcBlobRef> {
val criteria = Criteria
.where(TDdcBlobRef::projectId.name).isEqualTo(projectId)
.and(TDdcBlobRef::repoName.name).isEqualTo(repoName)
.and(TDdcBlobRef::ref.name).isEqualTo(buildRef(bucket, refKey))
return determineMongoTemplate().findAllAndRemove(Query(criteria), TDdcBlobRef::class.java)
}

companion object {
private const val DEFAULT_BLOB_SIZE_LIMIT = 20
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.bkrepo.ddc.repository

import com.tencent.bkrepo.common.mongo.dao.simple.SimpleMongoDao
import com.tencent.bkrepo.ddc.model.TDdcBlob
import com.tencent.bkrepo.ddc.utils.DdcUtils.buildRef
import org.springframework.dao.DuplicateKeyException
import org.springframework.data.domain.Sort
import org.springframework.data.mongodb.core.query.Criteria
Expand Down Expand Up @@ -76,11 +77,11 @@ class BlobRepository : SimpleMongoDao<TDdcBlob>() {
}
}

fun addRefToBlob(projectId: String, repoName: String, bucket: String, refKey: String, blobIds: Set<String>) {
fun incRefCount(projectId: String, repoName: String, blobIds: Set<String>, inc: Long = 1L) {
val criteria = TDdcBlob::projectId.isEqualTo(projectId)
.and(TDdcBlob::repoName.name).isEqualTo(repoName)
.and(TDdcBlob::blobId.name).inValues(blobIds)
val update = Update().addToSet(TDdcBlob::references.name, "ref/$bucket/$refKey")
val update = Update().inc(TDdcBlob::refCount.name, inc)
updateMulti(Query(criteria), update)
}

Expand All @@ -89,7 +90,7 @@ class BlobRepository : SimpleMongoDao<TDdcBlob>() {
val criteria = Criteria
.where(TDdcBlob::projectId.name).isEqualTo(projectId)
.and(TDdcBlob::repoName.name).isEqualTo(repoName)
.and(TDdcBlob::references.name).inValues("ref/${bucket}/${refKey}")
.and(TDdcBlob::references.name).inValues(buildRef(bucket, refKey))
val update = Update().pull(TDdcBlob::references.name, refKey)
updateMulti(Query(criteria), update)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ import com.tencent.bkrepo.ddc.exception.BlobNotFoundException
import com.tencent.bkrepo.ddc.model.TDdcBlob
import com.tencent.bkrepo.ddc.pojo.Blob
import com.tencent.bkrepo.ddc.pojo.Reference
import com.tencent.bkrepo.ddc.repository.BlobRefRepository
import com.tencent.bkrepo.ddc.repository.BlobRepository
import org.springframework.stereotype.Service
import java.time.LocalDateTime

@Service
class BlobService(
private val blobRepository: BlobRepository,
private val blobRefRepository: BlobRefRepository,
private val nodeService: NodeService,
private val storageManager: StorageManager
) {
Expand Down Expand Up @@ -107,10 +109,15 @@ class BlobService(
}

fun addRefToBlobs(ref: Reference, blobIds: Set<String>) {
blobRepository.addRefToBlob(ref.projectId, ref.repoName, ref.bucket, ref.key.toString(), blobIds)
blobRefRepository.addRefToBlob(ref.projectId, ref.repoName, ref.bucket, ref.key.toString(), blobIds)
blobRepository.incRefCount(ref.projectId, ref.repoName, blobIds)
}

fun removeRefFromBlobs(projectId: String, repoName: String, bucket: String, key: String) {
val blobIds = HashSet<String>()
blobRefRepository.removeRefFromBlob(projectId, repoName, bucket, key).mapTo(blobIds) { it.blobId }
blobRepository.incRefCount(projectId, repoName, blobIds, -1L)
// 兼容旧逻辑,所有blob的references字段为空后可以移除该逻辑
blobRepository.removeRefFromBlob(projectId, repoName, bucket, key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ object DdcUtils {
fun TDdcRef.fullPath() = "/$bucket/$key"

fun TDdcBlob.fullPath() = "/blobs/$blobId"

fun buildRef(bucket: String, key: String): String = "ref/$bucket/$key"
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.exists
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.lte
import org.springframework.data.mongodb.core.query.size
import org.springframework.stereotype.Component
import java.time.LocalDateTime
Expand All @@ -53,11 +54,16 @@ class DdcBlobCleanupJob(
override fun buildQuery(): Query {
val referencesCriteria = Criteria().orOperator(
Blob::references.exists(false),
Blob::references.size(0)
Blob::references.size(0),
)
val refCountCriteria = Criteria().orOperator(
Blob::refCount.exists(false),
Blob::refCount.lte(0L),
)
// 最近1小时上传的blob不清理,避免将未finalized的ref所引用的blob清理掉
val criteria = Criteria().andOperator(
referencesCriteria,
refCountCriteria,
Criteria.where("lastModifiedDate").lt(LocalDateTime.now().minusHours(1L))
)
val query = Query(criteria)
Expand Down Expand Up @@ -93,7 +99,8 @@ class DdcBlobCleanupJob(
val projectId: String,
val repoName: String,
val blobId: String,
val references: Set<String> = emptySet()
val references: Set<String> = emptySet(),
val refCount: Long = 0L,
)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,37 @@ class ExpiredDdcRefCleanupJob(
)
}

// 从blob ref列表中移除ref
val refKey = "ref/${row.bucket}/${row.key}"
val criteria = Criteria
removeBlobRef(row)
}

private fun removeBlobRef(row: Ref) {
// 移除blob与ref关联关系
val refKey = buildRef(row.bucket, row.key)
var criteria = Criteria
.where(BlobRef::projectId.name).isEqualTo(row.projectId)
.and(BlobRef::repoName.name).isEqualTo(row.repoName)
.and(BlobRef::ref.name).isEqualTo(refKey)
val blobIds = HashSet<String>()
mongoTemplate.findAllAndRemove(
Query(criteria),
BlobRef::class.java,
COLLECTION_NAME_BLOB_REF
).mapTo(blobIds) { it.blobId }

// 减少blob引用计数
criteria = Criteria
.where(DdcBlobCleanupJob.Blob::projectId.name).isEqualTo(row.projectId)
.and(DdcBlobCleanupJob.Blob::repoName.name).isEqualTo(row.repoName)
.and(DdcBlobCleanupJob.Blob::blobId.name).inValues(blobIds)
var update = Update().inc(DdcBlobCleanupJob.Blob::refCount.name, -1L)
mongoTemplate.updateMulti(Query(criteria), update, DdcBlobCleanupJob.COLLECTION_NAME)

// 兼容旧逻辑,从blob ref列表中移除ref,所有blob的reference字段都清空后可移除该代码
criteria = Criteria
.where(DdcBlobCleanupJob.Blob::projectId.name).isEqualTo(row.projectId)
.and(DdcBlobCleanupJob.Blob::repoName.name).isEqualTo(row.repoName)
.and(DdcBlobCleanupJob.Blob::references.name).inValues(refKey)
val update = Update().pull(DdcBlobCleanupJob.Blob::references.name, refKey)
update = Update().pull(DdcBlobCleanupJob.Blob::references.name, refKey)
mongoTemplate.updateMulti(Query(criteria), update, DdcBlobCleanupJob.COLLECTION_NAME)
}

Expand All @@ -104,8 +128,21 @@ class ExpiredDdcRefCleanupJob(
val inlineBlob: Binary? = null
)

data class BlobRef(
val id: String,
val projectId: String,
val repoName: String,
val blobId: String,
val ref: String,
)

companion object {
const val COLLECTION_NAME = "ddc_ref"
const val COLLECTION_NAME_LEGACY = "ddc_legacy_ref"
const val COLLECTION_NAME_BLOB_REF = "ddc_blob_ref"

fun buildRef(bucket: String, key: String): String {
return "ref/$bucket/$key"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.tencent.bkrepo.job.batch.task.ddc

import com.tencent.bkrepo.common.metadata.service.log.OperateLogService
import com.tencent.bkrepo.common.metadata.service.node.NodeService
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.job.batch.JobBaseTest
import com.tencent.bkrepo.job.batch.task.ddc.DdcTestUtils.insertBlob
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo


@DisplayName("DDC Blob清理测试")
@DataMongoTest
class DdcBlobCleanupJobTest @Autowired constructor(
private val blobCleanupJob: DdcBlobCleanupJob,
private val mongoTemplate: MongoTemplate,
) : JobBaseTest() {

@MockBean
private lateinit var nodeService: NodeService

@MockBean
lateinit var operateLogService: OperateLogService

@Test
fun test() {
generateData()
blobCleanupJob.start()
listOf(blobIds[0], blobIds[2], blobIds[6]).forEach { assertFalse(blobExists(it)) }
listOf(blobIds[1], blobIds[3], blobIds[4], blobIds[5], blobIds[7]).forEach { assertTrue(blobExists(it)) }
verify(nodeService, times(3)).deleteNode(any())
}

private fun blobExists(id: String): Boolean {
return mongoTemplate.exists(Query(Criteria.where(ID).isEqualTo(id)), DdcBlobCleanupJob.COLLECTION_NAME)
}

private fun generateData() {
val ref = "ref/legacytexture/366f955a863296d36ce99868d015752ad0e29f81"
// 旧数据
mongoTemplate.insertBlob(blobIds[0], "1", emptySet())
mongoTemplate.insertBlob(blobIds[1], "2", setOf(ref))

// 混合
mongoTemplate.insertBlob(blobIds[2], "3", emptySet(), 0L)
mongoTemplate.insertBlob(blobIds[3], "4", setOf(ref), 1L)
mongoTemplate.insertBlob(blobIds[4], "7", setOf(ref), 0L)
mongoTemplate.insertBlob(blobIds[5], "8", emptySet(), 1L)

// 新数据
mongoTemplate.insertBlob(blobIds[6], "5", null, 0L)
mongoTemplate.insertBlob(blobIds[7], "6", null, 1L)
}

companion object {
private val blobIds = Array(8) {
"11bb03c690c9fab0c5e3ed9" + it
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.tencent.bkrepo.job.batch.task.ddc

import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.job.UT_PROJECT_ID
import com.tencent.bkrepo.job.UT_REPO_NAME
import com.tencent.bkrepo.job.batch.task.ddc.ExpiredDdcRefCleanupJob.Companion.COLLECTION_NAME_BLOB_REF
import org.bson.types.ObjectId
import org.springframework.data.mongodb.core.MongoTemplate
import java.time.LocalDateTime

object DdcTestUtils {
fun MongoTemplate.insertBlob(id: String, blobId: String, ref: Set<String>?, refCount: Long? = null) {
val blob = mutableMapOf<String, Any>(
ID to ObjectId(id),
"projectId" to UT_PROJECT_ID,
"repoName" to UT_REPO_NAME,
"blobId" to blobId,
"lastModifiedDate" to LocalDateTime.now().minusHours(2L)
)
ref?.let { blob["references"] = it }
refCount?.let { blob["refCount"] = it }
insert(blob, DdcBlobCleanupJob.COLLECTION_NAME)
}

fun MongoTemplate.insertBlobRef(id: String, blobId: String, ref: String): ExpiredDdcRefCleanupJob.BlobRef {
val blobRef = ExpiredDdcRefCleanupJob.BlobRef(
id = id,
projectId = UT_PROJECT_ID,
repoName = UT_REPO_NAME,
blobId = blobId,
ref = ref,
)
insert(blobRef, COLLECTION_NAME_BLOB_REF)
return blobRef
}
}
Loading

0 comments on commit 1b264d0

Please sign in to comment.