feat(portal): land files platform and frontend workspace refresh

This commit is contained in:
yoyuzh
2026-04-09 18:35:03 +08:00
parent 67cd0f6e6f
commit 99e00cd7f7
68 changed files with 5795 additions and 2911 deletions

View File

@@ -1,10 +1,16 @@
package com.yoyuzh.admin;
import com.yoyuzh.api.v2.tasks.BackgroundTaskResponse;
import com.yoyuzh.auth.CustomUserDetailsService;
import com.yoyuzh.auth.User;
import com.yoyuzh.common.ApiResponse;
import com.yoyuzh.common.PageResponse;
import com.yoyuzh.files.tasks.BackgroundTask;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
@@ -25,6 +31,7 @@ import java.util.List;
public class AdminController {
private final AdminService adminService;
private final CustomUserDetailsService userDetailsService;
@GetMapping("/summary")
public ApiResponse<AdminSummaryResponse> summary() {
@@ -59,6 +66,34 @@ public class AdminController {
return ApiResponse.success(adminService.listStoragePolicies());
}
@PostMapping("/storage-policies")
public ApiResponse<AdminStoragePolicyResponse> createStoragePolicy(
@Valid @RequestBody AdminStoragePolicyUpsertRequest request) {
return ApiResponse.success(adminService.createStoragePolicy(request));
}
@PutMapping("/storage-policies/{policyId}")
public ApiResponse<AdminStoragePolicyResponse> updateStoragePolicy(
@PathVariable Long policyId,
@Valid @RequestBody AdminStoragePolicyUpsertRequest request) {
return ApiResponse.success(adminService.updateStoragePolicy(policyId, request));
}
@PatchMapping("/storage-policies/{policyId}/status")
public ApiResponse<AdminStoragePolicyResponse> updateStoragePolicyStatus(
@PathVariable Long policyId,
@Valid @RequestBody AdminStoragePolicyStatusUpdateRequest request) {
return ApiResponse.success(adminService.updateStoragePolicyStatus(policyId, request.enabled()));
}
@PostMapping("/storage-policies/migrations")
public ApiResponse<BackgroundTaskResponse> createStoragePolicyMigrationTask(
@AuthenticationPrincipal UserDetails userDetails,
@Valid @RequestBody AdminStoragePolicyMigrationCreateRequest request) {
User user = userDetailsService.loadDomainUser(userDetails.getUsername());
return ApiResponse.success(toTaskResponse(adminService.createStoragePolicyMigrationTask(user, request)));
}
@DeleteMapping("/files/{fileId}")
public ApiResponse<Void> deleteFile(@PathVariable Long fileId) {
adminService.deleteFile(fileId);
@@ -99,4 +134,19 @@ public class AdminController {
public ApiResponse<AdminPasswordResetResponse> resetUserPassword(@PathVariable Long userId) {
return ApiResponse.success(adminService.resetUserPassword(userId));
}
private BackgroundTaskResponse toTaskResponse(BackgroundTask task) {
return new BackgroundTaskResponse(
task.getId(),
task.getType(),
task.getStatus(),
task.getUserId(),
task.getPublicStateJson(),
task.getCorrelationId(),
task.getErrorMessage(),
task.getCreatedAt(),
task.getUpdatedAt(),
task.getFinishedAt()
);
}
}

View File

@@ -10,12 +10,18 @@ import com.yoyuzh.common.BusinessException;
import com.yoyuzh.common.ErrorCode;
import com.yoyuzh.common.PageResponse;
import com.yoyuzh.files.core.FileBlobRepository;
import com.yoyuzh.files.core.FileEntityRepository;
import com.yoyuzh.files.core.FileEntityType;
import com.yoyuzh.files.core.FileService;
import com.yoyuzh.files.core.StoredFile;
import com.yoyuzh.files.core.StoredFileEntityRepository;
import com.yoyuzh.files.core.StoredFileRepository;
import com.yoyuzh.files.policy.StoragePolicy;
import com.yoyuzh.files.policy.StoragePolicyRepository;
import com.yoyuzh.files.policy.StoragePolicyService;
import com.yoyuzh.files.tasks.BackgroundTask;
import com.yoyuzh.files.tasks.BackgroundTaskService;
import com.yoyuzh.files.tasks.BackgroundTaskType;
import com.yoyuzh.transfer.OfflineTransferSessionRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
@@ -24,6 +30,7 @@ import org.springframework.data.domain.Sort;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.security.SecureRandom;
import java.time.Instant;
@@ -45,6 +52,9 @@ public class AdminService {
private final AdminMetricsService adminMetricsService;
private final StoragePolicyRepository storagePolicyRepository;
private final StoragePolicyService storagePolicyService;
private final FileEntityRepository fileEntityRepository;
private final StoredFileEntityRepository storedFileEntityRepository;
private final BackgroundTaskService backgroundTaskService;
private final SecureRandom secureRandom = new SecureRandom();
public AdminSummaryResponse getSummary() {
@@ -97,6 +107,75 @@ public class AdminService {
.toList();
}
@Transactional
public AdminStoragePolicyResponse createStoragePolicy(AdminStoragePolicyUpsertRequest request) {
StoragePolicy policy = new StoragePolicy();
policy.setDefaultPolicy(false);
applyStoragePolicyUpsert(policy, request);
return toStoragePolicyResponse(storagePolicyRepository.save(policy));
}
@Transactional
public AdminStoragePolicyResponse updateStoragePolicy(Long policyId, AdminStoragePolicyUpsertRequest request) {
StoragePolicy policy = getRequiredStoragePolicy(policyId);
applyStoragePolicyUpsert(policy, request);
return toStoragePolicyResponse(storagePolicyRepository.save(policy));
}
@Transactional
public AdminStoragePolicyResponse updateStoragePolicyStatus(Long policyId, boolean enabled) {
StoragePolicy policy = getRequiredStoragePolicy(policyId);
if (policy.isDefaultPolicy() && !enabled) {
throw new BusinessException(ErrorCode.UNKNOWN, "默认存储策略不能停用");
}
policy.setEnabled(enabled);
return toStoragePolicyResponse(storagePolicyRepository.save(policy));
}
@Transactional
public BackgroundTask createStoragePolicyMigrationTask(User user, AdminStoragePolicyMigrationCreateRequest request) {
StoragePolicy sourcePolicy = getRequiredStoragePolicy(request.sourcePolicyId());
StoragePolicy targetPolicy = getRequiredStoragePolicy(request.targetPolicyId());
if (sourcePolicy.getId().equals(targetPolicy.getId())) {
throw new BusinessException(ErrorCode.UNKNOWN, "源存储策略和目标存储策略不能相同");
}
if (!targetPolicy.isEnabled()) {
throw new BusinessException(ErrorCode.UNKNOWN, "目标存储策略必须处于启用状态");
}
long candidateEntityCount = fileEntityRepository.countByStoragePolicyIdAndEntityType(
sourcePolicy.getId(),
FileEntityType.VERSION
);
long candidateStoredFileCount = storedFileEntityRepository.countDistinctStoredFilesByStoragePolicyIdAndEntityType(
sourcePolicy.getId(),
FileEntityType.VERSION
);
java.util.Map<String, Object> state = new java.util.LinkedHashMap<>();
state.put("sourcePolicyId", sourcePolicy.getId());
state.put("sourcePolicyName", sourcePolicy.getName());
state.put("targetPolicyId", targetPolicy.getId());
state.put("targetPolicyName", targetPolicy.getName());
state.put("candidateEntityCount", candidateEntityCount);
state.put("candidateStoredFileCount", candidateStoredFileCount);
state.put("migrationPerformed", false);
state.put("migrationMode", "skeleton");
state.put("entityType", FileEntityType.VERSION.name());
state.put("message", "storage policy migration skeleton queued; worker will validate and recount candidates without moving object data");
java.util.Map<String, Object> privateState = new java.util.LinkedHashMap<>(state);
privateState.put("taskType", BackgroundTaskType.STORAGE_POLICY_MIGRATION.name());
return backgroundTaskService.createQueuedTask(
user,
BackgroundTaskType.STORAGE_POLICY_MIGRATION,
state,
privateState,
request.correlationId()
);
}
@Transactional
public void deleteFile(Long fileId) {
StoredFile storedFile = storedFileRepository.findById(fileId)
@@ -214,11 +293,34 @@ public class AdminService {
);
}
private void applyStoragePolicyUpsert(StoragePolicy policy, AdminStoragePolicyUpsertRequest request) {
if (policy.isDefaultPolicy() && !request.enabled()) {
throw new BusinessException(ErrorCode.UNKNOWN, "默认存储策略不能停用");
}
validateStoragePolicyRequest(request);
policy.setName(request.name().trim());
policy.setType(request.type());
policy.setBucketName(normalizeNullable(request.bucketName()));
policy.setEndpoint(normalizeNullable(request.endpoint()));
policy.setRegion(normalizeNullable(request.region()));
policy.setPrivateBucket(request.privateBucket());
policy.setPrefix(normalizePrefix(request.prefix()));
policy.setCredentialMode(request.credentialMode());
policy.setMaxSizeBytes(request.maxSizeBytes());
policy.setCapabilitiesJson(storagePolicyService.writeCapabilities(request.capabilities()));
policy.setEnabled(request.enabled());
}
private User getRequiredUser(Long userId) {
return userRepository.findById(userId)
.orElseThrow(() -> new BusinessException(ErrorCode.UNKNOWN, "用户不存在"));
}
private StoragePolicy getRequiredStoragePolicy(Long policyId) {
return storagePolicyRepository.findById(policyId)
.orElseThrow(() -> new BusinessException(ErrorCode.UNKNOWN, "存储策略不存在"));
}
private String normalizeQuery(String query) {
if (query == null) {
return "";
@@ -226,6 +328,31 @@ public class AdminService {
return query.trim();
}
private String normalizeNullable(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
return value.trim();
}
private String normalizePrefix(String prefix) {
if (!StringUtils.hasText(prefix)) {
return "";
}
return prefix.trim();
}
private void validateStoragePolicyRequest(AdminStoragePolicyUpsertRequest request) {
if (request.type() == com.yoyuzh.files.policy.StoragePolicyType.LOCAL
&& request.credentialMode() != com.yoyuzh.files.policy.StoragePolicyCredentialMode.NONE) {
throw new BusinessException(ErrorCode.UNKNOWN, "本地存储策略必须使用 NONE 凭证模式");
}
if (request.type() == com.yoyuzh.files.policy.StoragePolicyType.S3_COMPATIBLE
&& !StringUtils.hasText(request.bucketName())) {
throw new BusinessException(ErrorCode.UNKNOWN, "S3 存储策略必须提供 bucketName");
}
}
private String generateTemporaryPassword() {
String lowers = "abcdefghjkmnpqrstuvwxyz";
String uppers = "ABCDEFGHJKMNPQRSTUVWXYZ";

View File

@@ -0,0 +1,12 @@
package com.yoyuzh.admin;
import jakarta.validation.constraints.NotNull;
public record AdminStoragePolicyMigrationCreateRequest(
@NotNull(message = "sourcePolicyId 不能为空")
Long sourcePolicyId,
@NotNull(message = "targetPolicyId 不能为空")
Long targetPolicyId,
String correlationId
) {
}

View File

@@ -0,0 +1,9 @@
package com.yoyuzh.admin;
import jakarta.validation.constraints.NotNull;
public record AdminStoragePolicyStatusUpdateRequest(
@NotNull(message = "enabled 不能为空")
Boolean enabled
) {
}

View File

@@ -0,0 +1,28 @@
package com.yoyuzh.admin;
import com.yoyuzh.files.policy.StoragePolicyCapabilities;
import com.yoyuzh.files.policy.StoragePolicyCredentialMode;
import com.yoyuzh.files.policy.StoragePolicyType;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
public record AdminStoragePolicyUpsertRequest(
@NotBlank(message = "存储策略名称不能为空")
String name,
@NotNull(message = "存储策略类型不能为空")
StoragePolicyType type,
String bucketName,
String endpoint,
String region,
boolean privateBucket,
String prefix,
@NotNull(message = "凭证模式不能为空")
StoragePolicyCredentialMode credentialMode,
@Positive(message = "最大对象大小必须大于 0")
long maxSizeBytes,
@NotNull(message = "能力声明不能为空")
StoragePolicyCapabilities capabilities,
boolean enabled
) {
}

View File

@@ -5,6 +5,7 @@ import com.yoyuzh.auth.CustomUserDetailsService;
import com.yoyuzh.auth.User;
import com.yoyuzh.files.upload.UploadSession;
import com.yoyuzh.files.upload.UploadSessionCreateCommand;
import com.yoyuzh.files.upload.UploadSessionUploadMode;
import com.yoyuzh.files.upload.UploadSessionPartCommand;
import com.yoyuzh.files.upload.UploadSessionService;
import com.yoyuzh.files.storage.PreparedUpload;
@@ -19,7 +20,9 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@RestController
@RequestMapping("/api/v2/files/upload-sessions")
@@ -49,6 +52,20 @@ public class UploadSessionV2Controller {
return ApiV2Response.success(toResponse(uploadSessionService.getOwnedSession(user, sessionId)));
}
@GetMapping("/{sessionId}/prepare")
public ApiV2Response<PreparedUploadV2Response> prepareUpload(@AuthenticationPrincipal UserDetails userDetails,
@PathVariable String sessionId) {
User user = userDetailsService.loadDomainUser(userDetails.getUsername());
PreparedUpload preparedUpload = uploadSessionService.prepareOwnedUpload(user, sessionId);
return ApiV2Response.success(new PreparedUploadV2Response(
preparedUpload.direct(),
preparedUpload.uploadUrl(),
preparedUpload.method(),
preparedUpload.headers(),
preparedUpload.storageName()
));
}
@DeleteMapping("/{sessionId}")
public ApiV2Response<UploadSessionV2Response> cancelSession(@AuthenticationPrincipal UserDetails userDetails,
@PathVariable String sessionId) {
@@ -78,6 +95,14 @@ public class UploadSessionV2Controller {
return ApiV2Response.success(toResponse(session));
}
@PostMapping("/{sessionId}/content")
public ApiV2Response<UploadSessionV2Response> uploadContent(@AuthenticationPrincipal UserDetails userDetails,
@PathVariable String sessionId,
@RequestPart("file") MultipartFile file) {
User user = userDetailsService.loadDomainUser(userDetails.getUsername());
return ApiV2Response.success(toResponse(uploadSessionService.uploadOwnedContent(user, sessionId, file)));
}
@GetMapping("/{sessionId}/parts/{partIndex}/prepare")
public ApiV2Response<PreparedUploadV2Response> preparePartUpload(@AuthenticationPrincipal UserDetails userDetails,
@PathVariable String sessionId,
@@ -94,10 +119,18 @@ public class UploadSessionV2Controller {
}
private UploadSessionV2Response toResponse(UploadSession session) {
UploadSessionUploadMode uploadMode = uploadSessionService.resolveUploadMode(session);
if (uploadMode == null) {
uploadMode = session.getMultipartUploadId() != null
? UploadSessionUploadMode.DIRECT_MULTIPART
: UploadSessionUploadMode.PROXY;
}
return new UploadSessionV2Response(
session.getSessionId(),
session.getObjectKey(),
session.getMultipartUploadId() != null,
uploadMode != UploadSessionUploadMode.PROXY,
uploadMode == UploadSessionUploadMode.DIRECT_MULTIPART,
uploadMode.name(),
session.getTargetPath(),
session.getFilename(),
session.getContentType(),
@@ -108,7 +141,38 @@ public class UploadSessionV2Controller {
session.getChunkCount(),
session.getExpiresAt(),
session.getCreatedAt(),
session.getUpdatedAt()
session.getUpdatedAt(),
toStrategyResponse(session.getSessionId(), uploadMode)
);
}
private UploadSessionV2StrategyResponse toStrategyResponse(String sessionId, UploadSessionUploadMode uploadMode) {
String sessionBasePath = "/api/v2/files/upload-sessions/" + sessionId;
return switch (uploadMode) {
case PROXY -> new UploadSessionV2StrategyResponse(
null,
sessionBasePath + "/content",
null,
null,
sessionBasePath + "/complete",
"file"
);
case DIRECT_SINGLE -> new UploadSessionV2StrategyResponse(
sessionBasePath + "/prepare",
null,
null,
null,
sessionBasePath + "/complete",
null
);
case DIRECT_MULTIPART -> new UploadSessionV2StrategyResponse(
null,
null,
sessionBasePath + "/parts/{partIndex}/prepare",
sessionBasePath + "/parts/{partIndex}",
sessionBasePath + "/complete",
null
);
};
}
}

View File

@@ -5,7 +5,9 @@ import java.time.LocalDateTime;
public record UploadSessionV2Response(
String sessionId,
String objectKey,
boolean directUpload,
boolean multipartUpload,
String uploadMode,
String path,
String filename,
String contentType,
@@ -16,6 +18,7 @@ public record UploadSessionV2Response(
int chunkCount,
LocalDateTime expiresAt,
LocalDateTime createdAt,
LocalDateTime updatedAt
LocalDateTime updatedAt,
UploadSessionV2StrategyResponse strategy
) {
}

View File

@@ -0,0 +1,11 @@
package com.yoyuzh.api.v2.files;
public record UploadSessionV2StrategyResponse(
String prepareUrl,
String proxyContentUrl,
String partPrepareUrlTemplate,
String partRecordUrlTemplate,
String completeUrl,
String proxyFormField
) {
}

View File

@@ -2,9 +2,14 @@ package com.yoyuzh.files.core;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
import java.util.Optional;
public interface FileEntityRepository extends JpaRepository<FileEntity, Long> {
Optional<FileEntity> findByObjectKeyAndEntityType(String objectKey, FileEntityType entityType);
long countByStoragePolicyIdAndEntityType(Long storagePolicyId, FileEntityType entityType);
List<FileEntity> findByStoragePolicyIdAndEntityTypeOrderByIdAsc(Long storagePolicyId, FileEntityType entityType);
}

View File

@@ -8,6 +8,8 @@ import com.yoyuzh.common.PageResponse;
import com.yoyuzh.config.FileStorageProperties;
import com.yoyuzh.files.events.FileEventService;
import com.yoyuzh.files.events.FileEventType;
import com.yoyuzh.files.policy.StoragePolicy;
import com.yoyuzh.files.policy.StoragePolicyCapabilities;
import com.yoyuzh.files.policy.StoragePolicyService;
import com.yoyuzh.files.share.CreateFileShareLinkResponse;
import com.yoyuzh.files.share.FileShareDetailsResponse;
@@ -159,6 +161,10 @@ public class FileService {
validateUpload(user, normalizedPath, filename, request.size());
String objectKey = createBlobObjectKey();
StoragePolicyCapabilities capabilities = resolveDefaultStoragePolicyCapabilities();
if (capabilities != null && !capabilities.directUpload()) {
return new InitiateUploadResponse(false, "", "POST", Map.of(), objectKey);
}
PreparedUpload preparedUpload = fileContentStorage.prepareBlobUpload(
normalizedPath,
filename,
@@ -856,6 +862,13 @@ public class FileService {
return storagePolicyService.ensureDefaultPolicy().getId();
}
private StoragePolicyCapabilities resolveDefaultStoragePolicyCapabilities() {
if (storagePolicyService == null) {
return null;
}
return storagePolicyService.readCapabilities(storagePolicyService.ensureDefaultPolicy());
}
private void savePrimaryEntityRelation(StoredFile storedFile, FileEntity primaryEntity) {
if (storedFileEntityRepository == null) {
return;
@@ -927,6 +940,14 @@ public class FileService {
private void validateUpload(User user, String normalizedPath, String filename, long size) {
long effectiveMaxUploadSize = Math.min(maxFileSize, user.getMaxUploadSizeBytes());
StoragePolicy defaultPolicy = storagePolicyService == null ? null : storagePolicyService.ensureDefaultPolicy();
StoragePolicyCapabilities capabilities = defaultPolicy == null ? null : storagePolicyService.readCapabilities(defaultPolicy);
if (defaultPolicy != null && defaultPolicy.getMaxSizeBytes() > 0) {
effectiveMaxUploadSize = Math.min(effectiveMaxUploadSize, defaultPolicy.getMaxSizeBytes());
}
if (capabilities != null && capabilities.maxObjectSize() > 0) {
effectiveMaxUploadSize = Math.min(effectiveMaxUploadSize, capabilities.maxObjectSize());
}
if (size > effectiveMaxUploadSize) {
throw new BusinessException(ErrorCode.UNKNOWN, "文件大小超出限制");
}

View File

@@ -1,6 +1,17 @@
package com.yoyuzh.files.core;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface StoredFileEntityRepository extends JpaRepository<StoredFileEntity, Long> {
@Query("""
select count(distinct relation.storedFile.id)
from StoredFileEntity relation
where relation.fileEntity.storagePolicyId = :storagePolicyId
and relation.fileEntity.entityType = :entityType
""")
long countDistinctStoredFilesByStoragePolicyIdAndEntityType(@Param("storagePolicyId") Long storagePolicyId,
@Param("entityType") FileEntityType entityType);
}

View File

@@ -1,6 +1,8 @@
package com.yoyuzh.files.policy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yoyuzh.common.BusinessException;
import com.yoyuzh.common.ErrorCode;
import com.yoyuzh.config.FileStorageProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
@@ -38,6 +40,19 @@ public class StoragePolicyService implements CommandLineRunner {
}
}
public String writeCapabilities(StoragePolicyCapabilities capabilities) {
try {
return objectMapper.writeValueAsString(capabilities);
} catch (Exception ex) {
throw new IllegalStateException("Storage policy capabilities cannot be serialized", ex);
}
}
public StoragePolicy getRequiredPolicy(Long policyId) {
return storagePolicyRepository.findById(policyId)
.orElseThrow(() -> new BusinessException(ErrorCode.UNKNOWN, "存储策略不存在"));
}
private StoragePolicy createDefaultPolicy() {
if ("s3".equalsIgnoreCase(properties.getProvider())) {
return createDefaultS3Policy();
@@ -95,14 +110,6 @@ public class StoragePolicyService implements CommandLineRunner {
return policy;
}
private String writeCapabilities(StoragePolicyCapabilities capabilities) {
try {
return objectMapper.writeValueAsString(capabilities);
} catch (Exception ex) {
throw new IllegalStateException("Storage policy capabilities cannot be serialized", ex);
}
}
private String extractScopeBucketName(String scope) {
if (!StringUtils.hasText(scope)) {
return null;

View File

@@ -3,6 +3,7 @@ package com.yoyuzh.files.tasks;
public enum BackgroundTaskType {
ARCHIVE,
EXTRACT,
STORAGE_POLICY_MIGRATION,
THUMBNAIL,
MEDIA_META,
REMOTE_DOWNLOAD,

View File

@@ -98,6 +98,7 @@ public class BackgroundTaskWorker {
case ARCHIVE -> "archiving";
case EXTRACT -> "extracting";
case MEDIA_META -> "extracting-metadata";
case STORAGE_POLICY_MIGRATION -> "planning-storage-policy-migration";
default -> "running";
};
}

View File

@@ -0,0 +1,305 @@
package com.yoyuzh.files.tasks;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yoyuzh.common.BusinessException;
import com.yoyuzh.common.ErrorCode;
import com.yoyuzh.files.core.FileBlob;
import com.yoyuzh.files.core.FileBlobRepository;
import com.yoyuzh.files.core.FileEntity;
import com.yoyuzh.files.core.FileEntityRepository;
import com.yoyuzh.files.core.FileEntityType;
import com.yoyuzh.files.core.StoredFileRepository;
import com.yoyuzh.files.policy.StoragePolicy;
import com.yoyuzh.files.policy.StoragePolicyRepository;
import com.yoyuzh.files.policy.StoragePolicyType;
import com.yoyuzh.files.storage.FileContentStorage;
import com.yoyuzh.files.storage.LocalFileContentStorage;
import com.yoyuzh.files.storage.S3FileContentStorage;
import jakarta.transaction.Transactional;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Component
@Transactional
public class StoragePolicyMigrationBackgroundTaskHandler implements BackgroundTaskHandler {
private final StoragePolicyRepository storagePolicyRepository;
private final FileEntityRepository fileEntityRepository;
private final FileBlobRepository fileBlobRepository;
private final StoredFileRepository storedFileRepository;
private final FileContentStorage fileContentStorage;
private final ObjectMapper objectMapper;
public StoragePolicyMigrationBackgroundTaskHandler(StoragePolicyRepository storagePolicyRepository,
FileEntityRepository fileEntityRepository,
FileBlobRepository fileBlobRepository,
StoredFileRepository storedFileRepository,
FileContentStorage fileContentStorage,
ObjectMapper objectMapper) {
this.storagePolicyRepository = storagePolicyRepository;
this.fileEntityRepository = fileEntityRepository;
this.fileBlobRepository = fileBlobRepository;
this.storedFileRepository = storedFileRepository;
this.fileContentStorage = fileContentStorage;
this.objectMapper = objectMapper;
}
@Override
public boolean supports(BackgroundTaskType type) {
return type == BackgroundTaskType.STORAGE_POLICY_MIGRATION;
}
@Override
public BackgroundTaskHandlerResult handle(BackgroundTask task) {
return handle(task, publicStatePatch -> {
});
}
@Override
public BackgroundTaskHandlerResult handle(BackgroundTask task, BackgroundTaskProgressReporter progressReporter) {
Map<String, Object> state = parseState(task.getPrivateStateJson());
Long sourcePolicyId = readLong(state.get("sourcePolicyId"), "sourcePolicyId");
Long targetPolicyId = readLong(state.get("targetPolicyId"), "targetPolicyId");
StoragePolicy sourcePolicy = storagePolicyRepository.findById(sourcePolicyId)
.orElseThrow(() -> new IllegalStateException("storage policy migration source policy not found"));
StoragePolicy targetPolicy = storagePolicyRepository.findById(targetPolicyId)
.orElseThrow(() -> new IllegalStateException("storage policy migration target policy not found"));
validatePolicyPair(sourcePolicy, targetPolicy);
List<FileEntity> entities = fileEntityRepository.findByStoragePolicyIdAndEntityTypeOrderByIdAsc(
sourcePolicyId,
FileEntityType.VERSION
);
long candidateEntityCount = entities.size();
long candidateStoredFileCount = 0L;
for (FileEntity entity : entities) {
validateTargetCapacity(entity, targetPolicy);
candidateStoredFileCount += storedFileRepository.countByBlobId(getRequiredBlob(entity).getId());
}
long processedEntityCount = 0L;
long migratedStoredFileCount = 0L;
List<String> copiedObjectKeys = new ArrayList<>();
LinkedHashSet<String> staleObjectKeys = new LinkedHashSet<>();
progressReporter.report(progressPatch(
sourcePolicy,
targetPolicy,
candidateEntityCount,
candidateStoredFileCount,
0L,
0L,
0L,
"copying-object-data",
false
));
try {
for (FileEntity entity : entities) {
FileBlob blob = getRequiredBlob(entity);
long storedFileCount = storedFileRepository.countByBlobId(blob.getId());
String oldObjectKey = entity.getObjectKey();
String newObjectKey = buildTargetObjectKey(targetPolicy.getId());
String contentType = StringUtils.hasText(entity.getContentType()) ? entity.getContentType() : blob.getContentType();
byte[] content = fileContentStorage.readBlob(oldObjectKey);
copiedObjectKeys.add(newObjectKey);
fileContentStorage.storeBlob(newObjectKey, contentType, content);
entity.setObjectKey(newObjectKey);
entity.setStoragePolicyId(targetPolicy.getId());
fileEntityRepository.save(entity);
blob.setObjectKey(newObjectKey);
fileBlobRepository.save(blob);
staleObjectKeys.add(oldObjectKey);
processedEntityCount += 1;
migratedStoredFileCount += storedFileCount;
progressReporter.report(progressPatch(
sourcePolicy,
targetPolicy,
candidateEntityCount,
candidateStoredFileCount,
processedEntityCount,
processedEntityCount,
migratedStoredFileCount,
"copying-object-data",
false
));
}
} catch (RuntimeException ex) {
cleanupCopiedObjects(copiedObjectKeys);
throw ex;
}
scheduleStaleObjectCleanup(staleObjectKeys);
return new BackgroundTaskHandlerResult(progressPatch(
sourcePolicy,
targetPolicy,
candidateEntityCount,
candidateStoredFileCount,
processedEntityCount,
processedEntityCount,
migratedStoredFileCount,
"completed",
true
));
}
private void validatePolicyPair(StoragePolicy sourcePolicy, StoragePolicy targetPolicy) {
if (sourcePolicy.getId().equals(targetPolicy.getId())) {
throw new BusinessException(ErrorCode.UNKNOWN, "源存储策略和目标存储策略不能相同");
}
if (!targetPolicy.isEnabled()) {
throw new BusinessException(ErrorCode.UNKNOWN, "目标存储策略必须处于启用状态");
}
if (sourcePolicy.getType() != targetPolicy.getType()) {
throw new BusinessException(ErrorCode.UNKNOWN, "当前只支持迁移同类型存储策略");
}
StoragePolicyType runtimeType = resolveRuntimePolicyType();
if (runtimeType != null
&& (sourcePolicy.getType() != runtimeType || targetPolicy.getType() != runtimeType)) {
throw new BusinessException(ErrorCode.UNKNOWN, "当前运行时只支持迁移同类型活动存储后端的策略");
}
}
private StoragePolicyType resolveRuntimePolicyType() {
if (fileContentStorage instanceof LocalFileContentStorage) {
return StoragePolicyType.LOCAL;
}
if (fileContentStorage instanceof S3FileContentStorage) {
return StoragePolicyType.S3_COMPATIBLE;
}
return null;
}
private void validateTargetCapacity(FileEntity entity, StoragePolicy targetPolicy) {
if (targetPolicy.getMaxSizeBytes() > 0 && entity.getSize() != null && entity.getSize() > targetPolicy.getMaxSizeBytes()) {
throw new BusinessException(ErrorCode.UNKNOWN, "目标存储策略容量上限不足以承载待迁移对象");
}
}
private FileBlob getRequiredBlob(FileEntity entity) {
return fileBlobRepository.findByObjectKey(entity.getObjectKey())
.orElseThrow(() -> new IllegalStateException("storage policy migration blob not found"));
}
private String buildTargetObjectKey(Long targetPolicyId) {
return "policies/" + targetPolicyId + "/blobs/" + UUID.randomUUID().toString().replace("-", "");
}
private Map<String, Object> progressPatch(StoragePolicy sourcePolicy,
StoragePolicy targetPolicy,
long candidateEntityCount,
long candidateStoredFileCount,
long processedEntityCount,
long migratedEntityCount,
long migratedStoredFileCount,
String migrationStage,
boolean migrationPerformed) {
Map<String, Object> patch = new LinkedHashMap<>();
patch.put(BackgroundTaskService.STATE_PHASE_KEY, "migrating-storage-policy");
patch.put("worker", "storage-policy-migration");
patch.put("migrationStage", migrationStage);
patch.put("migrationMode", migrationPerformed ? "executed" : "executing");
patch.put("migrationPerformed", migrationPerformed);
patch.put("sourcePolicyId", sourcePolicy.getId());
patch.put("sourcePolicyName", sourcePolicy.getName());
patch.put("targetPolicyId", targetPolicy.getId());
patch.put("targetPolicyName", targetPolicy.getName());
patch.put("candidateEntityCount", candidateEntityCount);
patch.put("candidateStoredFileCount", candidateStoredFileCount);
patch.put("processedEntityCount", processedEntityCount);
patch.put("totalEntityCount", candidateEntityCount);
patch.put("processedStoredFileCount", migratedStoredFileCount);
patch.put("totalStoredFileCount", candidateStoredFileCount);
patch.put("migratedEntityCount", migratedEntityCount);
patch.put("migratedStoredFileCount", migratedStoredFileCount);
patch.put("entityType", FileEntityType.VERSION.name());
patch.put("plannedAt", LocalDateTime.now().toString());
patch.put("progressPercent", calculateProgressPercent(
processedEntityCount,
candidateEntityCount,
migratedStoredFileCount,
candidateStoredFileCount
));
patch.put("message", migrationPerformed
? "storage policy migration moved object data through the active storage backend and updated metadata references"
: "storage policy migration is copying object data and updating metadata references");
return patch;
}
private int calculateProgressPercent(long processedEntityCount,
long totalEntityCount,
long processedStoredFileCount,
long totalStoredFileCount) {
long total = Math.max(0L, totalEntityCount) + Math.max(0L, totalStoredFileCount);
long processed = Math.max(0L, processedEntityCount) + Math.max(0L, processedStoredFileCount);
if (total <= 0L) {
return 100;
}
return (int) Math.min(100L, Math.floor((processed * 100.0d) / total));
}
private void scheduleStaleObjectCleanup(LinkedHashSet<String> staleObjectKeys) {
if (staleObjectKeys.isEmpty() || !TransactionSynchronizationManager.isSynchronizationActive()) {
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
for (String staleObjectKey : staleObjectKeys) {
try {
fileContentStorage.deleteBlob(staleObjectKey);
} catch (RuntimeException ignored) {
// Database state already committed; leave old object cleanup as best effort.
}
}
}
});
}
private void cleanupCopiedObjects(List<String> copiedObjectKeys) {
for (String copiedObjectKey : copiedObjectKeys) {
try {
fileContentStorage.deleteBlob(copiedObjectKey);
} catch (RuntimeException ignored) {
// Best-effort cleanup while metadata rolls back.
}
}
}
private Map<String, Object> parseState(String json) {
if (!StringUtils.hasText(json)) {
return Map.of();
}
try {
return objectMapper.readValue(json, new TypeReference<LinkedHashMap<String, Object>>() {
});
} catch (JsonProcessingException ex) {
throw new IllegalStateException("storage policy migration task state is invalid", ex);
}
}
private Long readLong(Object value, String key) {
if (value instanceof Number number) {
return number.longValue();
}
if (value instanceof String text && StringUtils.hasText(text)) {
return Long.parseLong(text.trim());
}
throw new IllegalStateException("storage policy migration task missing " + key);
}
}

View File

@@ -9,6 +9,7 @@ import com.yoyuzh.config.FileStorageProperties;
import com.yoyuzh.files.core.FileService;
import com.yoyuzh.files.core.StoredFileRepository;
import com.yoyuzh.files.policy.StoragePolicy;
import com.yoyuzh.files.policy.StoragePolicyCapabilities;
import com.yoyuzh.files.policy.StoragePolicyService;
import com.yoyuzh.files.storage.FileContentStorage;
import com.yoyuzh.files.storage.MultipartCompletedPart;
@@ -18,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import java.time.Clock;
import java.time.LocalDateTime;
@@ -78,7 +80,10 @@ public class UploadSessionService {
public UploadSession createSession(User user, UploadSessionCreateCommand command) {
String normalizedPath = normalizeDirectoryPath(command.path());
String filename = normalizeLeafName(command.filename());
validateTarget(user, normalizedPath, filename, command.size());
StoragePolicy policy = storagePolicyService.ensureDefaultPolicy();
StoragePolicyCapabilities capabilities = storagePolicyService.readCapabilities(policy);
validateTarget(user, normalizedPath, filename, command.size(), policy, capabilities);
UploadSessionUploadMode uploadMode = resolveUploadMode(capabilities);
UploadSession session = new UploadSession();
session.setSessionId(UUID.randomUUID().toString());
@@ -88,17 +93,18 @@ public class UploadSessionService {
session.setContentType(command.contentType());
session.setSize(command.size());
session.setObjectKey(createBlobObjectKey());
StoragePolicy policy = storagePolicyService.ensureDefaultPolicy();
session.setStoragePolicyId(policy.getId());
session.setChunkSize(DEFAULT_CHUNK_SIZE);
session.setChunkCount(calculateChunkCount(command.size(), DEFAULT_CHUNK_SIZE));
session.setChunkCount(uploadMode == UploadSessionUploadMode.DIRECT_MULTIPART
? calculateChunkCount(command.size(), DEFAULT_CHUNK_SIZE)
: 1);
session.setUploadedPartsJson("[]");
session.setStatus(UploadSessionStatus.CREATED);
LocalDateTime now = LocalDateTime.ofInstant(clock.instant(), clock.getZone());
session.setCreatedAt(now);
session.setUpdatedAt(now);
session.setExpiresAt(now.plusHours(SESSION_TTL_HOURS));
if (storagePolicyService.readCapabilities(policy).multipartUpload()) {
if (uploadMode == UploadSessionUploadMode.DIRECT_MULTIPART) {
session.setMultipartUploadId(fileContentStorage.createMultipartUpload(session.getObjectKey(), session.getContentType()));
}
return uploadSessionRepository.save(session);
@@ -121,12 +127,30 @@ public class UploadSessionService {
return uploadSessionRepository.save(session);
}
@Transactional(readOnly = true)
public PreparedUpload prepareOwnedUpload(User user, String sessionId) {
UploadSession session = getOwnedSession(user, sessionId);
LocalDateTime now = LocalDateTime.ofInstant(clock.instant(), clock.getZone());
ensureSessionCanReceiveContent(session, now);
if (resolveUploadMode(session) != UploadSessionUploadMode.DIRECT_SINGLE) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传会话未启用单请求直传");
}
return fileContentStorage.prepareBlobUpload(
session.getTargetPath(),
session.getFilename(),
session.getObjectKey(),
session.getContentType(),
session.getSize()
);
}
@Transactional(readOnly = true)
public PreparedUpload prepareOwnedPartUpload(User user, String sessionId, int partIndex) {
UploadSession session = getOwnedSession(user, sessionId);
LocalDateTime now = LocalDateTime.ofInstant(clock.instant(), clock.getZone());
ensureSessionCanReceivePart(session, now);
if (!StringUtils.hasText(session.getMultipartUploadId())) {
if (resolveUploadMode(session) != UploadSessionUploadMode.DIRECT_MULTIPART
|| !StringUtils.hasText(session.getMultipartUploadId())) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传会话未启用 multipart");
}
if (partIndex < 0 || partIndex >= session.getChunkCount()) {
@@ -149,6 +173,9 @@ public class UploadSessionService {
UploadSession session = getOwnedSession(user, sessionId);
LocalDateTime now = LocalDateTime.ofInstant(clock.instant(), clock.getZone());
ensureSessionCanReceivePart(session, now);
if (resolveUploadMode(session) != UploadSessionUploadMode.DIRECT_MULTIPART) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传会话未启用 multipart");
}
if (partIndex < 0 || partIndex >= session.getChunkCount()) {
throw new BusinessException(ErrorCode.UNKNOWN, "分片序号不合法");
}
@@ -172,6 +199,28 @@ public class UploadSessionService {
return uploadSessionRepository.save(session);
}
@Transactional
public UploadSession uploadOwnedContent(User user, String sessionId, MultipartFile file) {
UploadSession session = getOwnedSession(user, sessionId);
LocalDateTime now = LocalDateTime.ofInstant(clock.instant(), clock.getZone());
ensureSessionCanReceiveContent(session, now);
if (resolveUploadMode(session) != UploadSessionUploadMode.PROXY) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传会话未启用代理上传");
}
if (file == null || file.isEmpty()) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传内容不能为空");
}
if (file.getSize() != session.getSize()) {
throw new BusinessException(ErrorCode.UNKNOWN, "上传内容大小与会话不一致");
}
fileContentStorage.uploadBlob(session.getObjectKey(), file);
if (session.getStatus() == UploadSessionStatus.CREATED) {
session.setStatus(UploadSessionStatus.UPLOADING);
}
session.setUpdatedAt(now);
return uploadSessionRepository.save(session);
}
@Transactional
public UploadSession completeOwnedSession(User user, String sessionId) {
UploadSession session = getOwnedSession(user, sessionId);
@@ -194,7 +243,8 @@ public class UploadSessionService {
uploadSessionRepository.save(session);
try {
if (StringUtils.hasText(session.getMultipartUploadId())) {
if (resolveUploadMode(session) == UploadSessionUploadMode.DIRECT_MULTIPART
&& StringUtils.hasText(session.getMultipartUploadId())) {
fileContentStorage.completeMultipartUpload(
session.getObjectKey(),
session.getMultipartUploadId(),
@@ -246,8 +296,40 @@ public class UploadSessionService {
return expiredSessions.size();
}
private void validateTarget(User user, String normalizedPath, String filename, long size) {
public UploadSessionUploadMode resolveUploadMode(UploadSession session) {
if (session.getStoragePolicyId() == null) {
if (StringUtils.hasText(session.getMultipartUploadId()) || session.getChunkCount() > 1) {
return UploadSessionUploadMode.DIRECT_MULTIPART;
}
return UploadSessionUploadMode.PROXY;
}
StoragePolicy policy = storagePolicyService.getRequiredPolicy(session.getStoragePolicyId());
return resolveUploadMode(storagePolicyService.readCapabilities(policy));
}
private UploadSessionUploadMode resolveUploadMode(StoragePolicyCapabilities capabilities) {
if (!capabilities.directUpload()) {
return UploadSessionUploadMode.PROXY;
}
if (capabilities.multipartUpload()) {
return UploadSessionUploadMode.DIRECT_MULTIPART;
}
return UploadSessionUploadMode.DIRECT_SINGLE;
}
private void validateTarget(User user,
String normalizedPath,
String filename,
long size,
StoragePolicy policy,
StoragePolicyCapabilities capabilities) {
long effectiveMaxUploadSize = Math.min(maxFileSize, user.getMaxUploadSizeBytes());
if (policy.getMaxSizeBytes() > 0) {
effectiveMaxUploadSize = Math.min(effectiveMaxUploadSize, policy.getMaxSizeBytes());
}
if (capabilities.maxObjectSize() > 0) {
effectiveMaxUploadSize = Math.min(effectiveMaxUploadSize, capabilities.maxObjectSize());
}
if (size > effectiveMaxUploadSize) {
throw new BusinessException(ErrorCode.UNKNOWN, "文件大小超出限制");
}
@@ -260,6 +342,13 @@ public class UploadSessionService {
}
}
private void ensureSessionCanReceiveContent(UploadSession session, LocalDateTime now) {
ensureSessionCanReceivePart(session, now);
if (session.getStatus() == UploadSessionStatus.UPLOADING && StringUtils.hasText(session.getMultipartUploadId())) {
throw new BusinessException(ErrorCode.UNKNOWN, "multipart 上传会话不能走整体内容上传");
}
}
private void ensureSessionCanReceivePart(UploadSession session, LocalDateTime now) {
if (session.getStatus() == UploadSessionStatus.CANCELLED
|| session.getStatus() == UploadSessionStatus.FAILED

View File

@@ -0,0 +1,7 @@
package com.yoyuzh.files.upload;
public enum UploadSessionUploadMode {
PROXY,
DIRECT_SINGLE,
DIRECT_MULTIPART
}