Commit 4ce1c27e authored by Andreas König's avatar Andreas König
Browse files

replaced kafka by file storage

parent fc0a2d9c
......@@ -54,14 +54,6 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
......
......@@ -5,24 +5,12 @@ import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.bergturbenthal.display.test.controller.DisplayController;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.properties.ApplicationProperties;
import ch.bergturbenthal.display.test.service.impl.DefaultDisplayService;
import springfox.documentation.spi.DocumentationType;
......@@ -40,22 +28,6 @@ public class ApplicationConfiguration {
return new ApplicationProperties();
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(),
new JsonDeserializer<>(Mutation.class, objectMapper));
}
// @Bean
// public Display display() throws UnknownHostException {
// return new RowDisplay(new InetSocketAddress(InetAddress.getByName("192.168.1.51"), 1337));
// }
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>(objectMapper));
}
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(2);
......
......@@ -65,7 +65,7 @@ public class DisplayConsumer implements Consumer<Mutation>, Closeable {
}
@Override
public void accept(final Mutation value) {
public synchronized void accept(final Mutation value) {
if (value instanceof View) {
final View view = (View) value;
final String id = view.getId();
......
package ch.bergturbenthal.display.test.controller;
import java.util.Collection;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import ch.bergturbenthal.display.test.model.RemoveView;
import ch.bergturbenthal.display.test.model.View;
import ch.bergturbenthal.display.test.model.dto.ViewDto;
import ch.bergturbenthal.display.test.service.MutationDispatcher;
import ch.bergturbenthal.display.test.service.ViewOverview;
@RestController
@RequestMapping(path = "view", produces = "application/json")
public class ViewController {
@Autowired
private ViewOverview viewOverview;
private ViewOverview viewOverview;
@Autowired
private MutationDispatcher mutationDispatcher;
@PostMapping
public @ResponseBody String createMapping(@RequestBody final ViewDto view) {
final String id = UUID.randomUUID().toString();
final View createdMutation = view.createMutation(id);
mutationDispatcher.sendEvent(createdMutation);
return id;
}
@DeleteMapping(path = "{id}")
public void deleteView(@PathVariable("id") final String id) {
mutationDispatcher.sendEvent(new RemoveView(id));
}
@GetMapping(path = "{id}")
public View getView(@PathVariable("id") final String id) {
return viewOverview.getView(id);
public ViewDto getView(@PathVariable("id") final String id) {
final View view = viewOverview.getView(id);
if (view == null) {
return null;
}
return ViewDto.createFromView(view);
}
@GetMapping
public Collection<String> listViews() {
return viewOverview.listKnownViews();
}
@PutMapping(path = "{id}")
public void updateView(@PathVariable("id") final String id, @RequestBody final ViewDto view) {
mutationDispatcher.sendEvent(view.createMutation(id));
}
}
......@@ -14,10 +14,10 @@ public class ClockView implements Mutation, View {
@NonNull
private String id;
@NonNull
@JsonFormat(shape = Shape.STRING)
@JsonFormat(shape = Shape.NUMBER_INT)
private Instant begin;
@NonNull
@JsonFormat(shape = Shape.STRING)
@JsonFormat(shape = Shape.NUMBER_INT)
private Duration duration;
@NonNull
private String display;
......
......@@ -11,7 +11,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@JsonTypeInfo(use = Id.MINIMAL_CLASS, include = As.PROPERTY, property = "type")
@JsonSubTypes({ @Type(TextView.class), @Type(ClockView.class) })
public interface View {
public interface View extends Mutation {
Instant getBegin();
String getDisplay();
......
package ch.bergturbenthal.display.test.model.dto;
import java.time.Duration;
import java.time.Instant;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonFormat.Shape;
import ch.bergturbenthal.display.test.model.ClockView;
import ch.bergturbenthal.display.test.model.TextView;
import ch.bergturbenthal.display.test.model.View;
import lombok.NonNull;
import lombok.Value;
@Value
public class ViewDto {
public static enum ViewType {
TEXT, CLOCK
}
public static ViewDto createFromView(final View view) {
if (view instanceof TextView) {
return new ViewDto(ViewType.TEXT, view.getBegin(), view.getDuration(), view.getDisplay(), ((TextView) view).getText(),
view.getPriority());
}
if (view instanceof ClockView) {
return new ViewDto(ViewType.CLOCK, view.getBegin(), view.getDuration(), view.getDisplay(), null, view.getPriority());
}
throw new IllegalArgumentException("View " + view + " unsupported");
}
@NonNull
private ViewType type;
@NonNull
@JsonFormat(shape = Shape.NUMBER_INT)
private Instant begin;
@NonNull
@JsonFormat(shape = Shape.NUMBER_INT)
private Duration duration;
@NonNull
private String display;
private String text;
private int priority;
public View createMutation(final String entry) {
switch (this.getType()) {
case CLOCK:
return new ClockView(entry, this.getBegin(), this.getDuration(), this.getDisplay(), this.getPriority());
case TEXT:
return new TextView(entry, this.getBegin(), this.getDuration(), this.getDisplay(), this.getText(), this.getPriority());
}
throw new IllegalArgumentException("Type " + this.getType() + " unknown");
}
}
package ch.bergturbenthal.display.test.properties;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
......@@ -10,5 +11,6 @@ import lombok.Data;
@Data
@ConfigurationProperties(prefix = "displays")
public class ApplicationProperties {
private Map<String, DisplayProperties> target = new HashMap<>();
private Map<String, DisplayProperties> target = new HashMap<>();
private File storageDirectory = new File("/tmp/events");
}
package ch.bergturbenthal.display.test.service.impl;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.properties.ApplicationProperties;
import ch.bergturbenthal.display.test.service.MutationDispatcher;
import lombok.Cleanup;
@Service
public class DefaultMutationDispatcher implements MutationDispatcher {
private final List<Consumer<Mutation>> mutationConsumers;
private final File storageDirectory;
private final ObjectWriter mutationWriter;
private final Map<String, PrintWriter> openWriteFiles = new HashMap<>();
private final ObjectReader mutationReader;
@Autowired
public DefaultMutationDispatcher(final List<Consumer<Mutation>> mutationConsumers) {
public DefaultMutationDispatcher(final List<Consumer<Mutation>> mutationConsumers, final ApplicationProperties properties,
final ObjectMapper objectMapper) throws IOException {
this.mutationConsumers = mutationConsumers;
storageDirectory = properties.getStorageDirectory();
mutationReader = objectMapper.readerFor(Mutation.class);
mutationWriter = objectMapper.writerFor(Mutation.class);
if (!storageDirectory.exists()) {
storageDirectory.mkdirs();
}
streamPastEvents(m -> doSendEvent(m));
}
@KafkaListener(topics = "display")
public synchronized void processSchedule(final ConsumerRecord<String, Mutation> cr) {
final Mutation value = cr.value();
sendEvent(value);
@Scheduled(fixedDelay = 10 * 60 * 1000)
public synchronized void closeAllFiles() {
for (final Iterator<PrintWriter> iterator = openWriteFiles.values().iterator(); iterator.hasNext();) {
iterator.next().close();
iterator.remove();
}
}
@Override
public void sendEvent(final Mutation mutation) {
private void doSendEvent(final Mutation mutation) {
for (final Consumer<Mutation> consumer : mutationConsumers) {
consumer.accept(mutation);
}
}
@Override
public void sendEvent(final Mutation mutation) {
storeEvent(mutation);
doSendEvent(mutation);
}
private synchronized void storeEvent(final Mutation mutation) {
try {
final Instant now = Instant.now();
final String stringValue = now + " " + mutationWriter.writeValueAsString(mutation);
final String filename = now.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_LOCAL_DATE);
final PrintWriter writer = openWriteFiles.computeIfAbsent(filename, f -> {
final File file = new File(storageDirectory, f);
try {
return new PrintWriter(new FileOutputStream(file, true));
} catch (final FileNotFoundException e) {
throw new RuntimeException("Cannot open file " + file, e);
}
});
writer.println(stringValue);
writer.flush();
} catch (final JsonProcessingException e) {
throw new RuntimeException("Cannot serialize " + mutation);
}
}
private void streamPastEvents(final Consumer<Mutation> sendConsumer) throws IOException {
final File[] files = storageDirectory.listFiles(f -> f.canWrite() && f.isFile());
Arrays.sort(files, 0, files.length, (f1, f2) -> f1.getName().compareTo(f2.getName()));
for (final File file : files) {
@Cleanup
final BufferedReader reader = new BufferedReader(new FileReader(file));
while (true) {
final String line = reader.readLine();
if (line == null) {
break;
}
final String[] parts = line.split(" ", 2);
if (parts.length != 2) {
continue;
}
sendConsumer.accept(mutationReader.readValue(parts[1]));
}
}
}
}
......@@ -4,25 +4,21 @@ import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import ch.bergturbenthal.display.test.model.ClockView;
import ch.bergturbenthal.display.test.model.TextView;
import ch.bergturbenthal.display.test.model.View;
import ch.bergturbenthal.display.test.service.MutationDispatcher;
import lombok.extern.slf4j.Slf4j;
@Slf4j
// @SpringBootApplication
@EnableKafka
@EnableScheduling
@EnableAutoConfiguration
@Import(ApplicationConfiguration.class)
......@@ -35,32 +31,23 @@ public class Application implements CommandLineRunner {
}
@Autowired
private KafkaTemplate<String, View> template;
private final CountDownLatch latch = new CountDownLatch(3);
// @KafkaListener(topics = TOPIC)
// public void listen(final ConsumerRecord<String, Mutation> cr) throws Exception {
// log.info(cr.toString());
// // latch.countDown();
// }
private MutationDispatcher eventDispatcher;
@Override
public void run(final String... args) throws Exception {
Thread.sleep(20);
if (latch.getCount() > 0) {
template.send(TOPIC,
new ClockView(UUID.randomUUID().toString(), Instant.MIN, Duration.between(Instant.MIN, Instant.MAX), "Rittersaal", 1));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(20, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "1. Punkt", 3));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(25, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "noch etwas Wichtiges", 3));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(1, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "Pause", 2));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(2, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "2. Punkt", 3));
log.info("------------------------------------------------------");
log.info("All Sent");
}
eventDispatcher
.sendEvent(new ClockView(UUID.randomUUID().toString(), Instant.MIN, Duration.between(Instant.MIN, Instant.MAX), "Rittersaal", 1));
eventDispatcher.sendEvent(new TextView(UUID.randomUUID().toString(), Instant.now().plus(20, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "1. Punkt", 3));
eventDispatcher.sendEvent(new TextView(UUID.randomUUID().toString(), Instant.now().plus(25, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "noch etwas Wichtiges", 3));
eventDispatcher.sendEvent(new TextView(UUID.randomUUID().toString(), Instant.now().plus(1, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "Pause", 2));
eventDispatcher.sendEvent(new TextView(UUID.randomUUID().toString(), Instant.now().plus(2, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "2. Punkt", 3));
log.info("------------------------------------------------------");
log.info("All Sent");
// latch.await(120, TimeUnit.SECONDS);
// Thread.sleep(30000);
// log.info("All received");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment