1 /*
2 * Portions of this file Copyright 1999-2005 University of Chicago
3 * Portions of this file Copyright 1999-2005 The University of Southern California.
4 *
5 * This file or a portion of this file is licensed under the
6 * terms of the Globus Toolkit Public License, found at
7 * http://www.globus.org/toolkit/download/license.html.
8 * If you redistribute this file, with or without
9 * modifications, you must include this notice in the file.
10 */
11
12 #include "globus_common.h"
13 #include "globus_wsrf_resource.h"
14 #include "globus_service_engine.h"
15 #include "globus_service_registry.h"
16 #include "globus_wsrf_core_tools.h"
17 #include "wsnt_NotifyType.h"
18 #include "globus_i_notification_consumer.h"
19 #include "NotificationConsumerService.h"
20 #include "globus_xml_buffer.h"
21 #include "wsnt_NotificationMessageHolderType.h"
22
23 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
24 /**
25 * Topic expressions we know to be string valued. Anything else we
26 * leave alone.
27 */
28 /* @{ */
/* Dialect URI identifying a "Simple" topic expression. */
#define SIMPLE_TOPIC_EXPRESSION                                         \
    "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple"

/* Dialect URI identifying a "Concrete" topic expression. */
#define CONCRETE_TOPIC_EXPRESSION                                       \
    "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Concrete"

/* Dialect URI identifying a "Full" topic expression. */
#define FULL_TOPIC_EXPRESSION                                           \
    "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Full"
35 /* @} */
36
37 static
38 globus_result_t
39 globus_i_notification_consumer_service_deserialize_topic(
40 wsnt_TopicExpressionType * topic);
41
42 /* Implementation of ConsumerService skeleton functions */
43 extern
44 globus_result_t
45 NotificationConsumerService_init(
46 globus_service_descriptor_t * service_desc)
47 1 {
48 1 globus_module_activate(GLOBUS_NOTIFICATION_CONSUMER_MODULE);
49
50 1 return GLOBUS_SUCCESS;
51 }
52 /* NotificationConsumerService_init() */
53
54 extern
55 globus_result_t
56 NotificationConsumerService_finalize(
57 globus_service_descriptor_t * service_desc)
58 0 {
59 0 globus_module_deactivate(GLOBUS_NOTIFICATION_CONSUMER_MODULE);
60
61 0 return GLOBUS_SUCCESS;
62 }
63 /* NotificationConsumerService_finalize() */
64
65 globus_result_t
66 Consumer_Notify_init(
67 globus_service_engine_t engine,
68 globus_soap_message_handle_t message,
69 wsnt_NotifyType * Notify)
70 10 {
71 /* NOOP */
72 10 return GLOBUS_SUCCESS;
73 }
74 /* Consumer_Notify_init() */
75
76 globus_result_t
77 Consumer_Notify_impl(
78 globus_service_engine_t engine,
79 globus_soap_message_handle_t message,
80 globus_service_descriptor_t * service,
81 wsnt_NotifyType * Notify)
82 10 {
83 globus_result_t result;
84 globus_resource_t resource;
85 globus_i_notification_consumer_t * consumer;
86 int i;
87
88 10 globus_mutex_lock(&globus_i_notification_lock);
89
90 10 result = globus_wsrf_core_get_resource(message, service, &resource);
91
92 10 if (result != GLOBUS_SUCCESS || resource == NULL)
93 {
94 0 result = GLOBUS_NOTIFICATION_CONSUMER_ERROR_UNKNOWN_RESOURCE(
95 result ? globus_error_get(result) : NULL);
96
97 0 goto unlock_out;
98 }
99
100 10 result = globus_resource_get_resource_specific(
101 resource,
102 (void **)&consumer);
103
104 10 if (result != GLOBUS_SUCCESS || consumer == NULL)
105 {
106 0 result = GLOBUS_NOTIFICATION_CONSUMER_ERROR_UNKNOWN_RESOURCE(
107 result ? globus_error_get(result) : NULL);
108
109 0 goto finish_out;
110 }
111
112 20 for (i = 0; i < Notify->NotificationMessage.length; i++)
113 {
114 10 globus_bool_t called = GLOBUS_FALSE;
115 wsnt_NotificationMessageHolderType *
116 10 input = &Notify->NotificationMessage.elements[i];
117
118 10 result = globus_i_notification_consumer_service_deserialize_topic(
119 &input->Topic);
120
121 10 if (result != GLOBUS_SUCCESS)
122 {
123 continue;
124 }
125
126
127 20 while (!called)
128 {
129 10 switch (consumer->state)
130 {
131 case CONSUMER_OK:
132 10 consumer->state = CONSUMER_ACTIVE;
133 10 globus_mutex_unlock(&globus_i_notification_lock);
134
135
136 10 consumer->callback(
137 consumer->arg,
138 &input->Topic,
139 input->ProducerReference,
140 &input->Message);
141
142 10 globus_mutex_lock(&globus_i_notification_lock);
143
144 10 if (consumer->state == CONSUMER_ACTIVE)
145 {
146 10 consumer->state = CONSUMER_OK;
147 }
148 0 else if (consumer->state == CONSUMER_ACTIVE_DESTROY_CALLED)
149 {
150 0 consumer->state = CONSUMER_DESTROYED;
151 }
152 10 called = GLOBUS_TRUE;
153 10 globus_cond_broadcast(&globus_i_notification_cond);
154
155 10 break;
156 case CONSUMER_ACTIVE:
157 0 globus_cond_wait(
158 &globus_i_notification_cond,
159 &globus_i_notification_lock);
160 0 break;
161 case CONSUMER_ACTIVE_DESTROY_CALLED:
162 case CONSUMER_DESTROYED:
163 /* Can't call it now */
164 0 called = GLOBUS_TRUE;
165 break;
166 }
167 }
168 }
169
170 10 finish_out:
171 10 result = globus_resource_finish(resource);
172
173 10 unlock_out:
174 10 globus_mutex_unlock(&globus_i_notification_lock);
175 10 return result;
176 }
177 /* Consumer_Notify_impl() */
178
179 /**
180 * Deserialize the topic expression into an appropriate xsd type if the
181 * dialect is known to us. Otherwise, leave it alone.
182 *
183 * @param topic
184 * TopicExpression which will be potentially modified with a new
185 * value of any if the Dialect attribute refers to one that this
186 * function knows of.
187 */
188 static
189 globus_result_t
190 globus_i_notification_consumer_service_deserialize_topic(
191 wsnt_TopicExpressionType * topic)
192 10 {
193 10 xsd_string * topic_string = NULL;
194 globus_xml_buffer * buffer;
195 10 globus_result_t result = GLOBUS_SUCCESS;
196 globus_soap_message_handle_t message_handle;
197
198 /* Deserialize the notification topic to a string if it is one of the
199 * known string-based topic dialects.
200 */
201 10 if (topic->_Dialect &&
202 topic->any && topic->any->any_info == &globus_xml_buffer_info &&
203 (strcmp(*topic->_Dialect, SIMPLE_TOPIC_EXPRESSION) == 0 ||
204 strcmp(*topic->_Dialect, CONCRETE_TOPIC_EXPRESSION) == 0 ||
205 strcmp(*topic->_Dialect, FULL_TOPIC_EXPRESSION) == 0))
206 {
207 0 buffer = topic->any->value;
208
209 0 result = globus_soap_message_handle_init_from_memory(
210 &message_handle,
211 buffer->buffer,
212 buffer->length);
213
214 0 if (result != GLOBUS_SUCCESS)
215 {
216 0 goto out;
217 }
218
219 0 result = xsd_string_init(&topic_string);
220
221 0 if (result != GLOBUS_SUCCESS || topic_string == NULL)
222 {
223 goto destroy_handle_out;
224 }
225
226 0 result = xsd_string_deserialize_contents(
227 NULL,
228 topic_string,
229 message_handle,
230 0);
231
232 0 if (result != GLOBUS_SUCCESS)
233 {
234 0 goto destroy_topic_out;
235 }
236
237 0 globus_xml_buffer_destroy(buffer);
238
239 0 topic->any->any_info = &xsd_string_contents_info;
240 0 topic->any->value = topic_string;
241
242 0 topic_string = NULL;
243 }
244 else
245 {
246 /* Don't know this dialect, leave it as an xml buffer */
247 goto out;
248 }
249
250
251 0 destroy_topic_out:
252 0 if (topic_string != NULL)
253 {
254 0 xsd_string_destroy(topic_string);
255 }
256 0 destroy_handle_out:
257 0 globus_soap_message_handle_destroy(message_handle);
258 10 out:
259 10 if (result != GLOBUS_SUCCESS)
260 {
261 0 return GLOBUS_NOTIFICATION_CONSUMER_ERROR_DESERIALIZING_TOPIC(
262 globus_error_get(result));
263 }
264 else
265 {
266 10 return result;
267 }
268 }
269 /* globus_i_notification_consumer_service_deserialize_topic() */