1 /*
2  * Portions of this file Copyright 1999-2005 University of Chicago
3  * Portions of this file Copyright 1999-2005 The University of Southern California.
4  *
5  * This file or a portion of this file is licensed under the
6  * terms of the Globus Toolkit Public License, found at
7  * http://www.globus.org/toolkit/download/license.html.
8  * If you redistribute this file, with or without
9  * modifications, you must include this notice in the file.
10  */
11
12 #include "globus_common.h"
13 #include "globus_wsrf_resource.h"
14 #include "globus_service_engine.h"
15 #include "globus_service_registry.h"
16 #include "globus_wsrf_core_tools.h"
17 #include "wsnt_NotifyType.h"
18 #include "globus_i_notification_consumer.h"
19 #include "NotificationConsumerService.h"
20 #include "globus_xml_buffer.h"
21 #include "wsnt_NotificationMessageHolderType.h"
22
23 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
24 /**
25  * Topic expressions we know to be string valued. Anything else we
26  * leave alone.
27  */
28 /* @{ */
29 #define SIMPLE_TOPIC_EXPRESSION \
30     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple"
31 #define CONCRETE_TOPIC_EXPRESSION \
32     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Concrete"
33 #define FULL_TOPIC_EXPRESSION \
34     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Full"
35 /* @} */
36
37 static
38 globus_result_t
39 globus_i_notification_consumer_service_deserialize_topic(
40     wsnt_TopicExpressionType *          topic);
41
42 /* Implementation of ConsumerService skeleton functions */
43 extern
44 globus_result_t
45 NotificationConsumerService_init(
46     globus_service_descriptor_t *       service_desc)
47 0 {
48 0     globus_module_activate(GLOBUS_NOTIFICATION_CONSUMER_MODULE);
49
50 0     return GLOBUS_SUCCESS;
51 }
52 /* NotificationConsumerService_init() */
53
54 extern
55 globus_result_t
56 NotificationConsumerService_finalize(
57     globus_service_descriptor_t *       service_desc)
58 0 {
59 0     globus_module_deactivate(GLOBUS_NOTIFICATION_CONSUMER_MODULE);
60
61 0     return GLOBUS_SUCCESS;
62 }
63 /* NotificationConsumerService_finalize() */
64
65 globus_result_t
66 Consumer_Notify_init(
67     globus_service_engine_t             engine,
68     globus_soap_message_handle_t        message,
69     wsnt_NotifyType *                   Notify)
70 0 {
71     /* NOOP */
72 0     return GLOBUS_SUCCESS;
73 }
74 /* Consumer_Notify_init() */
75
76 globus_result_t
77 Consumer_Notify_impl(
78     globus_service_engine_t             engine,
79     globus_soap_message_handle_t        message,
80     globus_service_descriptor_t *       service,
81     wsnt_NotifyType *                   Notify)
82 0 {
83 0     globus_result_t                     result;
84 0     globus_resource_t                   resource;
85 0     globus_i_notification_consumer_t *  consumer;
86 0     int                                 i;
87
88 0     globus_mutex_lock(&globus_i_notification_lock);
89
90 0     result = globus_wsrf_core_get_resource(message, service, &resource);
91
92 0     if (result != GLOBUS_SUCCESS || resource == NULL)
93     {
94 0         result = GLOBUS_NOTIFICATION_CONSUMER_ERROR_UNKNOWN_RESOURCE(
95             result ? globus_error_get(result) : NULL);
96
97 0         goto unlock_out;
98     }
99
100 0     result = globus_resource_get_resource_specific(
101             resource,
102             (void **)&consumer);
103
104 0     if (result != GLOBUS_SUCCESS || consumer == NULL)
105     {
106 0         result = GLOBUS_NOTIFICATION_CONSUMER_ERROR_UNKNOWN_RESOURCE(
107             result ? globus_error_get(result) : NULL);
108
109 0         goto finish_out;
110     }
111
112 0     for (i = 0; i < Notify->NotificationMessage.length; i++)
113     {
114 0         globus_bool_t                   called = GLOBUS_FALSE;
115         wsnt_NotificationMessageHolderType *
116 0                 input = &Notify->NotificationMessage.elements[i];
117
118 0         result = globus_i_notification_consumer_service_deserialize_topic(
119                             &input->Topic);
120
121 0         if (result != GLOBUS_SUCCESS)
122         {
123 0             continue;
124         }
125
126
127 0         while (!called)
128         {
129 0             switch (consumer->state)
130             {
131                 case CONSUMER_OK:
132 0                     consumer->state = CONSUMER_ACTIVE;
133 0                     globus_mutex_unlock(&globus_i_notification_lock);
134
135
136 0                     consumer->callback(
137                             consumer->arg,
138                             &input->Topic,
139                             input->ProducerReference,
140                             &input->Message);
141
142 0                     globus_mutex_lock(&globus_i_notification_lock);
143
144 0                     if (consumer->state == CONSUMER_ACTIVE)
145                     {
146 0                         consumer->state = CONSUMER_OK;
147                     }
148 0                     else if (consumer->state == CONSUMER_ACTIVE_DESTROY_CALLED)
149                     {
150 0                         consumer->state = CONSUMER_DESTROYED;
151                     }
152 0                     called = GLOBUS_TRUE;
153 0                     globus_cond_broadcast(&globus_i_notification_cond);
154
155 0                     break;
156                 case CONSUMER_ACTIVE:
157 0                     globus_cond_wait(
158                             &globus_i_notification_cond,
159                             &globus_i_notification_lock);
160 0                     break;
161                 case CONSUMER_ACTIVE_DESTROY_CALLED:
162                 case CONSUMER_DESTROYED:
163                     /* Can't call it now */
164 0                     called = GLOBUS_TRUE;
165 0                     break;
166             }
167         }
168     }
169
170 finish_out:
171 0     result = globus_resource_finish(resource);
172
173 unlock_out:
174 0     globus_mutex_unlock(&globus_i_notification_lock);
175 0     return result;
176 }
177 /* Consumer_Notify_impl() */
178
179 /**
180  * Deserialize the topic expression into an appropriate xsd type if the
181  * dialect is known to us. Otherwise, leave it alone.
182  *
183  * @param topic
184  *     TopicExpression which will be potentially modified with a new
185  *     value of any if the Dialect attribute refers to one that this
186  *     function knows of.
187  */
188 static
189 globus_result_t
190 globus_i_notification_consumer_service_deserialize_topic(
191     wsnt_TopicExpressionType *          topic)
192 0 {
193 0     xsd_string *                        topic_string = NULL;
194 0     globus_xml_buffer *                 buffer;
195 0     globus_result_t                     result = GLOBUS_SUCCESS;
196 0     globus_soap_message_handle_t        message_handle;
197
198     /* Deserialize the notification topic to a string if it is one of the
199      * known string-based topic dialects.
200      */
201 0     if (topic->_Dialect &&
202         topic->any && topic->any->any_info == &globus_xml_buffer_info &&
203         (strcmp(*topic->_Dialect, SIMPLE_TOPIC_EXPRESSION) == 0 ||
204         strcmp(*topic->_Dialect, CONCRETE_TOPIC_EXPRESSION) == 0 ||
205         strcmp(*topic->_Dialect, FULL_TOPIC_EXPRESSION) == 0))
206     {
207 0         buffer = topic->any->value;
208
209 0         result = globus_soap_message_handle_init_from_memory(
210             &message_handle,
211             buffer->buffer,
212             buffer->length);
213
214 0         if (result != GLOBUS_SUCCESS)
215         {
216 0             goto out;
217         }
218
219 0         result = xsd_string_init(&topic_string);
220
221 0         if (result != GLOBUS_SUCCESS || topic_string == NULL)
222         {
223 0             goto destroy_handle_out;
224         }
225
226 0         result = xsd_string_deserialize_contents(
227             NULL,
228             topic_string,
229             message_handle,
230             0);
231
232 0         if (result != GLOBUS_SUCCESS)
233         {
234 0             goto destroy_topic_out;
235         }
236
237 0         globus_xml_buffer_destroy(buffer);
238
239 0         topic->any->any_info = &xsd_string_contents_info;
240 0         topic->any->value = topic_string;
241
242 0         topic_string = NULL;
243     }
244     else
245     {
246         /* Don't know this dialect, leave it as an xml buffer */
247 0         goto out;
248     }
249
250
251 destroy_topic_out:
252 0     if (topic_string != NULL)
253     {
254 0         xsd_string_destroy(topic_string);
255     }
256 destroy_handle_out:
257 0     globus_soap_message_handle_destroy(message_handle);
258 out:
259 0     if (result != GLOBUS_SUCCESS)
260     {
261 0         return GLOBUS_NOTIFICATION_CONSUMER_ERROR_DESERIALIZING_TOPIC(
262                 globus_error_get(result));
263     }
264     else
265     {
266 0         return result;
267     }
268 }
269 /* globus_i_notification_consumer_service_deserialize_topic() */