Commit b9925cce authored by Evgeni Sladkovskii's avatar Evgeni Sladkovskii

working

parent 8d7fc706
...@@ -4,11 +4,11 @@ spring: ...@@ -4,11 +4,11 @@ spring:
axoniq: axoniq:
axondb: axondb:
domain: axondb domain: local
controldb-backup-location: /controldata/ controldb-backup-location: /controldata/
file: file:
storage: /events storage: /events
max-segment-size: 50000000 max-segment-size: 1_000_000
server: server:
context-path: /axondb context-path: /axondb
\ No newline at end of file
...@@ -6,11 +6,7 @@ Idea behind this client is to schedule AxonDB backups and store them in predefin ...@@ -6,11 +6,7 @@ Idea behind this client is to schedule AxonDB backups and store them in predefin
Client periodically calls AxonDB to check if there is new closed event files. Client periodically calls AxonDB to check if there is new closed event files.
If there is one or more it creates a controlDB backup file and copies it and new events files to preconfigured locations. If there is one or more it creates a controlDB backup file and copies it and new events files to preconfigured locations.
This client should be ran near AxonDB since controlDB backups are stored in root dir. Client can run as a separate pod in kubernetes cluster as long as controldb/events disks are mounted to client in RO mode.
In case of ordinary machine/virtual machine deployment just run it as a separate process.
In case of docker use `nohup java -jar backup.jar --spring.config.location=/axondb-backup.yml &` in your run script to run it in background to AxonDB.
## Restore ## Restore
To restore copy all files events backups to corresponding folders in AxonDB. To restore copy all files events backups to corresponding folders in AxonDB.
......
...@@ -2,18 +2,32 @@ package nl.trifork.axondbbackupclient.backup; ...@@ -2,18 +2,32 @@ package nl.trifork.axondbbackupclient.backup;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static nl.trifork.axondbbackupclient.util.FileUtils.ensureDir;
@Service @Service
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class BackupService { public class BackupService {
private final ControlDBBackupService controlDb; private final ControlDBBackupService controlDb;
private final SegmentsBackupService segments; private final SnapshotsBackupService segments;
private final EventsBackupService events;
private final BackupConfig config;
public void run(BackupJob job) { public void run(BackupJob job) {
ensureDirs(job);
controlDb.backup(job); controlDb.backup(job);
segments.backup(job);
events.backup(job);
}
private void ensureDirs(BackupJob job) {
String eventsDir = config.getBackupPath() + job.getName() + "/events";
ensureDir(eventsDir);
ensureDir(eventsDir + "/closed");
} }
} }
...@@ -10,7 +10,8 @@ import org.springframework.stereotype.Service; ...@@ -10,7 +10,8 @@ import org.springframework.stereotype.Service;
import java.io.File; import java.io.File;
import static com.google.common.base.Stopwatch.createStarted; import static com.google.common.base.Stopwatch.createStarted;
import static nl.trifork.axondbbackupclient.util.FileIoService.copy; import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.filenameFromPath;
@Service @Service
@Slf4j @Slf4j
...@@ -33,10 +34,7 @@ public class ControlDBBackupService { ...@@ -33,10 +34,7 @@ public class ControlDBBackupService {
} }
private File dbDump(BackupJob job, String dumpPath) { private File dbDump(BackupJob job, String dumpPath) {
int fileStart = dumpPath.lastIndexOf('/'); String path = config.getAxonPath() + job.getName() + job.getControlDbPath() + filenameFromPath(dumpPath);
if (fileStart < 0) throw new IllegalArgumentException("Wrong controldb dump path " + dumpPath);
String filename = dumpPath.substring(fileStart + 1);
String path = config.getAxonPath() + job.getName() + job.getControlDbPath() + filename;
return new File(path); return new File(path);
} }
......
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.client.AxonDbRestClient;
import nl.trifork.axondbbackupclient.util.FileUtils;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import static com.google.common.base.Stopwatch.createStarted;
import static java.nio.file.Files.list;
import static java.util.stream.Collectors.toList;
import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.copyWithOverride;
@Service
@RequiredArgsConstructor
@Slf4j
public class EventsBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
//todo extract similar logic to snapshots
public void backup(BackupJob job) {
Stopwatch stopwatch = createStarted();
List<File> closedEvents = closedEvents(job);
log.trace("Closed events size {}", closedEvents.size());
copyClosed(job, closedEvents);
List<File> currentSnapshots = currentEvents(job, closedEvents);
log.trace("Current snapshots {}", currentSnapshots);
copyCurrent(job, currentSnapshots);
log.trace("Copied events in {}", stopwatch);
}
private List<File> closedEvents(BackupJob job) {
List<String> closed = axonDbClient.getClosedEvents(job.getAxondbHost()).stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir", e);
throw new RuntimeException(e);
}
}
private void copyClosed(BackupJob job, List<File> closed) {
String dirTo = config.getBackupPath() + job.getName() + "/events/closed/";
closed.forEach(f -> copy(f, dirTo));
}
private List<File> currentEvents(BackupJob job, List<File> closed) {
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> !closed.contains(f))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir", e);
throw new RuntimeException(e);
}
}
private void copyCurrent(BackupJob job, List<File> currentSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/";
currentSnapshots.forEach(f -> copyWithOverride(f, dirTo));
}
}
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.client.AxonDbRestClient;
import org.springframework.stereotype.Service;
import java.io.File;
import static com.google.common.base.Stopwatch.createStarted;
import static java.util.Arrays.stream;
import static nl.trifork.axondbbackupclient.util.FileIoService.copy;
@Service
@RequiredArgsConstructor
@Slf4j
public class SegmentsBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
private final ControlDBBackupService dbBackupService;
// @Scheduled(fixedDelayString = "${backup.fixedDelayMs}", initialDelayString = "${backup.initialDelayMs}")
public void backup(BackupJob job) {
if (hasNewSegments()) {
dbBackupService.backup(job);
backupEvents();
backupSnapshots();
} else {
log.debug("No new segments");
}
}
private boolean hasNewSegments() {
String[] segments = axonDbClient.getClosedEventSegments();
return stream(segments)
.map(File::new)
.filter(File::exists)
.map(File::getName)
.filter(name -> name.endsWith(".events"))
.map(name -> config.getJobs().get(0).getEventsPath() + name)
.map(File::new)
.filter(f -> !f.exists())
.peek(f -> log.info("Found new segment {}", f.getName()))
.count() > 0;
}
private void backupEvents() {
Stopwatch stopwatch = createStarted();
String[] segments = axonDbClient.getClosedEventSegments();
stream(segments)
.map(File::new)
.filter(File::exists)
.forEach(f -> copy(f, config.getJobs().get(0).getEventsPath()));
log.info("Copied event segments in {}", stopwatch);
}
private void backupSnapshots() {
Stopwatch stopwatch = createStarted();
String[] segments = axonDbClient.getClosedSnapshotsSegments();
stream(segments)
.map(File::new)
.filter(File::exists)
.forEach(f -> copy(f, config.getJobs().get(0).getEventsPath()));
log.info("Copied snapshot segments in {}", stopwatch);
}
}
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.client.AxonDbRestClient;
import nl.trifork.axondbbackupclient.util.FileUtils;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import static com.google.common.base.Stopwatch.createStarted;
import static java.nio.file.Files.list;
import static java.util.stream.Collectors.toList;
import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.copyWithOverride;
@Service
@RequiredArgsConstructor
@Slf4j
public class SnapshotsBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
public void backup(BackupJob job) {
Stopwatch stopwatch = createStarted();
List<File> closedSnapshots = closedSnapshots(job);
log.trace("Closed snapshots size {}", closedSnapshots.size());
copyClosed(job, closedSnapshots);
List<File> currentSnapshots = currentSnapshots(job, closedSnapshots);
log.trace("Current snapshots {}", currentSnapshots);
copyCurrent(job, currentSnapshots);
log.trace("Copied snapshots in {}", stopwatch);
}
private List<File> closedSnapshots(BackupJob job) {
List<String> closed = axonDbClient.getClosedSnapshots(job.getAxondbHost()).stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getSnapshotsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir", e);
throw new RuntimeException(e);
}
}
private void copyClosed(BackupJob job, List<File> closedSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/closed/";
closedSnapshots.forEach(f -> copy(f, dirTo));
}
private List<File> currentSnapshots(BackupJob job, List<File> closedSnapshots) {
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getSnapshotsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> !closedSnapshots.contains(f))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read snapshots dir", e);
throw new RuntimeException(e);
}
}
private void copyCurrent(BackupJob job, List<File> currentSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/";
currentSnapshots.forEach(f -> copyWithOverride(f, dirTo));
}
}
...@@ -9,6 +9,9 @@ import org.springframework.http.ResponseEntity; ...@@ -9,6 +9,9 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.util.List;
import static java.util.Arrays.asList;
import static org.springframework.http.HttpMethod.GET; import static org.springframework.http.HttpMethod.GET;
import static org.springframework.http.HttpMethod.POST; import static org.springframework.http.HttpMethod.POST;
...@@ -20,20 +23,21 @@ public class AxonDbRestClient { ...@@ -20,20 +23,21 @@ public class AxonDbRestClient {
private final RestTemplate restTemplate; private final RestTemplate restTemplate;
private final BackupConfig config; private final BackupConfig config;
public String[] getClosedEventSegments() { public List<String> getClosedEvents(String host) {
return getReadySegments(eventsUrl()); return asList(getReadySegments(eventsUrl(host)));
} }
public String[] getClosedSnapshotsSegments() { public List<String> getClosedSnapshots(String host) {
return getReadySegments(snapshotsUrl()); return asList(getReadySegments(snapshotsUrl(host)));
} }
private String[] getReadySegments(String url) { private String[] getReadySegments(String url) {
try { try {
log.trace("Calling for ready segments on {}", url);
ResponseEntity<String[]> segments = restTemplate.exchange(url, GET, reqEntity(), String[].class); ResponseEntity<String[]> segments = restTemplate.exchange(url, GET, reqEntity(), String[].class);
return segments.getBody(); return segments.getBody();
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.error("Failed to get ready segments on {}", url, e); log.error("Failed to get ready segments from {}", url, e);
return new String[0]; return new String[0];
} }
} }
...@@ -46,12 +50,12 @@ public class AxonDbRestClient { ...@@ -46,12 +50,12 @@ public class AxonDbRestClient {
return dbDumpPath.getBody(); return dbDumpPath.getBody();
} }
private String eventsUrl() { private String eventsUrl(String host) {
return config.getJobs().get(0).getAxondbHost() + config.getEventsUrl(); return host + config.getEventsUrl();
} }
private String snapshotsUrl() { private String snapshotsUrl(String host) {
return config.getJobs().get(0).getAxondbHost() + config.getSnapshotsUrl(); return host + config.getSnapshotsUrl();
} }
private String dbDumpUrl(String host) { private String dbDumpUrl(String host) {
......
...@@ -3,9 +3,11 @@ package nl.trifork.axondbbackupclient.cron; ...@@ -3,9 +3,11 @@ package nl.trifork.axondbbackupclient.cron;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.backup.BackupJob; import nl.trifork.axondbbackupclient.backup.BackupJob;
import nl.trifork.axondbbackupclient.backup.BackupService; import nl.trifork.axondbbackupclient.backup.BackupService;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
...@@ -16,6 +18,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool; ...@@ -16,6 +18,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.DONE; import static nl.trifork.axondbbackupclient.cron.BackupStatus.DONE;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.ERROR; import static nl.trifork.axondbbackupclient.cron.BackupStatus.ERROR;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.RUNNING; import static nl.trifork.axondbbackupclient.cron.BackupStatus.RUNNING;
import static nl.trifork.axondbbackupclient.cron.BackupTask.task;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
...@@ -25,6 +28,14 @@ public class BackupRunner { ...@@ -25,6 +28,14 @@ public class BackupRunner {
private final Executor executor = newFixedThreadPool(8); private final Executor executor = newFixedThreadPool(8);
private final List<BackupTask> queue = new CopyOnWriteArrayList<>(); private final List<BackupTask> queue = new CopyOnWriteArrayList<>();
private final BackupService backupService; private final BackupService backupService;
private final BackupConfig config;
@Scheduled(cron = "${backup.cron}")
public void cron() {
BackupTask task = task(config.getJobs());
log.info("Adding cron task {}", task);
add(task);
}
public void add(BackupTask task) { public void add(BackupTask task) {
task.getJobs().forEach(j -> executor.execute(() -> runJob(j, task))); task.getJobs().forEach(j -> executor.execute(() -> runJob(j, task)));
...@@ -45,6 +56,7 @@ public class BackupRunner { ...@@ -45,6 +56,7 @@ public class BackupRunner {
log.error("Failed job {}", job.getName(), e); log.error("Failed job {}", job.getName(), e);
} finally { } finally {
MDC.remove("traceId"); MDC.remove("traceId");
if (task.isFinished()) queue.remove(task);
} }
} }
......
package nl.trifork.axondbbackupclient.cron; package nl.trifork.axondbbackupclient.cron;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public enum BackupStatus { public enum BackupStatus {
SCHEDULLED, SCHEDULLED(false),
RUNNING, RUNNING(false),
DONE, DONE(true),
ERROR ERROR(true);
private final boolean isFinished;
public boolean isFinished() {
return isFinished;
}
} }
...@@ -43,4 +43,8 @@ public class BackupTask { ...@@ -43,4 +43,8 @@ public class BackupTask {
'}'; '}';
} }
public boolean isFinished() {
return status.values().stream().allMatch(BackupStatus::isFinished);
}
} }
package nl.trifork.axondbbackupclient.util;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import static org.apache.commons.io.FileUtils.copyFile;
@Slf4j
public class FileIoService {
public static void copy(File file, String destinationDir) {
try {
String destinationFilePath = destinationDir + file.getName();
File destinationFile = new File(destinationFilePath);
if (destinationFile.exists()) {
log.warn("Destination file already present {}", destinationFile);
return;
}
File tmpDestFile = new File(destinationFilePath + ".tmp");
copyFile(file, tmpDestFile);
tmpDestFile.renameTo(destinationFile);
} catch (IOException e) {
log.error("Failed to copy file {}", file, e);
}
}
}
package nl.trifork.axondbbackupclient.util; package nl.trifork.axondbbackupclient.util;
import lombok.extern.slf4j.Slf4j;
import java.io.File; import java.io.File;
import java.io.IOException;
import static org.apache.commons.io.FileUtils.copyFile;
@Slf4j
public class FileUtils {