Commit d37d4a9c authored by Andreas König's avatar Andreas König
Browse files

implemented simple view scheduler

parent f673fd06
......@@ -44,6 +44,22 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
......
package ch.bergturbenthal.display.test.consumer;
import java.awt.Color;
import java.awt.Font;
import java.awt.FontMetrics;
import java.awt.Graphics2D;
import java.awt.geom.AffineTransform;
import java.awt.geom.Rectangle2D;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import ch.bergturbenthal.display.test.display.Display;
import ch.bergturbenthal.display.test.display.DrawContext;
import ch.bergturbenthal.display.test.model.ClockView;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.model.RemoveView;
import ch.bergturbenthal.display.test.model.TextView;
import ch.bergturbenthal.display.test.model.View;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class DisplayConsumer {
@Value
private static class DisplayEntry {
private String id;
private Instant start;
private Instant end;
private int priority;
private Supplier<String> string;
}
private final DrawContext drawContext;
private final List<DisplayEntry> futureEntries = new LinkedList<>();
private final List<DisplayEntry> currentEntries = new LinkedList<>();
private final int rotationTime = 3;
private String currentVisibleText = "XXXXXX";
@Autowired
public DisplayConsumer(final Display display) {
this.drawContext = display.createContext();
}
private void insertCurrentEntry(final DisplayEntry displayEntry) {
if (displayEntry.getEnd().isBefore(Instant.now())) {
return;
}
for (final ListIterator<DisplayEntry> listIterator = currentEntries.listIterator(); listIterator.hasNext();) {
final DisplayEntry nextEntry = listIterator.next();
if (nextEntry.getEnd().isAfter(displayEntry.getEnd())) {
listIterator.previous();
listIterator.add(displayEntry);
return;
}
}
currentEntries.add(displayEntry);
}
private void insertFutureEntry(final DisplayEntry newEntry) {
for (final ListIterator<DisplayEntry> listIterator = futureEntries.listIterator(); listIterator.hasNext();) {
final DisplayEntry nextEntry = listIterator.next();
if (nextEntry.getStart().isAfter(newEntry.getStart())) {
listIterator.previous();
listIterator.add(newEntry);
return;
}
}
futureEntries.add(newEntry);
}
@KafkaListener(topics = "display")
public synchronized void processSchedule(final ConsumerRecord<String, Mutation> cr) {
final Mutation value = cr.value();
if (value instanceof View) {
final View view = (View) value;
final String id = view.getId();
remove(id);
final Supplier<String> supplier;
if (value instanceof TextView) {
final String text = ((TextView) value).getText();
supplier = () -> text;
} else if (value instanceof ClockView) {
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM);
supplier = () -> formatter.format(Instant.now().atZone(ZoneId.systemDefault()).toLocalTime());
} else {
return;
}
insertFutureEntry(new DisplayEntry(id, view.getBegin(), view.getBegin().plus(view.getDuration()), view.getPriority(), supplier));
updateCurrentEntries();
} else if (value instanceof RemoveView) {
final String id = ((RemoveView) value).getId();
remove(id);
}
updateDisplay();
}
private void remove(final String id) {
futureEntries.removeIf(e -> e.getId().equals(id));
currentEntries.removeIf(e -> e.getId().equals(id));
}
private void show(final String text) {
if (currentVisibleText.equals(text)) {
return;
}
final Graphics2D graphics = drawContext.getGraphics();
for (int fontSize = drawContext.getHeight(); fontSize > 0; fontSize--) {
graphics.setFont(new Font(Font.SANS_SERIF, Font.BOLD, fontSize));
final FontMetrics fontMetrics = graphics.getFontMetrics();
final Rectangle2D bounds = fontMetrics.getStringBounds(text, graphics);
final double textWidth = bounds.getWidth();
if (textWidth > drawContext.getWidth()) {
continue;
}
final double displayCenterX = drawContext.getWidth() / 2.0;
final double displayCenterY = drawContext.getHeight() / 2.0;
final double textCenterX = (int) bounds.getCenterX();
final double textCenterY = (int) bounds.getCenterY();
final double xPos = displayCenterX - textCenterX;
final double yPos = displayCenterY - textCenterY;
graphics.setTransform(AffineTransform.getTranslateInstance(xPos, yPos));
graphics.setColor(Color.BLUE);
graphics.fill(bounds);
graphics.setColor(Color.WHITE);
graphics.drawString(text, 0, 0);
drawContext.show();
currentVisibleText = text;
break;
}
}
private void updateCurrentEntries() {
final Instant now = Instant.now();
for (final ListIterator<DisplayEntry> iterator = futureEntries.listIterator(); iterator.hasNext();) {
if (iterator.next().getStart().isBefore(now)) {
insertCurrentEntry(iterator.previous());
iterator.remove();
} else {
break;
}
}
for (final ListIterator<DisplayEntry> iterator = currentEntries.listIterator(); iterator.hasNext();) {
if (iterator.next().getEnd().isBefore(now)) {
iterator.remove();
} else {
break;
}
}
}
@Scheduled(fixedDelay = 100)
public synchronized void updateDisplay() {
try {
updateCurrentEntries();
int maxPrio = Integer.MIN_VALUE;
final List<DisplayEntry> enabledEntries = new ArrayList<>();
for (final DisplayEntry displayEntry : currentEntries) {
final int currentPrio = displayEntry.getPriority();
if (maxPrio > currentPrio) {
// skip entry
continue;
}
if (maxPrio < currentPrio) {
maxPrio = currentPrio;
enabledEntries.clear();
}
enabledEntries.add(displayEntry);
}
if (enabledEntries.isEmpty()) {
show("");
} else if (enabledEntries.size() == 1) {
show(enabledEntries.get(0).getString().get());
} else {
final int index = (int) ((Instant.now().getEpochSecond() / rotationTime) % enabledEntries.size());
show(enabledEntries.get(index).getString().get());
}
} catch (final Exception ex) {
log.error("Error", ex);
}
}
}
......@@ -54,11 +54,12 @@ public class RowDisplay implements Display {
@Override
public DrawContext createContext() {
final BufferedImage image = new BufferedImage(WIDTH, HEIGHT, BufferedImage.TYPE_INT_RGB);
final Graphics2D graphics = image.createGraphics();
return new DrawContext() {
@Override
public Graphics2D getGraphics() {
final Graphics2D graphics = image.createGraphics();
graphics.setRenderingHint(RenderingHints.KEY_ANTIALIASING, RenderingHints.VALUE_ANTIALIAS_ON);
return graphics;
}
......@@ -80,7 +81,7 @@ public class RowDisplay implements Display {
final DatagramSocket datagramSocket = new DatagramSocket();
final DatagramPacket packet = new DatagramPacket(data, data.length, address);
datagramSocket.send(packet);
graphics.clearRect(0, 0, WIDTH, HEIGHT);
image.createGraphics().clearRect(0, 0, WIDTH, HEIGHT);
} catch (final IOException e) {
throw new RuntimeException("Cannot show content", e);
}
......@@ -93,7 +94,6 @@ public class RowDisplay implements Display {
final BufferedImage outImage = new BufferedImage(8 * 32, 32, BufferedImage.TYPE_INT_RGB);
final Graphics2D outGraphics = outImage.createGraphics();
outGraphics.drawImage(image, colorOperation, 4 * 32, -32);
// outGraphics.drawImage(image, 4 * 32, -32, null);
outGraphics.transform(AffineTransform.getQuadrantRotateInstance(2, image.getWidth() / 2, image.getHeight() / 2));
outGraphics.drawImage(image, colorOperation, 0, 32);
return outImage;
......
package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
import lombok.Value;
@Value
public class ClockView implements Mutation, View {
private String id;
private Instant begin;
private Duration duration;
private String display;
private int priority;
}
package ch.bergturbenthal.display.test.model;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@JsonTypeInfo(use = Id.MINIMAL_CLASS, include = As.PROPERTY, property = "type")
@JsonSubTypes({ @Type(View.class), @Type(RemoveView.class) })
public interface Mutation {
}
package ch.bergturbenthal.display.test.model;
import lombok.Value;
@Value
public class RemoveView implements Mutation {
private String id;
}
package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
import lombok.Value;
@Value
public class TextView implements Mutation, View {
private String id;
private Instant begin;
private Duration duration;
private String display;
private String text;
private int priority;
}
package ch.bergturbenthal.display.test.model;
import java.time.Duration;
import java.time.Instant;
public interface View {
Instant getBegin();
String getDisplay();
Duration getDuration();
String getId();
int getPriority();
}
package ch.bergturbenthal.display.test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.bergturbenthal.display.test.consumer.DisplayConsumer;
import ch.bergturbenthal.display.test.display.Display;
import ch.bergturbenthal.display.test.display.impl.RowDisplay;
import ch.bergturbenthal.display.test.model.ClockView;
import ch.bergturbenthal.display.test.model.Mutation;
import ch.bergturbenthal.display.test.model.TextView;
import ch.bergturbenthal.display.test.model.View;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@SpringBootApplication
@EnableKafka
@EnableScheduling
@ComponentScan(basePackageClasses = DisplayConsumer.class)
public class Application implements CommandLineRunner {
private static final String TOPIC = "display";
public static void main(final String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, View> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Bean
public Display display() throws UnknownHostException {
return new RowDisplay(new InetSocketAddress(InetAddress.getByName("192.168.1.51"), 1337));
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(),
new JsonDeserializer<>(Mutation.class, objectMapper));
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties properties, final ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), new JsonSerializer<>(objectMapper));
}
// @KafkaListener(topics = TOPIC)
// public void listen(final ConsumerRecord<String, Mutation> cr) throws Exception {
// log.info(cr.toString());
// // latch.countDown();
// }
@Override
public void run(final String... args) throws Exception {
Thread.sleep(20);
if (latch.getCount() > 0) {
template.send(TOPIC, new ClockView(UUID.randomUUID().toString(), Instant.MIN, Duration.between(Instant.MIN, Instant.MAX), "", 1));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(20, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "1. Punkt", 3));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(25, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "noch etwas Wichtiges", 3));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(1, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Rittersaal", "Pause", 2));
this.template.send(TOPIC, new TextView(UUID.randomUUID().toString(), Instant.now().plus(2, ChronoUnit.MINUTES),
Duration.of(1, ChronoUnit.MINUTES), "Garten", "2. Punkt", 3));
log.info("------------------------------------------------------");
log.info("All Sent");
}
latch.await(120, TimeUnit.SECONDS);
Thread.sleep(30000);
log.info("All received");
}
}
\ No newline at end of file
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#spring.kafka.bootstrap-servers=10.0.8.2:9092
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment