Skip to content

Commit 77c7956

Browse files
authored
feat: PhotoSync is performed on a dedicated UploadQueue (#1471)
2 parents 6896800 + fe6ad03 commit 77c7956

16 files changed

+412
-190
lines changed

kDrive/AppRouter.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public struct AppRouter: AppNavigable {
660660

661661
// Resolving an upload queue will restart it if this is the first time
662662
@InjectService var uploadService: UploadServiceable
663-
uploadService.rebuildUploadQueueFromObjectsInRealm()
663+
uploadService.rebuildUploadQueue()
664664
}
665665

666666
// MARK: RouterFileNavigable

kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -504,11 +504,11 @@ extension PhotoSyncSettingsViewController: FooterButtonDelegate {
504504
sender.setLoading(false)
505505
}
506506

507-
DispatchQueue.global(qos: .userInitiated).async {
507+
DispatchQueue.global(qos: .default).async {
508508
// Add new pictures to be uploaded and reload upload queue
509509
self.photoLibraryUploader.scheduleNewPicturesForUpload()
510510
@InjectService var uploadService: UploadServiceable
511-
uploadService.rebuildUploadQueueFromObjectsInRealm()
511+
uploadService.rebuildUploadQueue()
512512
}
513513
}
514514
}

kDriveCore/DI/FactoryService.swift

+17-4
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ public enum FactoryService {
124124
factoryParameters: nil,
125125
resolver: resolver)
126126
},
127+
Factory(type: UploadQueueDelegate.self) { _, _ in
128+
UploadParallelismOrchestrator()
129+
},
127130
Factory(type: BGTaskScheduler.self) { _, _ in
128131
BGTaskScheduler.shared
129132
},
@@ -232,12 +235,22 @@ public enum FactoryService {
232235
}
233236

234237
static var uploadQueues: [FactoryWithIdentifier] {
235-
let globalUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
236-
UploadQueue()
238+
let globalUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
239+
let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
240+
forCustomTypeIdentifier: nil,
241+
factoryParameters: nil,
242+
resolver: resolver)
243+
244+
return UploadQueue(delegate: uploadQueueDelegate)
237245
}
238246

239-
let photoUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
240-
PhotoUploadQueue()
247+
let photoUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
248+
let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
249+
forCustomTypeIdentifier: nil,
250+
factoryParameters: nil,
251+
resolver: resolver)
252+
253+
return PhotoUploadQueue(delegate: uploadQueueDelegate)
241254
}
242255

243256
let services = [

kDriveCore/Data/Models/Upload/UploadFile.swift

+4
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ public final class UploadFile: Object, UploadFilable {
138138
return Constants.formatFileSize(size)
139139
}
140140

141+
public var isPhotoSyncUpload: Bool {
142+
type == .phAsset
143+
}
144+
141145
var type: UploadFileType {
142146
return UploadFileType(rawValue: rawType) ?? .unknown
143147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Infomaniak kDrive - iOS App
3+
Copyright (C) 2025 Infomaniak Network SA
4+
5+
This program is free software: you can redistribute it and/or modify
6+
it under the terms of the GNU General Public License as published by
7+
the Free Software Foundation, either version 3 of the License, or
8+
(at your option) any later version.
9+
10+
This program is distributed in the hope that it will be useful,
11+
but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
GNU General Public License for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
19+
import Foundation
20+
import InfomaniakDI
21+
22+
public final class UploadParallelismOrchestrator {
23+
@LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable
24+
@LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable
25+
@LazyInjectService private var appContextService: AppContextServiceable
26+
27+
private let serialEventQueue = DispatchQueue(
28+
label: "com.infomaniak.drive.upload-parallelism-orchestrator.event",
29+
qos: .default
30+
)
31+
32+
private var uploadParallelismHeuristic: WorkloadParallelismHeuristic?
33+
private var memoryPressureObserver: DispatchSourceMemoryPressure?
34+
35+
private var availableParallelism: Int {
36+
guard let uploadParallelismHeuristic else {
37+
return ParallelismDefaults.reducedParallelism
38+
}
39+
return uploadParallelismHeuristic.currentParallelism
40+
}
41+
42+
private lazy var allQueues = [globalUploadQueue, photoUploadQueue]
43+
44+
public init() {
45+
setupObservation()
46+
}
47+
48+
private func setupObservation() {
49+
serialEventQueue.async {
50+
self.observeMemoryWarnings()
51+
self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self)
52+
}
53+
}
54+
55+
func observeMemoryWarnings() {
56+
guard appContextService.context == .fileProviderExtension else {
57+
return
58+
}
59+
60+
let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main)
61+
memoryPressureObserver = source
62+
source.setEventHandler { [weak self] in
63+
guard let self else { return }
64+
let event: DispatchSource.MemoryPressureEvent = source.data
65+
switch event {
66+
case DispatchSource.MemoryPressureEvent.normal:
67+
Log.uploadQueue("MemoryPressureEvent normal", level: .info)
68+
case DispatchSource.MemoryPressureEvent.warning:
69+
Log.uploadQueue("MemoryPressureEvent warning", level: .info)
70+
case DispatchSource.MemoryPressureEvent.critical:
71+
Log.uploadQueue("MemoryPressureEvent critical", level: .error)
72+
serialEventQueue.async {
73+
@InjectService var uploadService: UploadServiceable
74+
uploadService.rescheduleRunningOperations()
75+
}
76+
default:
77+
break
78+
}
79+
}
80+
source.resume()
81+
}
82+
83+
private func computeUploadParallelismPerQueueAndApply() {
84+
serialEventQueue.async {
85+
let currentAvailableParallelism = self.availableParallelism
86+
Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)")
87+
88+
let activeQueues = self.allQueues.filter(\.isActive)
89+
let inactiveQueues = self.allQueues.filter { !$0.isActive }
90+
91+
assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match")
92+
93+
inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) }
94+
95+
Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial")
96+
guard !activeQueues.isEmpty else {
97+
Log.uploadQueue("No active queues")
98+
return
99+
}
100+
101+
let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count)
102+
Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)")
103+
activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) }
104+
}
105+
}
106+
}
107+
108+
extension UploadParallelismOrchestrator: UploadQueueDelegate {
109+
public func operationQueueBecameEmpty(_ queue: UploadQueue) {
110+
computeUploadParallelismPerQueueAndApply()
111+
}
112+
113+
public func operationQueueNoLongerEmpty(_ queue: UploadQueue) {
114+
computeUploadParallelismPerQueueAndApply()
115+
}
116+
}
117+
118+
extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate {
119+
public func parallelismShouldChange(value: Int) {
120+
computeUploadParallelismPerQueueAndApply()
121+
}
122+
}

kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift

+31-15
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@
1919
import Foundation
2020
import InfomaniakDI
2121

22+
public enum ParallelismDefaults {
23+
static let reducedParallelism = 2
24+
25+
static let serial = 1
26+
}
27+
2228
/// Delegate protocol of UploadParallelismHeuristic
23-
protocol ParallelismHeuristicDelegate: AnyObject {
29+
public protocol ParallelismHeuristicDelegate: AnyObject {
2430
/// This method is called with a new parallelism to apply each time to the uploadQueue
2531
/// - Parameter value: The new parallelism value to use
2632
func parallelismShouldChange(value: Int)
@@ -30,61 +36,71 @@ protocol ParallelismHeuristicDelegate: AnyObject {
3036
///
3137
/// Value can change depending on many factors, including thermal state battery or extension mode.
3238
/// Scaling is achieved given the number of active cores available.
33-
final class WorkloadParallelismHeuristic {
34-
/// With 2 Operations max, and a chuck of 1MiB max, the UploadQueue can spike to max 4MiB memory usage.
35-
private static let reducedParallelism = 2
36-
39+
public final class WorkloadParallelismHeuristic {
3740
@LazyInjectService private var appContextService: AppContextServiceable
3841

3942
private weak var delegate: ParallelismHeuristicDelegate?
4043

44+
private let serialEventQueue = DispatchQueue(
45+
label: "com.infomaniak.drive.parallelism-heuristic.event",
46+
qos: .default
47+
)
48+
4149
init(delegate: ParallelismHeuristicDelegate) {
4250
self.delegate = delegate
51+
setupObservation()
52+
}
4353

54+
private func setupObservation() {
4455
// Update on thermal change
4556
NotificationCenter.default.addObserver(
4657
self,
47-
selector: #selector(computeParallelism),
58+
selector: #selector(computeParallelismInQueue),
4859
name: ProcessInfo.thermalStateDidChangeNotification,
4960
object: nil
5061
)
5162

5263
// Update on low power mode
5364
NotificationCenter.default.addObserver(
5465
self,
55-
selector: #selector(computeParallelism),
66+
selector: #selector(computeParallelismInQueue),
5667
name: NSNotification.Name.NSProcessInfoPowerStateDidChange,
5768
object: nil
5869
)
5970

60-
// Update the value a first time
61-
computeParallelism()
71+
computeParallelismInQueue()
6272
}
6373

6474
deinit {
6575
NotificationCenter.default.removeObserver(self, name: ProcessInfo.thermalStateDidChangeNotification, object: nil)
6676
NotificationCenter.default.removeObserver(self, name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil)
6777
}
6878

69-
@objc private func computeParallelism() {
79+
@objc private func computeParallelismInQueue() {
80+
serialEventQueue.async {
81+
self.computeParallelism()
82+
}
83+
}
84+
85+
private func computeParallelism() {
7086
let processInfo = ProcessInfo.processInfo
7187

7288
// If the device is too hot we cool down now
7389
let thermalState = processInfo.thermalState
7490
guard thermalState != .critical else {
75-
currentParallelism = Self.reducedParallelism
91+
currentParallelism = ParallelismDefaults.reducedParallelism
7692
return
7793
}
7894

7995
// In low power mode, we reduce parallelism
8096
guard !processInfo.isLowPowerModeEnabled else {
81-
currentParallelism = Self.reducedParallelism
97+
currentParallelism = ParallelismDefaults.reducedParallelism
8298
return
8399
}
84100

85101
// In extension, to reduce memory footprint, we reduce drastically parallelism
86102
guard !appContextService.isExtension else {
87-
currentParallelism = Self.reducedParallelism
103+
currentParallelism = ParallelismDefaults.reducedParallelism
88104
return
89105
}
90106

@@ -93,14 +109,14 @@ final class WorkloadParallelismHeuristic {
93109

94110
// Beginning with .serious state, we start reducing the load on the system
95111
guard thermalState != .serious else {
96-
currentParallelism = max(Self.reducedParallelism, parallelism / 2)
112+
currentParallelism = max(ParallelismDefaults.reducedParallelism, parallelism / 2)
97113
return
98114
}
99115

100116
currentParallelism = parallelism
101117
}
102118

103-
public private(set) var currentParallelism = 0 {
119+
public private(set) var currentParallelism = ParallelismDefaults.reducedParallelism {
104120
didSet {
105121
delegate?.parallelismShouldChange(value: currentParallelism)
106122
}

kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ extension UploadService: UploadNotifiable {
4848

4949
public func sendFileUploadStateNotificationIfNeeded(with result: UploadCompletionResult) {
5050
Log.uploadQueue("sendFileUploadStateNotificationIfNeeded")
51-
serialQueue.async { [weak self] in
51+
serialEventQueue.async { [weak self] in
5252
guard let self else { return }
5353
guard let uploadFile = result.uploadFile,
5454
uploadFile.error != .taskRescheduled,

kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ extension UploadService: UploadObservable {
4949
fileId: String? = nil,
5050
using closure: @escaping (UploadFile, File?) -> Void) -> ObservationToken {
5151
var token: ObservationToken!
52-
serialQueue.sync { [weak self] in
52+
serialEventQueue.sync { [weak self] in
5353
guard let self else { return }
5454
let key = UUID()
5555
observations.didUploadFile[key] = { [weak self, weak observer] uploadFile, driveFile in
@@ -81,7 +81,7 @@ extension UploadService: UploadObservable {
8181
parentId: Int,
8282
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
8383
var token: ObservationToken!
84-
serialQueue.sync { [weak self] in
84+
serialEventQueue.sync { [weak self] in
8585
guard let self else { return }
8686
let key = UUID()
8787
observations.didChangeUploadCountInParent[key] = { [weak self, weak observer] updatedParentId, count in
@@ -111,7 +111,7 @@ extension UploadService: UploadObservable {
111111
driveId: Int,
112112
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
113113
var token: ObservationToken!
114-
serialQueue.sync { [weak self] in
114+
serialEventQueue.sync { [weak self] in
115115
guard let self else { return }
116116
let key = UUID()
117117
observations.didChangeUploadCountInDrive[key] = { [weak self, weak observer] updatedDriveId, count in

kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ extension UploadService: UploadPublishable {
4343
userId: Int,
4444
driveId: Int) {
4545
Log.uploadQueue("publishUploadCount")
46-
serialQueue.async { [weak self] in
46+
serialEventQueue.async { [weak self] in
4747
guard let self else { return }
4848
publishUploadCountInParent(parentId: parentId, userId: userId, driveId: driveId)
4949
publishUploadCountInDrive(userId: userId, driveId: driveId)
@@ -54,7 +54,7 @@ extension UploadService: UploadPublishable {
5454
userId: Int,
5555
driveId: Int) {
5656
Log.uploadQueue("publishUploadCountInParent")
57-
serialQueue.async { [weak self] in
57+
serialEventQueue.async { [weak self] in
5858
guard let self else { return }
5959

6060
let uploadCount = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId).count
@@ -69,7 +69,7 @@ extension UploadService: UploadPublishable {
6969
public func publishUploadCountInDrive(userId: Int,
7070
driveId: Int) {
7171
Log.uploadQueue("publishUploadCountInDrive")
72-
serialQueue.async { [weak self] in
72+
serialEventQueue.async { [weak self] in
7373
guard let self else { return }
7474
let uploadCount = getUploadingFiles(userId: userId, driveId: driveId).count
7575
for closure in observations.didChangeUploadCountInDrive.values {
@@ -84,7 +84,7 @@ extension UploadService: UploadPublishable {
8484
Log.uploadQueue("publishFileUploaded")
8585
logFileUploadedWithSuccess(for: result.uploadFile)
8686
sendFileUploadStateNotificationIfNeeded(with: result)
87-
serialQueue.async { [weak self] in
87+
serialEventQueue.async { [weak self] in
8888
guard let self else { return }
8989
for closure in observations.didUploadFile.values {
9090
guard let uploadFile = result.uploadFile, !uploadFile.isInvalidated else {

0 commit comments

Comments
 (0)