Skip to content

Package: AbstractClient$Context

AbstractClient$Context

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Red Hat Inc and others.
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Red Hat Inc - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.client.gateway.spi;
14:
15: import org.eclipse.kapua.client.gateway.Application;
16: import org.eclipse.kapua.client.gateway.Client;
17: import org.eclipse.kapua.client.gateway.ErrorHandler;
18: import org.eclipse.kapua.client.gateway.MessageHandler;
19: import org.eclipse.kapua.client.gateway.Payload;
20: import org.eclipse.kapua.client.gateway.Topic;
21: import org.eclipse.kapua.client.gateway.Transport;
22: import org.eclipse.kapua.client.gateway.spi.util.Futures;
23: import org.eclipse.kapua.client.gateway.spi.util.TransportAsync;
24: import org.eclipse.kapua.client.gateway.spi.util.TransportProxy;
25: import org.slf4j.Logger;
26: import org.slf4j.LoggerFactory;
27:
28: import java.util.Collection;
29: import java.util.HashMap;
30: import java.util.HashSet;
31: import java.util.Map;
32: import java.util.Objects;
33: import java.util.Optional;
34: import java.util.Set;
35: import java.util.concurrent.CompletableFuture;
36: import java.util.concurrent.CompletionStage;
37: import java.util.concurrent.ScheduledExecutorService;
38: import java.util.function.Consumer;
39:
40: public abstract class AbstractClient implements Client {
41:
42: private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
43:
44: private class ContextImpl implements Context {
45:
46: private final String applicationId;
47:
48: private final Set<Topic> subscriptions = new HashSet<>();
49:
50: private final TransportProxy transport;
51:
52: private ContextImpl(String applicationId) {
53: this.applicationId = applicationId;
54: transport = TransportProxy.proxy(AbstractClient.this.transport, executor);
55: }
56:
57: @Override
58: public String getId() {
59: return applicationId;
60: }
61:
62: @Override
63: public CompletionStage<?> publish(final Topic topic, final Payload payload) {
64: return internalPublish(this, applicationId, topic, payload);
65: }
66:
67: @Override
68: public CompletionStage<?> subscribe(Topic topic, MessageHandler messageHandler, ErrorHandler<? extends Throwable> errorHandler) {
69: return internalSubscribe(this, applicationId, topic, messageHandler, errorHandler);
70: }
71:
72: @Override
73: public Transport transport() {
74: return transport;
75: }
76:
77: @Override
78: public void close() {
79: try {
80: internalCloseApplication(this, applicationId);
81: } finally {
82: transport.close();
83: }
84: }
85:
86: protected Set<Topic> getSubscriptions() {
87: return subscriptions;
88: }
89: }
90:
91: public static abstract class Builder<T extends Builder<T>> implements Client.Builder {
92:
93: protected abstract T builder();
94:
95: private final Set<Module> modules = new HashSet<>();
96:
97: public T module(final Module module) {
98: Objects.requireNonNull(module);
99:
100: this.modules.add(module);
101: return builder();
102: }
103:
104: public Set<Module> modules() {
105: return this.modules;
106: }
107: }
108:
109: public interface Context {
110:
111: public String getId();
112:
113: public CompletionStage<?> publish(Topic topic, Payload payload);
114:
115: public CompletionStage<?> subscribe(Topic topic, MessageHandler handler, ErrorHandler<? extends Throwable> errorHandler);
116:
117: public void close();
118:
119: public Transport transport();
120: }
121:
122: protected final ScheduledExecutorService executor;
123: private final Set<Module> modules;
124:
125: private final TransportAsync transport;
126:
127: private final Map<String, Context> applications = new HashMap<>();
128:
129: public AbstractClient(final ScheduledExecutorService executor, final Set<Module> modules) {
130: this.executor = executor;
131: this.modules = new HashSet<>(modules);
132:
133: transport = new TransportAsync(executor);
134: }
135:
136: protected void init() {
137: fireModuleEvent(module -> module.initialize(new ModuleContext() {
138:
139: @Override
140: public Client getClient() {
141: return AbstractClient.this;
142: }
143:
144: @Override
145: public <T> Optional<T> adapt(final Class<T> clazz) {
146: Objects.requireNonNull(clazz);
147:
148: return adaptModuleContext(clazz);
149: }
150: }));
151: }
152:
153: protected abstract CompletionStage<?> handleSubscribe(String applicationId, Topic topic, MessageHandler messageHandler, ErrorHandler<? extends Throwable> errorHandler);
154:
155: protected abstract CompletionStage<?> handlePublish(String applicationId, Topic topic, Payload payload);
156:
157: protected abstract void handleUnsubscribe(String applicationId, Collection<Topic> topics) throws Exception;
158:
159: protected <T> Optional<T> adaptModuleContext(final Class<T> clazz) {
160: return Optional.empty();
161: }
162:
163: @Override
164: public Transport transport() {
165: return transport;
166: }
167:
168: private void fireModuleEvent(final Consumer<Module> consumer) {
169: for (final Module module : modules) {
170: try {
171: consumer.accept(module);
172: } catch (final Exception e) {
173: logger.info("Failed to process module event", e);
174: }
175: }
176: }
177:
178: protected void notifyAddApplication(final String applicationId) {
179: fireModuleEvent(module -> module.applicationAdded(applicationId));
180: }
181:
182: protected void notifyRemoveApplication(final String applicationId) {
183: fireModuleEvent(module -> module.applicationRemoved(applicationId));
184: }
185:
186: protected void notifyConnected() {
187: fireModuleEvent(Module::connected);
188: transport.handleConnected();
189: }
190:
191: protected void notifyDisconnected() {
192: fireModuleEvent(Module::disconnected);
193: transport.handleDisconnected();
194: }
195:
196: protected void handleConnected() {
197: logger.info("Connected");
198:
199: notifyConnected();
200: }
201:
202: protected void handleDisconnected() {
203: logger.info("Disconnected");
204:
205: notifyDisconnected();
206: }
207:
208: @Override
209: public Application.Builder buildApplication(final String applicationId) {
210: return new Application.Builder() {
211:
212: @Override
213: public Application build() {
214: return internalBuildApplication(this, applicationId);
215: }
216: };
217: }
218:
219: protected DefaultApplication internalBuildApplication(final Application.Builder builder, final String applicationId) {
220: synchronized (this) {
221: if (applications.containsKey(applicationId)) {
222: throw new IllegalStateException(String.format("An application with the ID '%s' already exists", applicationId));
223: }
224:
225: final Context context = new ContextImpl(applicationId);
226:
227: final DefaultApplication result = createApplication(builder, context);
228:
229: applications.put(applicationId, context);
230: notifyAddApplication(applicationId);
231:
232: return result;
233: }
234: }
235:
236: protected synchronized CompletionStage<?> internalSubscribe(final ContextImpl context, final String applicationId, final Topic topic, final MessageHandler messageHandler,
237: final ErrorHandler<? extends Throwable> errorHandler) {
238: if (applications.get(applicationId) != context) {
239: return Futures.completedExceptionally(new IllegalStateException(String.format("Application '%s' is already closed", applicationId)));
240: }
241:
242: if (!context.getSubscriptions().add(topic)) {
243: return CompletableFuture.completedFuture(null);
244: }
245:
246: return handleSubscribe(applicationId, topic, messageHandler, errorHandler);
247: }
248:
249: protected synchronized CompletionStage<?> internalPublish(final Context context, final String applicationId, final Topic topic, final Payload payload) {
250: if (applications.get(applicationId) != context) {
251: return Futures.completedExceptionally(new IllegalStateException(String.format("Application '%s' is already closed", applicationId)));
252: }
253:
254: return handlePublish(applicationId, topic, payload);
255: }
256:
257: protected DefaultApplication createApplication(final Application.Builder builder, final AbstractClient.Context context) {
258: return new DefaultApplication(context);
259: }
260:
261: protected synchronized void internalCloseApplication(final ContextImpl context, final String applicationId) {
262: if (applications.remove(applicationId, context)) {
263: try {
264: handleUnsubscribe(applicationId, context.getSubscriptions());
265: } catch (Exception e) {
266: logger.warn("Failed to unsubscribe on application close", e);
267: }
268: notifyRemoveApplication(applicationId);
269: }
270: }
271: }