Skip to content

Package: Transport$ListenerHandle

Transport$ListenerHandle

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2020 Red Hat Inc and others.
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Red Hat Inc - initial API and implementation
11: *******************************************************************************/
12: package org.eclipse.kapua.client.gateway;
13:
14: import java.time.Duration;
15: import java.util.Objects;
16: import java.util.concurrent.Semaphore;
17: import java.util.concurrent.TimeUnit;
18: import java.util.function.Consumer;
19:
/**
 * A control interface on the underlying client transport
 * <p>
 * <b>Note:</b> There is only one set of transport events available for the client.
 * Setting a new set of transport state listeners will clear the previously set listeners.
 * </p>
 */
public interface Transport {

    /**
     * Receiver for the connect and disconnect handlers configured through
     * {@link Transport#events(Consumer)}.
     */
    public interface TransportEvents {

        /**
         * Set the handler to run when the transport reports the connected state
         *
         * @param runnable
         *            the handler, may be {@code null} to ignore this state
         */
        public void connected(Runnable runnable);

        /**
         * Set the handler to run when the transport reports the disconnected state
         *
         * @param runnable
         *            the handler, may be {@code null} to ignore this state
         */
        public void disconnected(Runnable runnable);
    }

    /**
     * Handle for a listener registration.
     * <p>
     * Narrows {@link AutoCloseable#close()} so that it throws no checked exception,
     * which allows using the handle in a try-with-resources statement without a catch block.
     * </p>
     */
    @FunctionalInterface
    public interface ListenerHandle extends AutoCloseable {

        @Override
        public void close();
    }

    /**
     * Listener for transport state changes
     */
    @FunctionalInterface
    public interface Listener {

        /**
         * Called with the transport state
         *
         * @param state
         *            {@code true} when the transport is connected, {@code false} when it is disconnected
         */
        public void stateChange(boolean state);
    }

    /**
     * Add a state listener
     *
     * <p>
     * The listener will be called immediately after setting with the
     * last known state.
     * </p>
     *
     * @param listener
     *            the listener to transport state changes
     * @return a handle for this registration; closing it is expected to release the
     *         listener (the exact effect is defined by the implementation)
     */
    public ListenerHandle listen(Listener listener);

    /**
     * This method allows to atomically set a state listener using simple runnable.
     *
     * <p>
     * This method is intended to be used with Java lambdas where each state change
     * (connected, disconnected) is mapped to one lambda. However, as the state change
     * will be initially reported it might happen that the state actually changes between
     * setting the connect and disconnect handler. This way there would be no way to properly
     * report the initial state.
     * </p>
     * <p>
     * Setting the event handlers using this methods works by updating
     * the provided {@link TransportEvents} fields inside the provided consumer. The
     * consumer will only be called once inside this method. The event listeners will
     * then be set atomically. Changes made to the {@link TransportEvents} instance after
     * the consumer returned have no effect.
     * </p>
     *
     * <pre>
     * client.transport().events( events {@code ->} {
     *   events.connected ( () {@code ->} System.out.println ("Connected") );
     *   events.disconnected ( () {@code ->} System.out.println ("Disconnected") );
     * });
     * </pre>
     *
     * @param events
     *            code to update the {@link TransportEvents}
     * @return the handle returned by {@link #listen(Listener)} for the combined listener
     * @throws NullPointerException
     *             if {@code events} is {@code null}
     */
    public default ListenerHandle events(final Consumer<TransportEvents> events) {
        Objects.requireNonNull(events, "events");

        class TransportEventsImpl implements TransportEvents {

            private Runnable connected;
            private Runnable disconnected;

            @Override
            public void connected(final Runnable runnable) {
                connected = runnable;
            }

            @Override
            public void disconnected(final Runnable runnable) {
                disconnected = runnable;
            }

        }

        final TransportEventsImpl impl = new TransportEventsImpl();

        events.accept(impl);

        // Snapshot the handlers before registering: the listener then works on immutable
        // captured values, so a consumer which leaked the TransportEvents instance can
        // no longer alter the handlers after they have been set.
        final Runnable onConnected = impl.connected;
        final Runnable onDisconnected = impl.disconnected;

        return listen(state -> {
            final Runnable handler = state ? onConnected : onDisconnected;
            if (handler != null) {
                handler.run();
            }
        });
    }

    /**
     * Wait for the connection to be established
     * <p>
     * Registers a temporary listener, blocks until the connected state is reported
     * and closes the listener handle before returning.
     * </p>
     * <p>
     * <b>Note:</b> This method will reset the transport listeners.
     * </p>
     *
     * @param transport
     *            to wait on
     * @throws InterruptedException
     *             if the wait got interrupted
     * @throws NullPointerException
     *             if {@code transport} is {@code null}
     */
    public static void waitForConnection(final Transport transport) throws InterruptedException {
        Objects.requireNonNull(transport, "transport");

        final Semaphore sem = new Semaphore(0);

        try (ListenerHandle handle = transport.listen(state -> {
            if (state) {
                sem.release();
            }
        })) {
            sem.acquire();
        }
    }

    /**
     * Wait for the connection to be established or the timeout occurs
     * <p>
     * Registers a temporary listener, blocks until the connected state is reported or
     * the timeout elapsed, and closes the listener handle before returning.
     * Timeouts which do not fit into a {@code long} of nanoseconds are clamped to the
     * largest (or smallest, for negative durations) representable value.
     * </p>
     * <p>
     * <b>Note:</b> This method will reset the transport listeners.
     * </p>
     *
     * @param transport
     *            to wait on
     * @param timeout
     *            the timeout
     * @return {@code true} if the connected state was reported within the timeout,
     *         {@code false} if the timeout elapsed first
     * @throws InterruptedException
     *             if the wait got interrupted
     * @throws NullPointerException
     *             if {@code transport} or {@code timeout} is {@code null}
     */
    public static boolean waitForConnection(final Transport transport, final Duration timeout) throws InterruptedException {
        Objects.requireNonNull(transport, "transport");
        Objects.requireNonNull(timeout, "timeout");

        long nanos;
        try {
            nanos = timeout.toNanos();
        } catch (final ArithmeticException e) {
            // Duration.toNanos() overflows beyond roughly 292 years; saturate instead of
            // failing so that "practically forever" timeouts behave as expected.
            nanos = timeout.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }

        final Semaphore sem = new Semaphore(0);

        try (ListenerHandle handle = transport.listen(state -> {
            if (state) {
                sem.release();
            }
        })) {
            return sem.tryAcquire(nanos, TimeUnit.NANOSECONDS);
        }

    }
}