diff --git a/Mixin/Service/Audio/Playlist/PlaylistManager.swift b/Mixin/Service/Audio/Playlist/PlaylistManager.swift index 81c2a34938..c3c908d110 100644 --- a/Mixin/Service/Audio/Playlist/PlaylistManager.swift +++ b/Mixin/Service/Audio/Playlist/PlaylistManager.swift @@ -167,8 +167,8 @@ class PlaylistManager: NSObject { name: MessageDAO.didInsertMessageNotification, object: nil) notificationCenter.addObserver(self, - selector: #selector(messageDAOWillDeleteMessage(_:)), - name: MessageDAO.willDeleteMessageNotification, + selector: #selector(messageWillDelete(_:)), + name: DeleteMessageWork.willDeleteNotification, object: nil) notificationCenter.addObserver(self, selector: #selector(conversationDAOWillClearConversation(_:)), @@ -773,8 +773,8 @@ extension PlaylistManager { loadLaterItemsIfNeeded() } - @objc private func messageDAOWillDeleteMessage(_ notification: Notification) { - guard let messageId = notification.userInfo?[MessageDAO.UserInfoKey.messageId] as? String else { + @objc private func messageWillDelete(_ notification: Notification) { + guard let messageId = notification.userInfo?[DeleteMessageWork.messageIdUserInfoKey] as? String else { return } guard case .conversation = source else { diff --git a/Mixin/Service/Job/AttachmentUploadJob.swift b/Mixin/Service/Job/AttachmentUploadJob.swift index a691700f01..7205e36b0c 100644 --- a/Mixin/Service/Job/AttachmentUploadJob.swift +++ b/Mixin/Service/Job/AttachmentUploadJob.swift @@ -78,7 +78,8 @@ class AttachmentUploadJob: AttachmentLoadingJob { return false } guard let fileUrl = fileUrl else { - MessageDAO.shared.deleteMessage(id: messageId) + let work = DeleteMessageWork(message: message) + WorkManager.general.addWork(work) return false } diff --git a/Mixin/UserInterface/Controllers/Chat/ConversationViewController.swift b/Mixin/UserInterface/Controllers/Chat/ConversationViewController.swift index b08a6f1189..ad0985e118 100644 --- a/Mixin/UserInterface/Controllers/Chat/ConversationViewController.swift +++ b/Mixin/UserInterface/Controllers/Chat/ConversationViewController.swift @@ -2683,10 +2683,8 @@ extension ConversationViewController { guard let weakSelf = self, let indexPath = weakSelf.dataSource.indexPath(where: { $0.messageId == message.messageId }) else { return } - let (deleted, childMessageIds) = MessageDAO.shared.deleteMessage(id: message.messageId) - if deleted { - ReceiveMessageService.shared.stopRecallMessage(item: message, childMessageIds: childMessageIds) - } + let work = DeleteMessageWork(message: message) + WorkManager.general.addWork(work) DispatchQueue.main.sync { _ = weakSelf.dataSource?.removeViewModel(at: indexPath) weakSelf.tableView.reloadData() diff --git a/Mixin/UserInterface/Controllers/Home/DatabaseUpgradeViewController.swift b/Mixin/UserInterface/Controllers/Home/DatabaseUpgradeViewController.swift index 1491aceecc..16051d537a 100644 --- a/Mixin/UserInterface/Controllers/Home/DatabaseUpgradeViewController.swift +++ b/Mixin/UserInterface/Controllers/Home/DatabaseUpgradeViewController.swift @@ -41,6 +41,7 @@ class DatabaseUpgradeViewController: UIViewController { AppGroupContainer.migrateIfNeeded() TaskDatabase.reloadCurrent() UserDatabase.reloadCurrent() + WorkDatabase.reloadCurrent() if !AppGroupUserDefaults.Database.isSentSenderKeyCleared { UserDatabase.current.clearSentSenderKey() diff --git a/Mixin/UserInterface/Controllers/Home/HomeViewController.swift b/Mixin/UserInterface/Controllers/Home/HomeViewController.swift index d6bd4f7fc7..b658c9beb8 100644 --- a/Mixin/UserInterface/Controllers/Home/HomeViewController.swift +++ b/Mixin/UserInterface/Controllers/Home/HomeViewController.swift @@ -145,6 +145,7 @@ class HomeViewController: UIViewController { if AppGroupUserDefaults.User.hasRecoverMedia { ConcurrentJobQueue.shared.addJob(job: RecoverMediaJob()) } + WorkManager.general.wakeUpPersistedWorks(with: [DeleteMessageWork.self]) initializeFTSIfNeeded() refreshExternalSchemesIfNeeded() } diff --git a/Mixin/UserInterface/Controllers/Login/LoginVerificationCodeViewController.swift b/Mixin/UserInterface/Controllers/Login/LoginVerificationCodeViewController.swift index 764c2c3151..17cbef5509 100644 --- a/Mixin/UserInterface/Controllers/Login/LoginVerificationCodeViewController.swift +++ b/Mixin/UserInterface/Controllers/Login/LoginVerificationCodeViewController.swift @@ -100,6 +100,7 @@ class LoginVerificationCodeViewController: VerificationCodeViewController { TaskDatabase.reloadCurrent() UserDatabase.reloadCurrent() + WorkDatabase.reloadCurrent() if AppGroupUserDefaults.User.isLogoutByServer { UserDatabase.current.clearSentSenderKey() AppGroupUserDefaults.Database.isSentSenderKeyCleared = true diff --git a/MixinServices/MixinServices/Database/User/DAO/ExpiredMessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/ExpiredMessageDAO.swift index 83433792d7..37aa6c08a2 100644 --- a/MixinServices/MixinServices/Database/User/DAO/ExpiredMessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/ExpiredMessageDAO.swift @@ -90,18 +90,16 @@ public final class ExpiredMessageDAO: UserDatabaseDAO { .limit(100) .fetchAll(db) let expiredMessages = try MessageDAO.shared.getFullMessages(messageIds: expiredMessageIds) - for id in expiredMessageIds { - let (deleted, childMessageIds) = try MessageDAO.shared.deleteMessage(id: id, with: db) + for message in expiredMessages { + let (deleted, childMessageIds) = try MessageDAO.shared.deleteMessage(message, with: db) if deleted { - if let message = expiredMessages.first(where: { $0.messageId == id }) { - ReceiveMessageService.shared.stopRecallMessage(item: message, childMessageIds: childMessageIds) - if message.status != MessageStatus.READ.rawValue { - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: message.conversationId) - } + ReceiveMessageService.shared.stopRecallMessage(item: message, childMessageIds: childMessageIds) + if message.status != MessageStatus.READ.rawValue { + try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: message.conversationId) } NotificationCenter.default.post(onMainThread: Self.expiredMessageDidDeleteNotification, object: nil, - userInfo: [Self.messageIdKey: id]) + userInfo: [Self.messageIdKey: message.messageId]) } } if !expiredMessageIds.isEmpty { diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift index 48da8ff655..24aed4db5e 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift @@ -14,7 +14,6 @@ public final class MessageDAO: UserDatabaseDAO { public static let shared = MessageDAO() - public static let willDeleteMessageNotification = Notification.Name("one.mixin.services.MessageDAO.willDeleteMessage") public static let didInsertMessageNotification = Notification.Name("one.mixin.services.did.insert.msg") public static let didRedecryptMessageNotification = Notification.Name("one.mixin.services.did.redecrypt.msg") public static let messageMediaStatusDidUpdateNotification = Notification.Name("one.mixin.services.MessageDAO.MessageMediaStatusDidUpdate") @@ -764,47 +763,53 @@ public final class MessageDAO: UserDatabaseDAO { } } - @discardableResult - public func deleteMessage(id: String) -> (deleted: Bool, childMessageIds: [String]) { - NotificationCenter.default.post(onMainThread: Self.willDeleteMessageNotification, - object: self, - userInfo: [UserInfoKey.messageId: id]) - var deleted = false - var childMessageIds: [String] = [] - db.write { (db) in - (deleted, childMessageIds) = try deleteMessage(id: id, with: db) - } - return (deleted, childMessageIds) - } - - func deleteMessage(id: String, with database: GRDB.Database) throws -> (deleted: Bool, childMessageIds: [String]) { - var deleteCount = 0 - var childMessageIds: [String] = [] - let conversationId: String? = try Message - .select(Message.column(of: .conversationId)) - .filter(Message.column(of: .messageId) == id) - .fetchOne(database) - deleteCount = try Message + func deleteMessage(_ message: MessageItem, with database: GRDB.Database) throws -> (deleted: Bool, childMessageIds: [String]) { + let id = message.messageId + let deleteCount = try Message .filter(Message.column(of: .messageId) == id) .deleteAll(database) try MessageMention .filter(MessageMention.column(of: .messageId) == id) .deleteAll(database) try deleteFTSContent(database, messageId: id) - childMessageIds = try TranscriptMessage + let childMessageIds: [String] = try TranscriptMessage .select(TranscriptMessage.column(of: .messageId)) .filter(TranscriptMessage.column(of: .transcriptId) == id) .fetchAll(database) try TranscriptMessage .filter(TranscriptMessage.column(of: .transcriptId) == id) .deleteAll(database) - if let conversationId = conversationId { - try PinMessageDAO.shared.delete(messageIds: [id], conversationId: conversationId, from: database) - try clearPinMessageContent(quoteMessageIds: [id], conversationId: conversationId, from: database) - } + try PinMessageDAO.shared.delete(messageIds: [id], conversationId: message.conversationId, from: database) + try clearPinMessageContent(quoteMessageIds: [id], conversationId: message.conversationId, from: database) return (deleteCount > 0, childMessageIds) } + public func delete(id: String, conversationId: String, completion: @escaping () -> Void) { + db.write { db in + try Message + .filter(Message.column(of: .messageId) == id) + .deleteAll(db) + try MessageMention + .filter(MessageMention.column(of: .messageId) == id) + .deleteAll(db) + try deleteFTSContent(db, messageId: id) + try PinMessageDAO.shared.delete(messageIds: [id], conversationId: conversationId, from: db) + try clearPinMessageContent(quoteMessageIds: [id], conversationId: conversationId, from: db) + db.afterNextTransactionCommit { _ in + completion() + } + } + } + + public func deleteLegacyMessage(with id: String) { + db.write { db in + try Message + .filter(Message.column(of: .messageId) == id) + .deleteAll(db) + try deleteFTSContent(db, messageId: id) + } + } + public func hasSentMessage(inConversationOf conversationId: String) -> Bool { let possibleStatus = [ MessageStatus.SENDING.rawValue, diff --git a/MixinServices/MixinServices/Database/User/DAO/TranscriptMessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/TranscriptMessageDAO.swift index 480c5e3ba5..85004e99b4 100644 --- a/MixinServices/MixinServices/Database/User/DAO/TranscriptMessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/TranscriptMessageDAO.swift @@ -190,4 +190,9 @@ public final class TranscriptMessageDAO: UserDatabaseDAO { } } + public func deleteTranscriptMessages(with transcriptId: String) { + db.delete(TranscriptMessage.self, + where: TranscriptMessage.column(of: .transcriptId) == transcriptId) + } + } diff --git a/MixinServices/MixinServices/Database/Work/WorkDAO.swift b/MixinServices/MixinServices/Database/Work/WorkDAO.swift new file mode 100644 index 0000000000..5f182c9273 --- /dev/null +++ b/MixinServices/MixinServices/Database/Work/WorkDAO.swift @@ -0,0 +1,35 @@ +import Foundation +import GRDB + +public class WorkDAO { + + public static let shared = WorkDAO() + + public var db: Database { + WorkDatabase.current + } + + public func save(work: PersistedWork, completion: @escaping () -> Void) { + db.write { db in + try work.save(db) + db.afterNextTransactionCommit { _ in + completion() + } + } + } + + public func works(with types: [String]) -> [PersistedWork] { + db.select(where: types.contains(PersistedWork.column(of: .type))) + } + + public func delete(id: String) { + db.delete(PersistedWork.self, where: PersistedWork.column(of: .id) == id) + } + + public func update(context: Data?, forWorkWith id: String) { + db.update(PersistedWork.self, + assignments: [PersistedWork.column(of: .context).set(to: context)], + where: PersistedWork.column(of: .id) == id) + } + +} diff --git a/MixinServices/MixinServices/Database/Work/WorkDatabase.swift b/MixinServices/MixinServices/Database/Work/WorkDatabase.swift new file mode 100644 index 0000000000..ce34edac01 --- /dev/null +++ b/MixinServices/MixinServices/Database/Work/WorkDatabase.swift @@ -0,0 +1,44 @@ +import GRDB + +public final class WorkDatabase: Database { + + public private(set) static var current: WorkDatabase! = try! WorkDatabase(url: AppGroupContainer.workDatabaseURL) + + public override class var config: Configuration { + var config = super.config + config.label = "Work" + return config + } + + public override var needsMigration: Bool { + try! pool.read({ (db) -> Bool in + let migrationsCompleted = try migrator.hasCompletedMigrations(db) + return !migrationsCompleted + }) + } + + private var migrator: DatabaseMigrator { + var migrator = DatabaseMigrator() + + migrator.registerMigration("create_table") { db in + try db.create(table: "works") { td in + td.column("id", .text).primaryKey().notNull() + td.column("type", .text).notNull() + td.column("context", .blob) + td.column("priority", .integer).notNull() + } + } + + return migrator + } + + public static func reloadCurrent() { + current = try! WorkDatabase(url: AppGroupContainer.workDatabaseURL) + current.migrate() + } + + private func migrate() { + try! migrator.migrate(pool) + } + +} diff --git a/MixinServices/MixinServices/Foundation/File Management/AppGroupContainer.swift b/MixinServices/MixinServices/Foundation/File Management/AppGroupContainer.swift index d667fd4ea1..ef372cc2c5 100644 --- a/MixinServices/MixinServices/Foundation/File Management/AppGroupContainer.swift +++ b/MixinServices/MixinServices/Foundation/File Management/AppGroupContainer.swift @@ -45,6 +45,10 @@ public enum AppGroupContainer { accountUrl.appendingPathComponent("task.db", isDirectory: false) } + public static var workDatabaseURL: URL { + accountUrl.appendingPathComponent("work.db", isDirectory: false) + } + @available(iOSApplicationExtension, unavailable) public static func migrateIfNeeded() { guard !AppGroupUserDefaults.isDocumentsMigrated else { diff --git a/MixinServices/MixinServices/Foundation/File Management/AttachmentContainer.swift b/MixinServices/MixinServices/Foundation/File Management/AttachmentContainer.swift index c99ef51d78..c226806f7f 100644 --- a/MixinServices/MixinServices/Foundation/File Management/AttachmentContainer.swift +++ b/MixinServices/MixinServices/Foundation/File Management/AttachmentContainer.swift @@ -60,22 +60,25 @@ public enum AttachmentContainer { } let url = AttachmentContainer.url(for: category, filename: mediaUrl) try? FileManager.default.removeItem(at: url) + Logger.general.debug(category: "AttachmentContainer", message: "\(url) deleted") if category == .videos { let thumbUrl = AttachmentContainer.videoThumbnailURL(videoFilename: mediaUrl) try? FileManager.default.removeItem(at: thumbUrl) + Logger.general.debug(category: "AttachmentContainer", message: "\(thumbUrl) deleted") } } public static func removeAll(transcriptId: String) { let url = Self.url(transcriptId: transcriptId, filename: nil) try? FileManager.default.removeItem(at: url) + Logger.general.debug(category: "AttachmentContainer", message: "\(url) deleted") } } public extension AttachmentContainer { - enum Category: CaseIterable { + enum Category: String, CaseIterable, Codable { case audios case files diff --git a/MixinServices/MixinServices/Foundation/User Defaults/AppGroupUserDefaults+User.swift b/MixinServices/MixinServices/Foundation/User Defaults/AppGroupUserDefaults+User.swift index 273d97bf84..dc18a4d1d2 100644 --- a/MixinServices/MixinServices/Foundation/User Defaults/AppGroupUserDefaults+User.swift +++ b/MixinServices/MixinServices/Foundation/User Defaults/AppGroupUserDefaults+User.swift @@ -84,6 +84,7 @@ extension AppGroupUserDefaults { || TaskDatabase.current.needsMigration || SignalDatabase.current.needsMigration || UserDatabase.current.needsMigration + || WorkDatabase.current.needsMigration } @Default(namespace: .user, key: Key.localVersion, defaultValue: uninitializedVersion) diff --git a/MixinServices/MixinServices/Services/WebSocket/Jobs/AttachmentLoadingJob.swift b/MixinServices/MixinServices/Services/WebSocket/Jobs/AttachmentLoadingJob.swift index 0a923f7b26..3c225c897e 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Jobs/AttachmentLoadingJob.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Jobs/AttachmentLoadingJob.swift @@ -24,6 +24,7 @@ open class AttachmentLoadingJob: AsynchronousJob { return } if weakSelf.isCancelled || !LoginManager.shared.isLoggedIn { + Logger.general.debug(category: "AttachmentLoadingJob", message: "\(weakSelf.jobId) is cancelled") weakSelf.finishJob() return } else if let error = error { diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/WebSocketService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/WebSocketService.swift index 87e95b65ec..28a55c58eb 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/WebSocketService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/WebSocketService.swift @@ -141,7 +141,7 @@ public class WebSocketService { case let .failure(error): if case .invalidRequestData = error { if let param = message.params, let messageId = param.messageId, messageId != messageId.lowercased() { - MessageDAO.shared.deleteMessage(id: messageId) + MessageDAO.shared.deleteLegacyMessage(with: messageId) JobDAO.shared.removeJob(jobId: message.id) } } diff --git a/MixinServices/MixinServices/Services/Work/DeleteMessageWork.swift b/MixinServices/MixinServices/Services/Work/DeleteMessageWork.swift new file mode 100644 index 0000000000..81627772a2 --- /dev/null +++ b/MixinServices/MixinServices/Services/Work/DeleteMessageWork.swift @@ -0,0 +1,157 @@ +import Foundation +import GRDB +import MixinServices + +public protocol DeletableMessage { + var messageId: String { get } + var conversationId: String { get } + var category: String { get } + var mediaUrl: String? { get } +} + +extension Message: DeletableMessage { + +} + +extension MessageItem: DeletableMessage { + +} + +/* + Straight execute + ┌───────────────┐ ┌───────────┐Completed┌────────────────┐ + │Init(preparing)├─►│Persistence├────────►│Delete DB Record│ + └───────────────┘ └───────────┘ └────┬───────────┘ + │ + ┌───────────┐Execute┌─────┐ │Completed + │Delete file│◄──────┤Ready│◄──┘ + └───────────┘ └─────┘ + + Awake from persistence + ┌────────────┐ ┌────────────────┐Completed┌───────────┐ + │Awake(ready)├─►│Delete DB Record├────────►│Delete file│ + └────────────┘ └────────────────┘ └───────────┘ + */ + +public final class DeleteMessageWork: Work { + + public enum Attachment: Codable { + case media(category: String, filename: String) + case transcript + } + + public enum Error: Swift.Error { + case invalidContext + } + + public static let willDeleteNotification = Notification.Name("one.mixin.services.DeleteMessageWork.willDelete") + public static let messageIdUserInfoKey = "msg" + + let messageId: String + let conversationId: String + let attachment: Attachment? + + @Synchronized(value: false) + private var hasDatabaseRecordDeleted: Bool + + public convenience init(message: DeletableMessage) { + let attachment: Attachment? + if ["_IMAGE", "_DATA", "_AUDIO", "_VIDEO"].contains(where: message.category.hasSuffix), let filename = message.mediaUrl { + attachment = .media(category: message.category, filename: filename) + } else if message.category.hasSuffix("_TRANSCRIPT") { + attachment = .transcript + } else { + attachment = nil + } + self.init(messageId: message.messageId, conversationId: message.conversationId, attachment: attachment, state: .preparing) + } + + private init(messageId: String, conversationId: String, attachment: Attachment?, state: State) { + self.messageId = messageId + self.conversationId = conversationId + self.attachment = attachment + super.init(id: "delete-message-\(messageId)", state: state) + } + + public override func start() { + state = .executing + if hasDatabaseRecordDeleted { + deleteFile() + state = .finished(.success) + } else { + MessageDAO.shared.delete(id: messageId, conversationId: conversationId) { + Logger.general.debug(category: "DeleteMessageWork", message: "\(self.messageId) Message deleted from database") + self.deleteFile() + self.state = .finished(.success) + } + } + } + + private func deleteFile() { + switch attachment { + case let .media(category, filename): + AttachmentContainer.removeMediaFiles(mediaUrl: filename, category: category) + case .transcript: + let transcriptId = messageId + let childMessageIds = TranscriptMessageDAO.shared.childrenMessageIds(transcriptId: transcriptId) + let jobIds = childMessageIds.map { transcriptMessageId in + AttachmentDownloadJob.jobId(transcriptId: transcriptId, messageId: transcriptMessageId) + } + for id in jobIds { + ConcurrentJobQueue.shared.cancelJob(jobId: id) + } + AttachmentContainer.removeAll(transcriptId: transcriptId) + TranscriptMessageDAO.shared.deleteTranscriptMessages(with: transcriptId) + case .none: + break + } + } + +} + +extension DeleteMessageWork: PersistableWork { + + private struct Context: Codable { + let messageId: String + let conversationId: String + let attachment: Attachment? + } + + public static let typeIdentifier: String = "delete_message" + + public var context: Data? { + let context = Context(messageId: messageId, + conversationId: conversationId, + attachment: attachment) + return try? JSONEncoder.default.encode(context) + } + + public var priority: PersistedWork.Priority { + .medium + } + + public convenience init(id: String, context: Data?) throws { + guard + let context = context, + let context = try? JSONDecoder.default.decode(Context.self, from: context) + else { + throw Error.invalidContext + } + self.init(messageId: context.messageId, + conversationId: context.conversationId, + attachment: context.attachment, + state: .ready) + } + + public func persistenceDidComplete() { + NotificationCenter.default.post(onMainThread: Self.willDeleteNotification, + object: self, + userInfo: [Self.messageIdUserInfoKey: messageId]) + MessageDAO.shared.delete(id: messageId, conversationId: conversationId) { + self.state = .ready + } + hasDatabaseRecordDeleted = true + Logger.general.debug(category: "DeleteMessageWork", message: "\(messageId) Message deleted from database") + } + +} diff --git a/MixinServices/MixinServices/Services/Work/PersistableWork.swift b/MixinServices/MixinServices/Services/Work/PersistableWork.swift new file mode 100644 index 0000000000..aa6bb89252 --- /dev/null +++ b/MixinServices/MixinServices/Services/Work/PersistableWork.swift @@ -0,0 +1,27 @@ +import Foundation + +public protocol PersistableWork: Work { + + static var typeIdentifier: String { get } + + var context: Data? { get } + var priority: PersistedWork.Priority { get } + + init(id: String, context: Data?) throws + + func updatePersistedContext() + func persistenceDidComplete() + +} + +extension PersistableWork { + + public func updatePersistedContext() { + WorkDAO.shared.update(context: context, forWorkWith: id) + } + + public func persistenceDidComplete() { + + } + +} diff --git a/MixinServices/MixinServices/Services/Work/PersistedWork.swift b/MixinServices/MixinServices/Services/Work/PersistedWork.swift new file mode 100644 index 0000000000..558978b2b7 --- /dev/null +++ b/MixinServices/MixinServices/Services/Work/PersistedWork.swift @@ -0,0 +1,41 @@ +import Foundation +import GRDB + +public struct PersistedWork: Codable, DatabaseColumnConvertible, PersistableRecord, MixinFetchableRecord { + + public struct Priority: RawRepresentable, Codable { + + public static let high = Priority(rawValue: 90) + public static let medium = Priority(rawValue: 50) + public static let low = Priority(rawValue: 10) + + public let rawValue: Int + + public init(rawValue: Int) { + self.rawValue = rawValue + } + + } + + public enum CodingKeys: CodingKey { + case id + case type + case context + case priority + } + + public static let databaseTableName = "works" + + public let id: String + public let type: String + public let context: Data? + public let priority: Priority + + public init(id: String, type: String, context: Data?, priority: Priority) { + self.id = id + self.type = type + self.context = context + self.priority = priority + } + +} diff --git a/MixinServices/MixinServices/Services/Work/Work.swift b/MixinServices/MixinServices/Services/Work/Work.swift new file mode 100644 index 0000000000..aa1234e6ae --- /dev/null +++ b/MixinServices/MixinServices/Services/Work/Work.swift @@ -0,0 +1,146 @@ +import Foundation + +protocol WorkStateMonitor: AnyObject { + func work(_ work: Work, stateDidChangeTo newState: Work.State) +} + +open class Work { + + public let id: String + + private let lock = NSRecursiveLock() + + private var _state: State + + private weak var stateMonitor: WorkStateMonitor? + + public init(id: String, state: State) { + self.id = id + self._state = state + } + + func setStateMonitor(_ monitor: WorkStateMonitor) -> Bool { + lock.lock() + defer { + lock.unlock() + } + if stateMonitor == nil { + stateMonitor = monitor + return true + } else { + return false + } + } + + // Override this if you want precisely control of state + open func start() { + state = .executing + do { + try main() + state = .finished(.success) + } catch { + state = .finished(.failed(error)) + } + } + + // Override this for straight synchronous works + open func main() throws { + + } + + open func cancel() { + state = .finished(.cancelled) + } + +} + +// MARK: - Equatable +extension Work: Equatable { + + public static func == (lhs: Work, rhs: Work) -> Bool { + lhs.id == rhs.id + } + +} + +// MARK: - Hashable +extension Work: Hashable { + + public func hash(into hasher: inout Hasher) { + hasher.combine(id) + } + +} + +// MARK: - CustomStringConvertible +extension Work: CustomStringConvertible { + + open var description: String { + id + } + +} + +// MARK: - States +extension Work { + + public enum State { + + public enum Result { + case success + case failed(Error) + case cancelled + } + + case preparing + case ready + case executing + case finished(Result) + + } + + public var state: State { + set { + lock.lock() + switch (_state, newValue) { + case (.preparing, .ready), (.ready, .executing), (.ready, .finished), (.executing, .finished): + _state = newValue + stateMonitor?.work(self, stateDidChangeTo: newValue) + default: + assertionFailure("Work's state shouldn't be set to \(newValue) from \(_state)") + } + lock.unlock() + } + get { + lock.lock() + let state = _state + lock.unlock() + return _state + } + } + + public var isReady: Bool { + if case .ready = state { + return true + } else { + return false + } + } + + public var isExecuting: Bool { + if case .executing = state { + return true + } else { + return false + } + } + + public var isFinished: Bool { + if case .finished = state { + return true + } else { + return false + } + } + +} diff --git a/MixinServices/MixinServices/Services/Work/WorkManager.swift b/MixinServices/MixinServices/Services/Work/WorkManager.swift new file mode 100644 index 0000000000..ea1136fd21 --- /dev/null +++ b/MixinServices/MixinServices/Services/Work/WorkManager.swift @@ -0,0 +1,146 @@ +import Foundation + +public class WorkManager { + + public static let general = WorkManager(label: "General", maxConcurrentWorkCount: 6) + + let maxConcurrentWorkCount: Int + let label: StaticString + + private let lock = NSRecursiveLock() + private let dispatchQueue: DispatchQueue + + private var executingWorks: Set = [] + private var pendingWorks: [Work] = [] + + var works: [Work] { + lock.lock() + var works = Array(executingWorks) + works.append(contentsOf: pendingWorks) + lock.unlock() + return works + } + + init(label: StaticString, maxConcurrentWorkCount: Int) { + self.label = label + self.maxConcurrentWorkCount = maxConcurrentWorkCount + let attributes: DispatchQueue.Attributes = maxConcurrentWorkCount == 1 ? [] : .concurrent + dispatchQueue = DispatchQueue(label: "one.mixin.services.WorkManager.\(label)", attributes: attributes) + } + + public func wakeUpPersistedWorks(with types: [PersistableWork.Type], completion: ((WorkManager) -> Void)? = nil) { + dispatchQueue.async { + let keyPairs = types.map { type in + (type.typeIdentifier, type) + } + let types = [String: PersistableWork.Type](uniqueKeysWithValues: keyPairs) + let identifiers = [String](types.keys) + for persisted in WorkDAO.shared.works(with: identifiers) { + guard let Work = types[persisted.type] else { + continue + } + do { + let work = try Work.init(id: persisted.id, context: persisted.context) + self.addWork(work, persistIfAvailable: false) + } catch { + Logger.general.error(category: "WorkManager", message: "[\(self.label)] Failed to init \(persisted)") + } + } + completion?(self) + } + } + + public func addWork(_ work: Work) { + addWork(work, persistIfAvailable: true) + } + + public func cancelAllWorks() { + Logger.general.debug(category: "WorkManager", message: "[\(label)] Will cancel all works") + works.forEach { $0.cancel() } + } + + public func cancelWork(with id: String) { + guard let work = works.first(where: { $0.id == id }) else { + Logger.general.debug(category: "WorkManager", message: "[\(label)] Cancel \(id) but finds nothing") + return + } + Logger.general.debug(category: "WorkManager", message: "[\(label)] Cancel \(id)") + work.cancel() + } + + private func addWork(_ work: Work, persistIfAvailable: Bool) { + guard work.setStateMonitor(self) else { + assertionFailure("Adding work to multiple manager is not supported") + return + } + lock.lock() + defer { + lock.unlock() + } + let isAlreadyScheduled = executingWorks.contains(work) || pendingWorks.contains(work) + guard !isAlreadyScheduled else { + Logger.general.warn(category: "WorkManager", message: "[\(label)] Add a duplicated work: \(work)") + return + } + if persistIfAvailable, let work = work as? PersistableWork { + let persisted = PersistedWork(id: work.id, + type: type(of: work).typeIdentifier, + context: work.context, + priority: work.priority) + WorkDAO.shared.save(work: persisted, + completion: work.persistenceDidComplete) + } + if work.isReady, executingWorks.count < maxConcurrentWorkCount { + Logger.general.debug(category: "WorkManager", message: "[\(label)] Start \(work) because of adding to queue") + executingWorks.insert(work) + dispatchQueue.async(execute: work.start) + } else { + Logger.general.debug(category: "WorkManager", message: "[\(label)] Pending \(work)") + pendingWorks.append(work) + } + } + +} + +extension WorkManager: WorkStateMonitor { + + func work(_ work: Work, stateDidChangeTo newState: Work.State) { + switch newState { + case .preparing: + assertionFailure("No way a work becomes preparing") + case .ready: + lock.lock() + if executingWorks.count < maxConcurrentWorkCount, let pendingWorksIndex = pendingWorks.firstIndex(of: work) { + Logger.general.debug(category: "WorkManager", message: "[\(label)] Execute \(work) because of readiness change") + pendingWorks.remove(at: pendingWorksIndex) + executingWorks.insert(work) + lock.unlock() + dispatchQueue.async(execute: work.start) + } else { + lock.unlock() + } + case .executing: + Logger.general.debug(category: "WorkManager", message: "[\(label)] Executing: \(work)") + case .finished(let result): + Logger.general.debug(category: "WorkManager", message: "[\(label)] Finished: \(work)") + if let work = work as? PersistableWork { + WorkDAO.shared.delete(id: work.id) + } + lock.lock() + pendingWorks.removeAll { pendingWork in + pendingWork == work + } + executingWorks.remove(work) + if executingWorks.count < maxConcurrentWorkCount, let index = pendingWorks.firstIndex(where: { $0.isReady }) { + let nextWork = pendingWorks.remove(at: index) + executingWorks.insert(nextWork) + lock.unlock() + Logger.general.debug(category: "WorkManager", message: "[\(label)] Execute \(nextWork) because of another work finished") + dispatchQueue.async(execute: work.start) + } else { + lock.unlock() + } + } + } + +}