Commit fc0a2d9c authored by Andreas König's avatar Andreas König
Browse files

Implemented central dispatcher

parent d37d4a9c
......@@ -15,31 +15,41 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M1</version>
<relativePath/> <!-- lookup parent from repository -->
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<vaadin.version>8.0.5</vaadin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>vaadin-spring-boot-starter</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......@@ -60,19 +70,23 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>vaadin-bom</artifactId>
<version>${vaadin.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
......
package ch.bergturbenthal.display.test;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.bergturbenthal.display.test.controller.DisplayController;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.properties.ApplicationProperties;
import ch.bergturbenthal.display.test.service.impl.DefaultDisplayService;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableScheduling
@ComponentScan(basePackageClasses = { DefaultDisplayService.class, DisplayController.class })
@EnableSwagger2
public class ApplicationConfiguration {
@Bean
public ApplicationProperties applicationProperties() {
return new ApplicationProperties();
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(),
new JsonDeserializer<>(Mutation.class, objectMapper));
}
// @Bean
// public Display display() throws UnknownHostException {
// return new RowDisplay(new InetSocketAddress(InetAddress.getByName("192.168.1.51"), 1337));
// }
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>(objectMapper));
}
@Bean
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(2);
}
@Bean
public Docket swaggerSpringfoxDocket() {
final Docket swaggerSpringMvcPlugin = new Docket(DocumentationType.SWAGGER_2).directModelSubstitute(Instant.class, String.class)
.directModelSubstitute(Duration.class, Long.class).select().build();
return swaggerSpringMvcPlugin;
}
}
......@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DisplayApplication {
public static void main(String[] args) {
SpringApplication.run(DisplayApplication.class, args);
}
public static void main(final String[] args) {
SpringApplication.run(DisplayApplication.class, args);
}
}
......@@ -6,6 +6,7 @@ import java.awt.FontMetrics;
import java.awt.Graphics2D;
import java.awt.geom.AffineTransform;
import java.awt.geom.Rectangle2D;
import java.io.Closeable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
......@@ -14,13 +15,13 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import ch.bergturbenthal.display.test.display.Display;
import ch.bergturbenthal.display.test.display.DrawContext;
......@@ -33,8 +34,7 @@ import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class DisplayConsumer {
public class DisplayConsumer implements Consumer<Mutation>, Closeable {
@Value
private static class DisplayEntry {
private String id;
......@@ -48,11 +48,59 @@ public class DisplayConsumer {
private final List<DisplayEntry> futureEntries = new LinkedList<>();
private final List<DisplayEntry> currentEntries = new LinkedList<>();
private final int rotationTime = 3;
private String currentVisibleText = "XXXXXX";
private String currentVisibleText = "<empty Entry>";
private final String displayName;
private final ScheduledFuture<?> scheduledFuture;
@Autowired
public DisplayConsumer(final Display display) {
public DisplayConsumer(final Display display, final String displayName, final ScheduledExecutorService executorService) {
this.displayName = displayName;
this.drawContext = display.createContext();
scheduledFuture = executorService.scheduleAtFixedRate(() -> {
try {
updateDisplay();
} catch (final Exception ex) {
log.warn("Error updating display", ex);
}
}, 1000, 100, TimeUnit.MILLISECONDS);
}
@Override
public void accept(final Mutation value) {
if (value instanceof View) {
final View view = (View) value;
final String id = view.getId();
remove(id);
if (view.getDisplay().equals(this.displayName)) {
addViewEntry(view, id);
}
} else if (value instanceof RemoveView)
{
final String id = ((RemoveView) value).getId();
remove(id);
}
updateDisplay();
}
private void addViewEntry(final View view, final String id) {
final Supplier<String> supplier;
if (view instanceof TextView) {
final String text = ((TextView) view).getText();
supplier = () -> text;
} else if (view instanceof ClockView) {
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM);
supplier = () -> formatter.format(Instant.now().atZone(ZoneId.systemDefault()).toLocalTime());
} else {
return;
}
insertFutureEntry(new DisplayEntry(id, view.getBegin(), view.getBegin().plus(view.getDuration()), view.getPriority(), supplier));
}
@Override
public void close() {
scheduledFuture.cancel(true);
}
private void insertCurrentEntry(final DisplayEntry displayEntry) {
......@@ -82,33 +130,6 @@ public class DisplayConsumer {
futureEntries.add(newEntry);
}
@KafkaListener(topics = "display")
public synchronized void processSchedule(final ConsumerRecord<String, Mutation> cr) {
final Mutation value = cr.value();
if (value instanceof View) {
final View view = (View) value;
final String id = view.getId();
remove(id);
final Supplier<String> supplier;
if (value instanceof TextView) {
final String text = ((TextView) value).getText();
supplier = () -> text;
} else if (value instanceof ClockView) {
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM);
supplier = () -> formatter.format(Instant.now().atZone(ZoneId.systemDefault()).toLocalTime());
} else {
return;
}
insertFutureEntry(new DisplayEntry(id, view.getBegin(), view.getBegin().plus(view.getDuration()), view.getPriority(), supplier));
updateCurrentEntries();
} else if (value instanceof RemoveView) {
final String id = ((RemoveView) value).getId();
remove(id);
}
updateDisplay();
}
private void remove(final String id) {
futureEntries.removeIf(e -> e.getId().equals(id));
currentEntries.removeIf(e -> e.getId().equals(id));
......
package ch.bergturbenthal.display.test.controller;
import java.util.Collection;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ch.bergturbenthal.display.test.service.DisplayService;
@RestController
@RequestMapping(path = "display", produces = "application/json")
public class DisplayController {
private final DisplayService displayService;
public DisplayController(final DisplayService displayService) {
this.displayService = displayService;
}
@GetMapping
public Collection<String> listDisplays() {
return displayService.listAvailableDisplays();
}
}
package ch.bergturbenthal.display.test.controller;
import java.util.Collection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ch.bergturbenthal.display.test.model.View;
import ch.bergturbenthal.display.test.service.ViewOverview;
@RestController
@RequestMapping(path = "view", produces = "application/json")
public class ViewController {
@Autowired
private ViewOverview viewOverview;
@GetMapping(path = "{id}")
public View getView(@PathVariable("id") final String id) {
return viewOverview.getView(id);
}
@GetMapping
public Collection<String> listViews() {
return viewOverview.listKnownViews();
}
}
......@@ -3,13 +3,23 @@ package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonFormat.Shape;
import lombok.NonNull;
import lombok.Value;
@Value
public class ClockView implements Mutation, View {
@NonNull
private String id;
@NonNull
@JsonFormat(shape = Shape.STRING)
private Instant begin;
@NonNull
@JsonFormat(shape = Shape.STRING)
private Duration duration;
@NonNull
private String display;
private int priority;
}
......@@ -7,7 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@JsonTypeInfo(use = Id.MINIMAL_CLASS, include = As.PROPERTY, property = "type")
@JsonSubTypes({ @Type(View.class), @Type(RemoveView.class) })
@JsonSubTypes({ @Type(RemoveView.class), @Type(TextView.class), @Type(ClockView.class) })
public interface Mutation {
}
package ch.bergturbenthal.display.test.model;
import lombok.NonNull;
import lombok.Value;
@Value
public class RemoveView implements Mutation {
@NonNull
private String id;
}
......@@ -3,14 +3,25 @@ package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonFormat.Shape;
import lombok.NonNull;
import lombok.Value;
@Value
public class TextView implements Mutation, View {
@NonNull
private String id;
@NonNull
@JsonFormat(shape = Shape.NUMBER_INT)
private Instant begin;
@NonNull
@JsonFormat(shape = Shape.NUMBER_INT)
private Duration duration;
@NonNull
private String display;
@NonNull
private String text;
private int priority;
}
......@@ -3,6 +3,14 @@ package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@JsonTypeInfo(use = Id.MINIMAL_CLASS, include = As.PROPERTY, property = "type")
@JsonSubTypes({ @Type(TextView.class), @Type(ClockView.class) })
public interface View {
Instant getBegin();
......
package ch.bergturbenthal.display.test.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Data;
@Data
@ConfigurationProperties(prefix = "displays")
public class ApplicationProperties {
private Map<String, DisplayProperties> target = new HashMap<>();
}
package ch.bergturbenthal.display.test.properties;
import lombok.Data;
@Data
public class DisplayProperties {
private String host;
private int port = 1337;
}
package ch.bergturbenthal.display.test.service;
import java.util.Collection;
public interface DisplayService {
Collection<String> listAvailableDisplays();
}
package ch.bergturbenthal.display.test.service;
import ch.bergturbenthal.display.test.model.Mutation;
public interface MutationDispatcher {
void sendEvent(Mutation mutation);
}
package ch.bergturbenthal.display.test.service;
import java.util.Collection;
import ch.bergturbenthal.display.test.model.View;
public interface ViewOverview {
View getView(String id);
Collection<String> listKnownViews();
}
package ch.bergturbenthal.display.test.service.impl;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import org.springframework.stereotype.Service;
import ch.bergturbenthal.display.test.consumer.DisplayConsumer;
import ch.bergturbenthal.display.test.display.impl.RowDisplay;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.properties.ApplicationProperties;
import ch.bergturbenthal.display.test.properties.DisplayProperties;
import ch.bergturbenthal.display.test.service.DisplayService;
@Service
public class DefaultDisplayService implements DisplayService, Consumer<Mutation> {
private final Map<String, DisplayConsumer> displays;
public DefaultDisplayService(final ApplicationProperties properties, final ScheduledExecutorService executorService) {
displays = new HashMap<>();
for (final Entry<String, DisplayProperties> displayPropertyEntry : properties.getTarget().entrySet()) {
final DisplayProperties displayProperties = displayPropertyEntry.getValue();
final SocketAddress address = new InetSocketAddress(displayProperties.getHost(), displayProperties.getPort());
final String displayName = displayPropertyEntry.getKey();
displays.put(displayName, new DisplayConsumer(new RowDisplay(address), displayName, executorService));
}
}
@Override
public void accept(final Mutation value) {
for (final DisplayConsumer displayConsumer : displays.values()) {
displayConsumer.accept(value);
}
}
@PreDestroy
public void close() {
for (final DisplayConsumer display : displays.values()) {
display.close();
}
}
@Override
public Collection<String> listAvailableDisplays() {
return Collections.unmodifiableSet(displays.keySet());
}
}
package ch.bergturbenthal.display.test.service.impl;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.service.MutationDispatcher;
@Service
public class DefaultMutationDispatcher implements MutationDispatcher {
private final List<Consumer<Mutation>> mutationConsumers;
@Autowired
public DefaultMutationDispatcher(final List<Consumer<Mutation>> mutationConsumers) {
this.mutationConsumers = mutationConsumers;
}