Commit d83a2cca authored by Evgeni Sladkovskii's avatar Evgeni Sladkovskii

release 1.1.0 with simplified workflow

parent 7afe8108
......@@ -23,16 +23,12 @@ backup:
# second, minute, hour, day, month, weekday
cron: "0 0 * * * *"
eventsUrl: /v1/backup/filenames?type=Event
snapshotsUrl: /v1/backup/filenames?type=Snapshot
controlDbUrl: /v1/backup/createControlDbBackup
backupPath: /backup/
axonPath: /axon/
backupPath: /backup/events/
backupPathClosed: /backup/events/closed
eventsPath: /axon/events/default/
axondbHost: http://axondb-1:8023/axondb
axondbToken: dummy-token
jobs:
- name: axondb1
axondbToken: dummy-token
axondbHost: http://axondb-1:8023/axondb
eventsPath: /events/default/
snapshotsPath: /events/default/
controlDbPath: /controldata/
\ No newline at end of file
eventsUrl: /v1/backup/filenames?type=Event
snapshotsUrl: /v1/backup/filenames?type=Snapshot
\ No newline at end of file
spring:
datasource:
url: jdbc:h2:/controldata/axondb-controldb
url: jdbc:h2:/controldb/axondb-controldb
axoniq:
axondb:
domain: local
controldb-backup-location: /controldata/
controldb-backup-location: /controldb/
file:
storage: /events
max-segment-size: 1_000_000
......
......@@ -3,12 +3,12 @@ version: '3.4'
services:
axondb-1:
image: eu.gcr.io/tradingengine-194513/blox-infra/axondb:1.3.9-1
image: eu.gcr.io/tradingengine-194513/blox-infra/axondb:1.3.11-1
hostname: axondb-1
volumes:
- ./config/axondb.yml:/axondb.yml
- ./axon/axondb1/events:/events
- ./axon/axondb1/controldata:/controldata
- ./axon/events:/events
- ./axon/controldb:/controldb
ports:
- "8023:8023"
- "8123:8123"
......@@ -19,7 +19,7 @@ services:
APP_OPTS: "--spring.config.location=/application.yaml"
volumes:
- ./config/application.yaml:/application.yaml
- ./axon/axondb1/events:/axon/axondb1/events
- ./axon/axondb1/controldata:/axon/axondb1/controldata
- ./axon/events:/axon/events
- ./backup:/backup
ports:
- "9080:80"
......@@ -2,7 +2,7 @@
REPO="eu.gcr.io/tradingengine-194513/blox-infra/"
NAME="axondb-backup"
TAG="1.0.0"
TAG="1.1.0"
docker build --tag "${REPO}${NAME}:${TAG}" .
docker push "${REPO}${NAME}:${TAG}"
......@@ -16,7 +16,6 @@ public class AxonDbBackupClient {
public static void main(String[] args) {
SpringApplication.run(AxonDbBackupClient.class, args);
log.info("Backup client started");
}
@Bean
......
......@@ -2,14 +2,10 @@ package nl.trifork.axondbbackupclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.backup.BackupJob;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import static nl.trifork.axondbbackupclient.util.FileUtils.ensureDir;
@Data
@ConfigurationProperties(prefix = "backup")
......@@ -17,20 +13,19 @@ import static nl.trifork.axondbbackupclient.util.FileUtils.ensureDir;
@Slf4j
public class BackupConfig {
private String eventsUrl;
private String snapshotsUrl;
private String controlDbUrl;
private String backupPath;
private String axonPath;
private String backupPathClosed;
private String eventsPath;
private List<BackupJob> jobs;
private String axondbHost;
private String axondbToken;
private String eventsUrl;
private String snapshotsUrl;
@PostConstruct
public void print() {
log.info(toString());
ensureDir(backupPath);
jobs.forEach(j -> ensureDir(backupPath + j.getName()));
}
}
}
\ No newline at end of file
package nl.trifork.axondbbackupclient.backup;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class BackupHelper {
private final BackupConfig config;
}
......@@ -5,29 +5,45 @@ import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import org.springframework.stereotype.Service;
import static nl.trifork.axondbbackupclient.util.FileUtils.ensureDir;
import java.io.File;
import java.util.List;
import static nl.trifork.axondbbackupclient.util.FileUtils.*;
@Service
@Slf4j
@RequiredArgsConstructor
public class BackupService {
private final ControlDBBackupService controlDb;
private final SnapshotsBackupService snapshots;
private final EventsBackupService events;
private final BackupConfig config;
public void run(BackupJob job) {
ensureDirs(job);
controlDb.backup(job);
snapshots.backup(job);
events.backup(job);
public void run() {
List<File> closedEvents = events.closedEvents();
List<File> currentEvents = events.currentEvents(closedEvents);
log.info("Closed events {} currents events {}", closedEvents.size(), currentEvents.size());
List<File> closedSnapshots = snapshots.closedSnapshots();
List<File> currentSnapshots = snapshots.currentSnapshots(closedSnapshots);
log.info("Closed snapshots {} currents snapshots {}", closedSnapshots.size(), currentSnapshots.size());
copyClosed(closedEvents);
copyClosed(closedSnapshots);
copyCurrent(currentSnapshots, currentEvents);
}
private void ensureDirs(BackupJob job) {
String eventsDir = config.getBackupPath() + job.getName() + "/events";
ensureDir(eventsDir);
ensureDir(eventsDir + "/closed");
private void copyClosed(List<File> closed) {
String dirTo = config.getBackupPathClosed();
closed.forEach(f -> copyWithoutOverride(f, dirTo));
}
private void copyCurrent(List<File> snapshots, List<File> events) {
String dirTo = config.getBackupPath();
clearFilesInDir(dirTo);
snapshots.forEach(f -> copyWithOverride(f, dirTo));
events.forEach(f -> copyWithOverride(f, dirTo));
}
}
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.client.AxonDbRestClient;
import org.springframework.stereotype.Service;
import java.io.File;
import static com.google.common.base.Stopwatch.createStarted;
import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.filenameFromPath;
@Service
@Slf4j
@RequiredArgsConstructor
public class ControlDBBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
public void backup(BackupJob job) {
Stopwatch stopwatch = createStarted();
String dumpPath = axonDbClient.createControlDbDump(job.getAxondbHost());
File dbDump = dbDump(job, dumpPath);
String pathToSave = pathToSave(job);
if (dbDump.exists()) {
copy(dbDump, pathToSave);
log.info("Created and copied db dump {} for {} to {} in {}", dbDump.getName(), job.getName(), pathToSave, stopwatch);
}
}
private File dbDump(BackupJob job, String dumpPath) {
String path = config.getAxonPath() + job.getName() + job.getControlDbPath() + filenameFromPath(dumpPath);
return new File(path);
}
private String pathToSave(BackupJob job) {
return config.getBackupPath() + job.getName() + "/";
}
}
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
......@@ -14,11 +13,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import static com.google.common.base.Stopwatch.createStarted;
import static java.nio.file.Files.list;
import static java.util.stream.Collectors.toList;
import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.copyWithOverride;
@Service
@RequiredArgsConstructor
......@@ -28,63 +24,39 @@ public class EventsBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
//todo extract similar logic to snapshots
public void backup(BackupJob job) {
Stopwatch stopwatch = createStarted();
List<File> closedEvents = closedEvents(job);
log.trace("Closed events size {}", closedEvents.size());
copyClosed(job, closedEvents);
List<File> currentSnapshots = currentEvents(job, closedEvents);
log.trace("Current snapshots {}", currentSnapshots);
copyCurrent(job, currentSnapshots);
log.trace("Copied events in {}", stopwatch);
}
private List<File> closedEvents(BackupJob job) {
List<String> closed = axonDbClient.getClosedEvents(job.getAxondbHost()).stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
List<File> closedEvents() {
List<String> closed = axonDbClient.getClosedEvents().stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getEventsPath());
Path dir = Paths.get(config.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir");
throw new RuntimeException(e);
}
}
private void copyClosed(BackupJob job, List<File> closed) {
String dirTo = config.getBackupPath() + job.getName() + "/events/closed/";
closed.forEach(f -> copy(f, dirTo));
}
private List<File> currentEvents(BackupJob job, List<File> closed) {
List<File> currentEvents(List<File> closed) {
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getEventsPath());
Path dir = Paths.get(config.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> !closed.contains(f))
.collect(toList());
.map(Path::toFile)
.filter(f -> !f.getName().endsWith(".snapshots"))
.filter(f -> !closed.contains(f))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir");
throw new RuntimeException(e);
}
}
private void copyCurrent(BackupJob job, List<File> currentSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/";
currentSnapshots.forEach(f -> copyWithOverride(f, dirTo));
}
}
package nl.trifork.axondbbackupclient.backup;
import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
......@@ -14,11 +13,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import static com.google.common.base.Stopwatch.createStarted;
import static java.nio.file.Files.list;
import static java.util.stream.Collectors.toList;
import static nl.trifork.axondbbackupclient.util.FileUtils.copy;
import static nl.trifork.axondbbackupclient.util.FileUtils.copyWithOverride;
@Service
@RequiredArgsConstructor
......@@ -28,62 +24,38 @@ public class SnapshotsBackupService {
private final BackupConfig config;
private final AxonDbRestClient axonDbClient;
public void backup(BackupJob job) {
Stopwatch stopwatch = createStarted();
List<File> closedSnapshots = closedSnapshots(job);
log.trace("Closed snapshots size {}", closedSnapshots.size());
copyClosed(job, closedSnapshots);
List<File> currentSnapshots = currentSnapshots(job, closedSnapshots);
log.trace("Current snapshots {}", currentSnapshots);
copyCurrent(job, currentSnapshots);
log.trace("Copied snapshots in {}", stopwatch);
}
private List<File> closedSnapshots(BackupJob job) {
List<String> closed = axonDbClient.getClosedSnapshots(job.getAxondbHost()).stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
List<File> closedSnapshots() {
List<String> closed = axonDbClient.getClosedSnapshots().stream()
.map(FileUtils::filenameFromPath)
.collect(toList());
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getSnapshotsPath());
Path dir = Paths.get(config.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> closed.contains(f.getName()))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read events dir");
log.error("Failed to read snapshots dir");
throw new RuntimeException(e);
}
}
private void copyClosed(BackupJob job, List<File> closedSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/closed/";
closedSnapshots.forEach(f -> copy(f, dirTo));
}
private List<File> currentSnapshots(BackupJob job, List<File> closedSnapshots) {
List<File> currentSnapshots(List<File> closedSnapshots) {
try {
Path dir = Paths.get(config.getAxonPath() + job.getName() + job.getSnapshotsPath());
Path dir = Paths.get(config.getEventsPath());
return list(dir)
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> !closedSnapshots.contains(f))
.collect(toList());
.map(Path::toFile)
.filter(f -> f.getName().endsWith(".snapshots"))
.filter(f -> !closedSnapshots.contains(f))
.collect(toList());
} catch (IOException e) {
log.error("Failed to read snapshots dir");
throw new RuntimeException(e);
}
}
private void copyCurrent(BackupJob job, List<File> currentSnapshots) {
String dirTo = config.getBackupPath() + job.getName() + "/events/";
currentSnapshots.forEach(f -> copyWithOverride(f, dirTo));
}
}
......@@ -13,7 +13,6 @@ import java.util.List;
import static java.util.Arrays.asList;
import static org.springframework.http.HttpMethod.GET;
import static org.springframework.http.HttpMethod.POST;
@Service
@RequiredArgsConstructor
......@@ -23,12 +22,12 @@ public class AxonDbRestClient {
private final RestTemplate restTemplate;
private final BackupConfig config;
public List<String> getClosedEvents(String host) {
return asList(getReadySegments(eventsUrl(host)));
public List<String> getClosedEvents() {
return asList(getReadySegments(eventsUrl(config.getAxondbHost())));
}
public List<String> getClosedSnapshots(String host) {
return asList(getReadySegments(snapshotsUrl(host)));
public List<String> getClosedSnapshots() {
return asList(getReadySegments(snapshotsUrl(config.getAxondbHost())));
}
private String[] getReadySegments(String url) {
......@@ -42,14 +41,6 @@ public class AxonDbRestClient {
}
}
public String createControlDbDump(String host) {
String url = dbDumpUrl(host);
log.trace("Calling for controldb backup on {}", url);
ResponseEntity<String> dbDumpPath = restTemplate.exchange(url, POST, reqEntity(), String.class);
log.trace("Controldb backup created on {}", dbDumpPath.getBody());
return dbDumpPath.getBody();
}
private String eventsUrl(String host) {
return host + config.getEventsUrl();
}
......@@ -58,13 +49,9 @@ public class AxonDbRestClient {
return host + config.getSnapshotsUrl();
}
private String dbDumpUrl(String host) {
return host + config.getControlDbUrl();
}
private HttpEntity reqEntity() {
HttpHeaders headers = new HttpHeaders();
headers.add("Access-Token", config.getJobs().get(0).getAxondbToken());
headers.add("Access-Token", config.getAxondbToken());
HttpEntity<String> entity = new HttpEntity<>(headers);
return entity;
}
......
......@@ -4,21 +4,20 @@ import com.google.common.base.Stopwatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.trifork.axondbbackupclient.BackupConfig;
import nl.trifork.axondbbackupclient.backup.BackupJob;
import nl.trifork.axondbbackupclient.backup.BackupService;
import org.slf4j.MDC;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.DONE;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.ERROR;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.RUNNING;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.*;
import static nl.trifork.axondbbackupclient.cron.BackupTask.task;
import static nl.trifork.axondbbackupclient.util.FileUtils.ensureDir;
@Service
@RequiredArgsConstructor
......@@ -30,33 +29,39 @@ public class BackupRunner {
private final BackupService backupService;
private final BackupConfig config;
@PostConstruct
public void ensureDirs() {
ensureDir(config.getBackupPath());
ensureDir(config.getBackupPathClosed());
}
@Scheduled(cron = "${backup.cron}")
public void cron() {
BackupTask task = task(config.getJobs());
BackupTask task = task();
log.info("Adding cron task {}", task);
add(task);
}
public void add(BackupTask task) {
task.getJobs().forEach(j -> executor.execute(() -> runJob(j, task)));
executor.execute(() -> runTask(task));
queue.add(task);
}
private void runJob(BackupJob job, BackupTask task) {
private void runTask(BackupTask task) {
Stopwatch sw = Stopwatch.createStarted();
MDC.put("traceId", task.getId());
task.getStatus().put(job.getName(), RUNNING);
log.info("Running job {}", job.getName());
task.setStatus(RUNNING);
log.info("Running task...");
try {
backupService.run(job);
task.getStatus().put(job.getName(), DONE);
log.info("Finished job {} for {} in {}", job.getName(), task, sw);
backupService.run();
task.setStatus(DONE);
log.info("Finished task in {}", sw);
} catch (RuntimeException e) {
task.getStatus().put(job.getName(), ERROR);
log.error("Failed job {}", job.getName(), e);
task.setStatus(ERROR);
log.error("Failed task", e);
} finally {
MDC.remove("traceId");
if (task.isFinished()) queue.remove(task);
queue.remove(task);
}
}
......
package nl.trifork.axondbbackupclient.cron;
import lombok.AccessLevel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import nl.trifork.axondbbackupclient.backup.BackupJob;
import lombok.*;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.DONE;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.ERROR;
import static java.util.UUID.randomUUID;
import static nl.trifork.axondbbackupclient.cron.BackupStatus.SCHEDULLED;
@Data
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class BackupTask {
@EqualsAndHashCode.Include
private final String id;
private final String id = randomUUID().toString().substring(24);
private final Instant timestamp = Instant.now();
private final List<BackupJob> jobs;
private final Map<String, BackupStatus> status;
@Setter
private volatile BackupStatus status = SCHEDULLED;
public static BackupTask task(List<BackupJob> jobs) {
String id = UUID.randomUUID().toString().substring(24);
Map<String, BackupStatus> statuses = new HashMap<>();
jobs.forEach(j -> statuses.put(j.getName(), SCHEDULLED));
return new BackupTask(id, jobs, statuses);
public static BackupTask task() {
return new BackupTask();
}
@Override
public String toString() {
return "BackupTask{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", status=" + status +
'}';
"id='" + id + '\'' +
", timestamp=" + timestamp +
", status=" + status +
'}';
}