AWS File Storage with Local Storage

Just wondering - is it possible to use both methods simultaneously?

Basically I want to process a lot of files. Some of which I want to back up but others delete after processing.
Since it will be a lot of files, there is no point sending them off to AWS and get charged for the ones I wish to delete after processing.

So I thought I would see if this functionality already exists before I go off trying to reinvent the wheel…?

Since this is very specialized behaviour I doubt this already exists exactly as you wish, but you can easily implement your own FileStorageAPI and combine the logic of com.haulmont.cuba.core.app.filestorage.FileStorage and com.haulmont.addon.cubaaws.s3.AmazonS3FileStorage

With this approach I implemented my own read-from-local-and-s3-write-only-to-local file storage for a test environment. :wink:

Don’t forget to configure your bean in the spring.xml:

        <bean name="cuba_FileStorage" class="com.example.AmazonS3ReadOnlyFileStorage"/>
1 Like

Hi,
The ability to have several active file storage implementations in the same project has been implemented in Jmix.

In CUBA you would need to write your own implementation of the FileStorageAPI that delegates operations with files to one or another storage, as @klaus said.

1 Like

Thanks Klaus,

How did you get the FileStorage and AmazonS3FileStorage classes into the new bean? Or did you have to copy the implementations of each?

I have in Spring.xml
<bean name="cuba_FileStorage" class="com.daryn.ecos.core.files.EcosFileStorage"/>

My new bean

public class EcosFileStorage implements FileStorageAPI {
    public static final String NAME = "ecos_EcosFileStorage";

    private final AmazonS3FileStorage amazonS3FileStorage = new AmazonS3FileStorage();
    private final FileStorage fileStorage = new FileStorage();

    @Inject
    protected DataManager dataManager;

    @Override
    public long saveStream(FileDescriptor fileDescr, InputStream inputStream) throws FileStorageException {
       return fileStorage.saveStream((fileDescr), inputStream);
    }

But when I try and upload a file I get null pointer on:

java.lang.NullPointerException: null
	at com.haulmont.cuba.core.app.filestorage.FileStorage.getStorageRoots(FileStorage.java:79) ~[cuba-core-7.2.11.jar:7.2.11]
	at com.haulmont.cuba.core.app.filestorage.FileStorage.saveStream(FileStorage.java:105) ~[cuba-core-7.2.11.jar:7.2.11]
	at com.daryn.ecos.core.files.EcosFileStorage.saveStream(EcosFileStorage.java:26) ~[ecos-core-1.0-SNAPSHOT.jar:na]
	at com.haulmont.cuba.core.sys.remoting.LocalFileExchangeServiceBean.uploadFile(LocalFileExchangeServiceBean.java:40) ~[cuba-core-7.2.11.jar:7.2.11]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.9.RELEASE.jar:5.2.9.RELEASE]

I get another error if I try to save with the Amazon class…

The first lines of my class look like this:

/** Read is performed on S3 and locally, save is performed only locally. */
public class AmazonS3ReadOnlyFileStorage extends FileStorage {
    @Inject
    protected AmazonS3Config amazonS3Config;

    protected AtomicReference<S3Client> s3ClientReference = new AtomicReference<>();

    @EventListener
    protected void initS3Client(AppContextStartedEvent event) {
        refreshS3Client();
    }
    // ... code delegating to super methods and copied from the original AmazonS3FileStorage follows ...

You need to inject your dependencies, instead of new.

Cheers for that…

I couldn’t get it to work injecting them as beans.

So I just copied the source from both implementations FileStorage and AmazonS3FileStorage and modifed the methods. Looks like you did something similar…

Source (in case anyone else needs the same solution)

  1. Files are stored locally by default.
  2. Files can be moved to AWS instead of local storage by method moveFileToAwsStorageBucket
  3. Files are tracked depending on where they are stored.

Spring.xml - override bean
<bean name="cuba_FileStorage" class="com.daryn.ecos.core.FileStorageBeanExt"/>

Add enum to FileDescriptorExt

public enum StorageLocation implements EnumClass<String> {
    LOCAL("LOCAL"),
    AWS("AWS");

Files are saved locally by default. To move a file to AWS. e.g. in a Service

@Inject
protected FileStorageBeanExt fileStorageBeanExt;

@Override
public void moveFileToAwsStorageBucket(FileDescriptor fileDescriptor, boolean persistFileDescriptorAfter) throws FileStorageException {
    fileStorageBeanExt.moveFileToAws(fileDescriptor);
    if(persistFileDescriptorAfter)
        dataManager.commit(fileDescriptor);
}

FileStorageBeanExt

@Component(FileStorageBeanExt.NAME)
public class FileStorageBeanExt implements FileStorageAPI{
    static final String NAME = "ecos_FileStorageBeanExt ";

    private static final Logger log = LoggerFactory.getLogger(FileStorage.class);

    @Inject
    protected UserSessionSource userSessionSource;

    @Inject
    protected TimeSource timeSource;

    @Inject
    protected Configuration configuration;

    protected boolean isImmutableFileStorage;

    protected ExecutorService writeExecutor = Executors.newFixedThreadPool(5,
            new ThreadFactoryBuilder().setNameFormat("FileStorageWriter-%d").build());

    protected volatile File[] storageRoots;

    @PostConstruct
    public void init() {
        this.isImmutableFileStorage = configuration.getConfig(ServerConfig.class).getImmutableFileStorage();
    }

    public void moveFileToAws(FileDescriptor fileDescriptor) throws FileStorageException {
        this.saveFileAWS(fileDescriptor, this.loadFileLocally(fileDescriptor));
        this.removeFileLocally(fileDescriptor);
    }

    @Override
    public long saveStream(final FileDescriptor fileDescr, final InputStream inputStream) throws FileStorageException {
       return this.saveStreamLocally(fileDescr, inputStream);
    }

    @Override
    public void saveFile(final FileDescriptor fileDescr, final byte[] data) throws FileStorageException {
        this.saveFileLocally(fileDescr, data);
    }

    @Override
    public void removeFile(FileDescriptor fileDescr) throws FileStorageException {
        FileDescriptorExt fd = (FileDescriptorExt) fileDescr;
        if(fd.getStorageLocation()!=null && fd.getStorageLocation()== StorageLocation.AWS){
            this.removeFileAWS(fileDescr);
        }else{
            this.removeFileLocally(fileDescr);
        }
    }

    @Override
    public InputStream openStream(FileDescriptor fileDescr) throws FileStorageException {
        FileDescriptorExt fd = (FileDescriptorExt) fileDescr;
        if(fd.getStorageLocation().equals(StorageLocation.AWS)){
            return this.openStreamAWS(fileDescr);
        }else{
            return this.openStreamLocally(fileDescr);
        }
    }

    @Override
    public byte[] loadFile(FileDescriptor fileDescr) throws FileStorageException {
        FileDescriptorExt fd = (FileDescriptorExt) fileDescr;
        if(fd.getStorageLocation().equals(StorageLocation.AWS)){
            return this.loadFileAWS(fileDescr);
        }else{
            return this.loadFileLocally(fileDescr);
        }
    }

    @Override
    public boolean fileExists(FileDescriptor fileDescr) throws FileStorageException {
        FileDescriptorExt fd = (FileDescriptorExt) fileDescr;
        if(fd.getStorageLocation().equals(StorageLocation.AWS)){
            return this.fileExistsAWS(fileDescr);
        }else{
            return this.fileExistsLocally(fileDescr);
        }
    }


    /*
    AWS Processes - Copied from AmazonS3FileStorage
    *
    * */
    @Inject
    protected AmazonS3Config amazonS3Config;

    protected AtomicReference<S3Client> s3ClientReference = new AtomicReference<>();

    @EventListener
    protected void initS3Client(AppContextStartedEvent event) {
        refreshS3Client();
    }

    protected AwsCredentialsProvider getAwsCredentialsProvider() {
        if (getAccessKey() != null && getSecretAccessKey() != null) {
            AwsCredentials awsCredentials = AwsBasicCredentials.create(getAccessKey(), getSecretAccessKey());
            return StaticCredentialsProvider.create(awsCredentials);
        } else {
            return DefaultCredentialsProvider.builder().build();
        }
    }

    public void refreshS3Client() {
        AwsCredentialsProvider awsCredentialsProvider = getAwsCredentialsProvider();
        if (Strings.isNullOrEmpty(amazonS3Config.getEndpointUrl())) {
            s3ClientReference.set(S3Client.builder()
                    .credentialsProvider(awsCredentialsProvider)
                    .region(Region.of(getRegionName()))
                    .build());
        } else {
            s3ClientReference.set(S3Client.builder()
                    .credentialsProvider(awsCredentialsProvider)
                    .endpointOverride(URI.create(amazonS3Config.getEndpointUrl()))
                    .region(Region.of(getRegionName()))
                    .build());
        }
    }


    public long saveStreamAWS(FileDescriptor fileDescr, InputStream inputStream) throws FileStorageException {
        Preconditions.checkNotNullArgument(fileDescr.getSize());
        try {
            saveFileAWS(fileDescr, IOUtils.toByteArray(inputStream));
        } catch (IOException e) {
            String message = String.format("Could not save file %s.",
                    getFileNameAWS(fileDescr));
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, message);
        }
        return fileDescr.getSize();
    }


    public void saveFileAWS(FileDescriptor fd, byte[] data) throws FileStorageException {
        checkNotNullArgument(data, "File content is null");
        FileDescriptorExt fileDescr = (FileDescriptorExt) fd;
        fileDescr.setStorageLocation(StorageLocation.AWS);
        try {
            S3Client s3Client = s3ClientReference.get();
            int chunkSize = amazonS3Config.getChunkSize() * 1024;

            CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                    .bucket(getBucket()).key(resolveFileName(fileDescr))
                    .build();
            CreateMultipartUploadResponse response = s3Client.createMultipartUpload(createMultipartUploadRequest);

            List<CompletedPart> completedParts = new ArrayList<>();
            for (int i = 0; i * chunkSize < data.length; i++) {
                int partNumber = i + 1;
                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                        .bucket(getBucket())
                        .key(resolveFileName(fileDescr))
                        .uploadId(response.uploadId())
                        .partNumber(partNumber)
                        .build();
                int endChunkPosition = Math.min(partNumber * chunkSize, data.length);
                byte[] chunkBytes = getChunkBytes(data, i * chunkSize, endChunkPosition);
                String eTag = s3Client.uploadPart(uploadPartRequest, RequestBody.fromBytes(chunkBytes)).eTag();
                CompletedPart part = CompletedPart.builder()
                        .partNumber(partNumber)
                        .eTag(eTag)
                        .build();
                completedParts.add(part);
            }

            CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    CompleteMultipartUploadRequest.builder()
                            .bucket(getBucket())
                            .key(resolveFileName(fileDescr))
                            .uploadId(response.uploadId())
                            .multipartUpload(completedMultipartUpload).build();
            s3Client.completeMultipartUpload(completeMultipartUploadRequest);
        } catch (SdkException e) {
            String message = String.format("Could not save file %s.", getFileNameAWS(fileDescr));
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, message);
        }
    }

    protected byte[] getChunkBytes(byte[] data, int start, int end) {
        byte[] chunkBytes = new byte[end - start];
        System.arraycopy(data, start, chunkBytes, 0, end - start);
        return chunkBytes;
    }





    public void removeFileAWS(FileDescriptor fileDescr) throws FileStorageException {
        try {
            S3Client s3Client = s3ClientReference.get();
            DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
                    .bucket(getBucket())
                    .key(resolveFileName(fileDescr))
                    .build();
            s3Client.deleteObject(deleteObjectRequest);
        } catch (SdkException e) {
            String message = String.format("Could not delete file %s.", getFileNameAWS(fileDescr));
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, message);
        }
    }


    public InputStream openStreamAWS(FileDescriptor fileDescr) throws FileStorageException {
        InputStream is;
        try {
            S3Client s3Client = s3ClientReference.get();
            GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                    .bucket(getBucket())
                    .key(resolveFileName(fileDescr))
                    .build();
            is = s3Client.getObject(getObjectRequest, ResponseTransformer.toInputStream());
        } catch (SdkException e) {
            String message = String.format("Could not load file %s.", getFileNameAWS(fileDescr));
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, message);
        }
        return is;
    }


    public byte[] loadFileAWS(FileDescriptor fileDescr) throws FileStorageException {
        try (InputStream inputStream = openStream(fileDescr)) {
            return IOUtils.toByteArray(inputStream);
        } catch (IOException e) {
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, fileDescr.getId().toString(), e);
        }
    }

    public boolean fileExistsAWS(FileDescriptor fileDescr) {
        S3Client s3Client = s3ClientReference.get();
        ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder()
                .bucket(getBucket())
                .prefix(resolveFileName(fileDescr))
                .maxKeys(1)
                .build();
        ListObjectsV2Response listObjResponse = s3Client.listObjectsV2(listObjectsReqManual);
        return listObjResponse.contents().stream()
                .map(S3Object::key)
                .collect(Collectors.toList())
                .contains(resolveFileName(fileDescr));
    }

    protected String resolveFileName(FileDescriptor fileDescr) {
        return getStorageDir(fileDescr.getCreateDate()) + "/" + getFileName(fileDescr);
    }

    /**
     * INTERNAL. Don't use in application code.
     */
    protected String getStorageDir(Date createDate) {
        Calendar cal = Calendar.getInstance();
        cal.setTime(createDate);
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1;
        int day = cal.get(Calendar.DAY_OF_MONTH);

        return String.format("%d/%s/%s", year,
                StringUtils.leftPad(String.valueOf(month), 2, '0'),
                StringUtils.leftPad(String.valueOf(day), 2, '0'));
    }

    protected String getFileNameAWS(FileDescriptor fileDescriptor) {
        if (StringUtils.isNotBlank(fileDescriptor.getExtension())) {
            return fileDescriptor.getId().toString() + "." + fileDescriptor.getExtension();
        } else {
            return fileDescriptor.getId().toString();
        }
    }

    protected String getRegionName() {
        return amazonS3Config.getRegionName();
    }

    protected String getBucket() {
        return amazonS3Config.getBucket();
    }

    protected String getSecretAccessKey() {
        return amazonS3Config.getSecretAccessKey();
    }

    protected String getAccessKey() {
        return amazonS3Config.getAccessKey();
    }





    /*
    Local Processes - Copied from FileStorage.class
    *
    * */

    /**
     * INTERNAL. Don't use in application code.
     */
    public File[] getStorageRoots() {
        if (storageRoots == null) {
            String conf = configuration.getConfig(ServerConfig.class).getFileStorageDir();
            if (StringUtils.isBlank(conf)) {
                String dataDir = configuration.getConfig(GlobalConfig.class).getDataDir();
                File dir = new File(dataDir, "filestorage");
                dir.mkdirs();
                storageRoots = new File[]{dir};
            } else {
                List<File> list = new ArrayList<>();
                for (String str : conf.split(",")) {
                    str = str.trim();
                    if (!StringUtils.isEmpty(str)) {
                        File file = new File(str);
                        if (!list.contains(file))
                            list.add(file);
                    }
                }
                storageRoots = list.toArray(new File[list.size()]);
            }
        }
        return storageRoots;
    }


    public long saveStreamLocally(final FileDescriptor fd, final InputStream inputStream) throws FileStorageException {
        checkFileDescriptor(fd);
        FileDescriptorExt fileDescr = (FileDescriptorExt) fd;
        fileDescr.setStorageLocation(StorageLocation.LOCAL);

        File[] roots = getStorageRoots();

        // Store to primary storage

        checkStorageDefined(roots, fileDescr);
        checkPrimaryStorageAccessible(roots, fileDescr);

        File dir = getStorageDir(roots[0], fileDescr);
        dir.mkdirs();
        checkDirectoryExists(dir);

        final File file = new File(dir, getFileName(fileDescr));
        checkFileExists(file);

        long size = 0;
        OutputStream os = null;
        try {
            os = FileUtils.openOutputStream(file);
            size = IOUtils.copyLarge(inputStream, os);
            os.flush();
            writeLog(file, false);
        } catch (IOException e) {
            IOUtils.closeQuietly(os);
            FileUtils.deleteQuietly(file);

            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, file.getAbsolutePath(), e);
        } finally {
            IOUtils.closeQuietly(os);
        }

        // Copy file to secondary storages asynchronously

        final SecurityContext securityContext = AppContext.getSecurityContext();
        for (int i = 1; i < roots.length; i++) {
            if (!roots[i].exists()) {
                log.error("Error saving {} into {} : directory doesn't exist", fileDescr, roots[i]);
                continue;
            }

            File copyDir = getStorageDir(roots[i], fileDescr);
            final File fileCopy = new File(copyDir, getFileName(fileDescr));

            writeExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        AppContext.setSecurityContext(securityContext);
                        FileUtils.copyFile(file, fileCopy, true);
                        writeLog(fileCopy, false);
                    } catch (Exception e) {
                        log.error("Error saving {} into {} : {}", fileDescr, fileCopy.getAbsolutePath(), e.getMessage());
                    } finally {
                        AppContext.setSecurityContext(null);
                    }
                }
            });
        }

        return size;
    }

    protected void checkFileExists(File file) throws FileStorageException {
        if (file.exists() && isImmutableFileStorage)
            throw new FileStorageException(FileStorageException.Type.FILE_ALREADY_EXISTS, file.getAbsolutePath());
    }

    protected void checkDirectoryExists(File dir) throws FileStorageException {
        if (!dir.exists())
            throw new FileStorageException(FileStorageException.Type.STORAGE_INACCESSIBLE, dir.getAbsolutePath());
    }

    protected void checkPrimaryStorageAccessible(File[] roots, FileDescriptor fileDescr) throws FileStorageException {
        if (!roots[0].exists()) {
            log.error("Inaccessible primary storage at {}", roots[0]);
            throw new FileStorageException(FileStorageException.Type.STORAGE_INACCESSIBLE, fileDescr.getId().toString());
        }
    }

    protected void checkStorageDefined(File[] roots, FileDescriptor fileDescr) throws FileStorageException {
        if (roots.length == 0) {
            log.error("No storage directories defined");
            throw new FileStorageException(FileStorageException.Type.STORAGE_INACCESSIBLE, fileDescr.getId().toString());
        }
    }


    public void saveFileLocally(final FileDescriptor fileDescr, final byte[] data) throws FileStorageException {
        checkNotNullArgument(data, "File content is null");
        saveStreamLocally(fileDescr, new ByteArrayInputStream(data));
    }

    protected synchronized void writeLog(File file, boolean remove) {
        File rootDir;
        try {
            rootDir = file.getParentFile().getParentFile().getParentFile().getParentFile();
        } catch (NullPointerException e) {
            log.error("Unable to write log: invalid file storage structure", e);
            return;
        }

        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        UserSession userSession = userSessionSource.getUserSession();
        String userLogin = userSession.getUser().getLogin();
        String userId = userSession.getUser().getId().toString();

        StringBuilder sb = new StringBuilder();
        sb.append(df.format(timeSource.currentTimestamp())).append(" ");

        sb.append("[").append(userLogin).append("--").append(userId).append("] ");
        sb.append(remove ? "REMOVE" : "CREATE").append(" ");
        sb.append("\"").append(file.getAbsolutePath()).append("\"\n");

        File logFile = new File(rootDir, "storage.log");
        try {
            try (FileOutputStream fos = new FileOutputStream(logFile, true)) {
                IOUtils.write(sb.toString(), fos, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            log.error("Unable to write log", e);
        }
    }


    public void removeFileLocally(FileDescriptor fileDescr) throws FileStorageException {
        checkFileDescriptor(fileDescr);

        File[] roots = getStorageRoots();
        if (roots.length == 0) {
            log.error("No storage directories defined");
            return;
        }

        for (File root : roots) {
            File dir = getStorageDir(root, fileDescr);
            File file = new File(dir, getFileName(fileDescr));
            if (file.exists()) {
                if (!file.delete()) {
                    throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, "Unable to delete file " + file.getAbsolutePath());
                } else {
                    writeLog(file, true);
                }
            }
        }
    }

    protected void checkFileDescriptor(FileDescriptor fd) {
        if (fd == null || fd.getCreateDate() == null) {
            throw new IllegalArgumentException("A FileDescriptor instance with populated 'createDate' attribute must be provided");
        }
    }


    public InputStream openStreamLocally(FileDescriptor fileDescr) throws FileStorageException {
        checkFileDescriptor(fileDescr);

        File[] roots = getStorageRoots();
        if (roots.length == 0) {
            log.error("No storage directories available");
            throw new FileStorageException(FileStorageException.Type.FILE_NOT_FOUND, fileDescr.getId().toString());
        }

        InputStream inputStream = null;
        for (File root : roots) {
            File dir = getStorageDir(root, fileDescr);

            File file = new File(dir, getFileName(fileDescr));
            if (!file.exists()) {
                log.error("File " + file + " not found");
                continue;
            }

            try {
                inputStream = FileUtils.openInputStream(file);
                break;
            } catch (IOException e) {
                log.error("Error opening input stream for " + file, e);
            }
        }
        if (inputStream != null)
            return inputStream;
        else
            throw new FileStorageException(FileStorageException.Type.FILE_NOT_FOUND, fileDescr.getId().toString());
    }


    public byte[] loadFileLocally(FileDescriptor fileDescr) throws FileStorageException {
        InputStream inputStream = openStream(fileDescr);
        try {
            return IOUtils.toByteArray(inputStream);
        } catch (IOException e) {
            throw new FileStorageException(FileStorageException.Type.IO_EXCEPTION, fileDescr.getId().toString(), e);
        } finally {
            IOUtils.closeQuietly(inputStream);
        }
    }


    public boolean fileExistsLocally(FileDescriptor fileDescr) {
        checkFileDescriptor(fileDescr);

        File[] roots = getStorageRoots();
        for (File root : roots) {
            File dir = getStorageDir(root, fileDescr);
            File file = new File(dir, getFileName(fileDescr));
            if (file.exists()) {
                return true;
            }
        }
        return false;
    }

    /**
     * INTERNAL. Don't use in application code.
     */
    public File getStorageDir(File rootDir, FileDescriptor fileDescriptor) {
        checkNotNullArgument(rootDir);
        checkNotNullArgument(fileDescriptor);

        Calendar cal = Calendar.getInstance();
        cal.setTime(fileDescriptor.getCreateDate());
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1;
        int day = cal.get(Calendar.DAY_OF_MONTH);

        return new File(rootDir, year + "/"
                + StringUtils.leftPad(String.valueOf(month), 2, '0') + "/"
                + StringUtils.leftPad(String.valueOf(day), 2, '0'));
    }

    public static String getFileName(FileDescriptor fileDescriptor) {
        return fileDescriptor.getId().toString() + "." + fileDescriptor.getExtension();
    }

    @PreDestroy
    protected void stopWriteExecutor() {
        writeExecutor.shutdown();
    }
}