1 /*
2  * Copyright 1999-2006 University of Chicago
3  * 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  * 
8  * http://www.apache.org/licenses/LICENSE-2.0
9  * 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "globus_i_notification_producer.h"
18 #include "globus_wsrf_core_tools.h"
19 #include "globus_xml_buffer.h"
20 #include "libxml/uri.h"
21 #include "NotificationConsumerService_client.h"
22 #include "wsnt_NotificationMessageHolderType.h"
23 #include "wstop_TopicSpaceType.h"
24 #include "wstop_TopicType.h"
25
26 #include "version.h"
27
28 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
29
30 /* Local Prototypes */
31 0 GlobusDebugDefine(GLOBUS_NOTIFICATION_PRODUCER);
32
33 static
34 int
35 globus_l_notification_producer_activate(void);
36
37 static
38 int
39 globus_l_notification_producer_deactivate(void);
40
41 static
42 void
43 globus_l_notification_activate_subscription_manager(void);
44
45 static
46 int
47 globus_l_notification_epr_keyequal(
48     void *                              epr1,
49     void *                              epr2);
50
51 static
52 int
53 globus_l_notification_epr_hash(
54     void *                              key,
55     int                                 limit);
56
57 static
58 globus_result_t
59 globus_l_notification_epr_get_reference_properties(
60     const wsa_EndpointReferenceType *       epr,
61     char **                                 reference_properties);
62
63 static
64 globus_result_t
65 globus_l_notification_producer_find_subscription(
66     const wsa_EndpointReferenceType *   subscription_epr,
67     globus_i_notification_producer_t ** producer,
68     globus_resource_t *                 subscription);
69
70
71 static
72 globus_result_t
73 globus_l_notification_subscription_create(
74     const globus_notification_producer_t
75                                         producer,
76     const globus_service_engine_t       engine,
77     const wsnt_SubscribeType *          Subscribe,
78     wsa_EndpointReferenceType *         epr,
79     globus_resource_t *                 resource);
80
81 static
82 void
83 globus_l_notification_subscription_free(
84     void *                              arg);
85
86 static
87 void
88 globus_l_notify_done(
89     NotificationConsumerService_client_handle_t 
90                                         handle,
91     void *                              callback_arg,
92     globus_result_t                     result);
93
94 static
95 void
96 globus_l_notification_callback_kickout(
97     void *                              user_arg);
98
99 static
100 time_t
101 globus_l_notification_timegm(
102     const struct tm *                   tm);
103
104 /* Local Variables */
105
/* XML namespace URI of the WS-BaseNotification 1.2 draft schema. */
106 #define WSN_NS "http://docs.oasis-open.org" \
107                "/wsn/2004/06/wsn-WS-BaseNotification-1.2-draft-01.xsd"
108
/* Service path of the SubscriptionManager service (relative, no leading
 * slash).
 */
109 #define SUBSCRIPTIONMANAGERSERVICE_BASE_PATH \
110                "wsrf/services/SubscriptionManagerService" 
111
/* Dialect URIs compared (with strcmp) against the Dialect attribute of a
 * wsnt:TopicExpression by the functions in this file.
 */
112 #define SIMPLE_TOPIC_EXPRESSION \
113     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple"
114
115 #define CONCRETE_TOPIC_EXPRESSION \
116     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Concrete"
117
118 #define FULL_TOPIC_EXPRESSION \
119     "http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Full"
120
/* Module-wide mutex.
 * NOTE(review): presumably the lock taken by
 * GlobusNotificationProducerGlobalLock()/GlobalUnlock() -- confirm in
 * globus_i_notification_producer.h.
 */
121 globus_mutex_t
122 globus_i_notification_producer_lock;
123
/* Table of subscription entries; globus_notification_producer_subscribe()
 * inserts entries keyed by the subscription's endpoint reference.
 */
124 globus_hashtable_t
125 globus_i_notification_subscriptions;
126
/* Table of producer state; globus_notification_producer_create() inserts
 * entries keyed by the producer's own copy of the resource_id string.
 */
127 globus_hashtable_t
128 globus_i_notification_producers;
129
/* NOTE(review): presumably used with globus_thread_once() so that
 * globus_l_notification_activate_subscription_manager() runs only once --
 * the call site is not visible here, confirm.
 */
130 static
131 globus_thread_once_t
132 globus_l_subcription_manager_activate_once = GLOBUS_THREAD_ONCE_INIT;
133
/* NOTE(review): presumably the descriptor of the loaded SubscriptionManager
 * service -- assignment is not visible here, confirm.
 */
134 static
135 globus_service_descriptor_t *
136 globus_l_notification_SubscriptionManager_service_descriptor;
137
/* NOTE(review): presumably the extension handle obtained when the
 * SubscriptionManager service is loaded -- confirm.
 */
138 static
139 globus_extension_handle_t
140 globus_l_notification_SubscriptionManager_handle;
141
/* Module descriptor: module name, activation and deactivation functions,
 * and a pointer to local_version; the remaining slots are unused (NULL).
 */
142 globus_module_descriptor_t
143 globus_i_notification_producer_module =
144 {
145     "globus_notification_producer",
146     globus_l_notification_producer_activate,
147     globus_l_notification_producer_deactivate,
148     NULL,
149     NULL,
150     &local_version,
151     NULL
152 };
153
154 #endif /* GLOBUS_DONT_DOCUMENT_INTERNAL */
155
156 /**
157  * Create new notification producer state.
158  * @ingroup producer
159  *
160  * This function creates new notification producer state, addressed by the
161  * given @a resource_id string. This state may be manipulated to create new
162  * subscriptions and topics.
163  *
164  * @param resource_id
165  *     Unique resource string which is used to identify this producer. This
166  *     is typically related to the reference properties for a resource. This
167  *     string value is required to identify the state in further notification
168  *     calls.
169  * @param callback
170  *     Callback function to be called whenever a topic is created,
171  *     changed or destroyed.
172  * @param callback_arg
173  *     Application-specific parameter to the callback function.
174  *
175  * @retval GLOBUS_SUCCESS
176  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
177  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
178  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_ALREADY_EXISTS
179  */
180 globus_result_t
181 globus_notification_producer_create(
182     const char *                        resource_id,
183     globus_notification_producer_topic_callback_t
184                                         callback,
185     void *                              callback_arg)
186 {
187 97     globus_result_t                     result = GLOBUS_SUCCESS;
188 97     int                                 rc;
189 97     globus_notification_producer_t      producer;
190
191 97     if (resource_id == NULL)
192     {
193 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
194
195 0         goto out;
196     }
197 97     GlobusNotificationProducerGlobalLock();
198
199 97     producer = globus_hashtable_lookup(
200             &globus_i_notification_producers,
201             (void *) resource_id);
202
203 97     if (producer != NULL)
204     {
205         /* Producer already exists. We'll just set the callback/callback_arg
206          * in it and be done
207          */
208 4         GlobusNotificationProducerLock(producer);
209 4         if (producer->callback == NULL && callback != NULL)
210         {
211 4             producer->callback = callback;
212 4             producer->callback_arg = callback_arg;
213
214 4             producer->callback(
215                     producer->callback_arg,
216                     producer->resource_id,
217                     GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CREATED,
218                     NULL);
219         }
220         else
221         {
222 0             result = GLOBUS_NOTIFICATION_PRODUCER_ALREADY_EXISTS();
223         }
224 4         GlobusNotificationProducerUnlock(producer);
225 4         GlobusNotificationProducerGlobalUnlock();
226
227 4         goto out;
228     }
229
230 93     producer = calloc(1, sizeof(globus_i_notification_producer_t));
231
232 93     if (producer == NULL)
233     {
234 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
235
236 0         goto out;
237     }
238
239 93     producer->callback = callback;
240 93     producer->callback_arg = callback_arg;
241
242 93     producer->resource_id = globus_libc_strdup(resource_id);
243 93     if (producer->resource_id == NULL)
244     {
245 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
246
247 0         goto free_producer;
248     }
249
250 93     result = globus_i_notification_topic_set_init(&producer->topic_set);
251
252 93     rc = globus_mutex_init(&producer->lock, NULL);
253 93     if (rc != GLOBUS_SUCCESS)
254     {
255 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
256
257 0         goto destroy_topic_set_out;
258     }
259
260 93     rc = globus_hashtable_insert(
261             &globus_i_notification_producers,
262             producer->resource_id,
263             producer);
264 93     if (rc != GLOBUS_SUCCESS)
265     {
266 0         result = GLOBUS_NOTIFICATION_PRODUCER_ALREADY_EXISTS();
267
268 0         goto unlock_out;
269     }
270
271 93     GlobusNotificationProducerGlobalUnlock();
272
273 93     return result;
274
275 unlock_out:
276 0     GlobusNotificationProducerGlobalUnlock();
277 destroy_topic_set_out:
278 0     globus_i_notification_topic_set_destroy(producer->topic_set);
279 0     free(producer->resource_id);
280 free_producer:
281 0     free(producer);
282 out:
283 4     return result;
284 }
285 /* globus_notification_producer_create() */
286
287 /**
288  * Destroy notification producer state
289  * @ingroup producer
290  *
291  * This function destroys all producer state associated with a resource. All
292  * topics and subscriptions related to this producer are destroyed as well.
293  *
294  * @param resource_id
295  *     Unique resource string which is used to identify this producer. This
296  *     is typically related to the reference properties for a resource.
297  *
298  * @retval GLOBUS_SUCCESS
299  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
300  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
301  */
302 globus_result_t
303 globus_notification_producer_destroy(
304     const char *                        resource_id)
305 0 {
306 0     globus_notification_producer_t      producer;
307 0     globus_result_t                     result = GLOBUS_SUCCESS;
308
309 0     if (resource_id == NULL)
310     {
311 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
312
313 0         goto out;
314     }
315
316 0     GlobusNotificationProducerGlobalLock();
317
318 0     producer = globus_hashtable_remove(
319             &globus_i_notification_producers,
320             (void *) resource_id);
321
322 0     if (producer == NULL)
323     {
324 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
325
326 0         goto unlock_error;
327     }
328
329 0     GlobusNotificationProducerLock(producer);
330 0     globus_i_notification_topic_set_destroy(producer->topic_set);
331 0     GlobusNotificationProducerUnlock(producer);
332
333 0     globus_mutex_destroy(&producer->lock);
334 0     GlobusNotificationProducerGlobalUnlock();
335
336 0     free(producer->resource_id);
337
338 0     free(producer);
339
340 0     return GLOBUS_SUCCESS;
341
342 unlock_error:
343 0     GlobusNotificationProducerGlobalUnlock();
344 out:
345 0     return result;
346 }
347 /* globus_notification_producer_destroy() */
348
349 /**
350  * Create a Notification Topic
351  * @ingroup notification_topics
352  * Adds a new topic to the TopicSet for a particular notification producer
353  * resource. This new topic may be subscribed to by calling
354  * globus_notification_producer_subscribe(), and
355  * change notifications may be sent to subscribers by calling
356  * globus_notification_producer_topic_changed().
357  *
358  * @param resource_id
359  *     Resource id associated with notification producer state.
360  * @param topic_expression
361  *     An expression indicating the name (or path) of this topic. Currently
362  *     only simple topic expressions are supported
363  *     (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
364  * 
365  * @retval GLOBUS_SUCCESS
366  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
367  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_DIALECT
368  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
369  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
370  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_ALREADY_EXISTS
371  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_INVALID_TOPIC
372  */
373 globus_result_t
374 globus_notification_producer_topic_create(
375     const char *                        resource_id,
376     const wsnt_TopicExpressionType *    topic_expression)
377 650 {
378 650     globus_notification_producer_t      producer;
379 650     globus_result_t                     result = GLOBUS_SUCCESS;
380
381 650     if (resource_id == NULL || topic_expression == NULL)
382     {
383 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
384
385 0         goto out;
386     }
387
388 650     if (strcmp(*topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
389         strcmp(*topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
390     {
391 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_DIALECT();
392
393 0         goto out;
394     }
395
396 650     GlobusNotificationProducerGlobalLock();
397
398 650     producer = globus_hashtable_lookup(
399             &globus_i_notification_producers,
400             (void *) resource_id);
401
402 650     if (producer)
403     {
404 650         GlobusNotificationProducerLock(producer);
405 650         GlobusNotificationProducerGlobalUnlock();
406     }
407     else
408     {
409 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
410
411 0         GlobusNotificationProducerGlobalUnlock();
412 0         goto out;
413     }
414
415 650     result = globus_i_notification_topic_set_add(
416             producer->topic_set,
417             topic_expression);
418
419 650     if (result != GLOBUS_SUCCESS)
420     {
421 0         goto unlock_producer_out;
422     }
423
424 650     if (producer->callback)
425     {
426 642         producer->callback(
427                 producer->callback_arg,
428                 producer->resource_id,
429                 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CREATED,
430                 topic_expression);
431     }
432
433 unlock_producer_out:
434 650     GlobusNotificationProducerUnlock(producer);
435
436 out:
437 650     return result;
438 }
439 /* globus_notification_producer_topic_create() */
440
441
442 /**
443  * Destroy a Notification Topic
444  * @ingroup notification_topics
445  * Destroys all information related to a previously existing topic associated
446  * with a notification producer. The topic and all of the related susbsription
447  * resources are removed from the service.
448  *
449  * @param resource_id
450  *     Resource id associated with notification producer state.
451  * @param topic_expression
452  *     An expression indicating the name (or path) of this topic. Currently
453  *     only simple topic expressions are supported
454  *     (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
455  *
456  * @retval GLOBUS_SUCCESS
457  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
458  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_DIALECT
459  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
460  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
461  */
462 globus_result_t
463 globus_notification_producer_topic_destroy(
464     const char *                        resource_id,
465     const wsnt_TopicExpressionType *    topic_expression)
466 1 {
467 1     globus_result_t                     result = GLOBUS_SUCCESS;
468 1     globus_notification_producer_t      producer;
469
470 1     if (resource_id == NULL || topic_expression == NULL)
471     {
472 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
473
474 0         goto out;
475     }
476 1     if (strcmp(*topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
477         strcmp(*topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
478     {
479 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_DIALECT();
480
481 0         goto out;
482     }
483
484 1     GlobusNotificationProducerGlobalLock();
485
486 1     producer = globus_hashtable_lookup(
487             &globus_i_notification_producers,
488             (void *) resource_id);
489 1     if (producer)
490     {
491 1         GlobusNotificationProducerLock(producer);
492     }
493     else
494     {
495 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
496 0         GlobusNotificationProducerGlobalUnlock();
497
498 0         goto out;
499     }
500
501 1     result = globus_i_notification_topic_set_remove(
502             producer->topic_set,
503             topic_expression);
504
505 1     if (result != GLOBUS_SUCCESS)
506     {
507 0         goto unlock_out;
508     }
509 1     if (producer->callback)
510     {
511 1         producer->callback(
512                 producer->callback_arg,
513                 producer->resource_id,
514                 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_DESTROYED,
515                 topic_expression);
516     }
517
518 unlock_out:
519 1     GlobusNotificationProducerGlobalUnlock();
520 1     GlobusNotificationProducerUnlock(producer);
521 out:
522 1     return result;
523 }
524 /* globus_notification_producer_topic_destroy() */
525
526 /**
527  * Subscribe to a topic handled by a notification producer.
528  * @ingroup subscription
529  *
530  * @param resource_id
531  *     Resource id associated with notification producer state.
532  * @param engine
533  *     A pointer to the WS engine which will serve requests for operations
534  *     on the resulting NotificationManager subscription
535  * @param Subscribe
536  *     Parameters related to the subscription. These are generally passed
537  *     as parameters to the WSN Subscribe operation, but may be generated
538  *     programmatically by a service. Currently only the @a ConsumerReference,
539  *     @a TopicExpression, and @a UseNotify elements are handled. All others
540  *     are ignored by this implementation.
541  * @param SubscribeResponse
542  *     Pointer to a response structure which will be populated by this
543  *     implementation. The contents of this structure will be initialized and
544  *     set if the subscription is created successfully.
545  *
546  * @retval GLOBUS_SUCCESS
547  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
548  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
549  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_UNKNOWN_TOPIC
550  */
551 globus_result_t
552 globus_notification_producer_subscribe(
553     const char *                        resource_id,
554     const globus_service_engine_t       engine,
555     const wsnt_SubscribeType *          Subscribe,
556     wsnt_SubscribeResponseType *        SubscribeResponse)
557 11 {
558 11     globus_resource_t                   subscription_resource;
559 11     globus_result_t                     result = GLOBUS_SUCCESS;
560 11     globus_notification_producer_t      producer;
561 11     globus_list_t *                     topic_list = NULL;
562 11     globus_list_t *                     tmp;
563     globus_i_notification_producer_topic_t *
564 11                                         topic;
565     globus_i_notification_subscription_t *
566 11                                         subscription;
567     globus_i_notification_subscription_entry_t *
568 11                                         subscription_entry;
569 11     wsa_EndpointReferenceType *         tmp_epr;
570 11     int                                 rc;
571 11     int                                 i;
572
573     /* Make sure all parameters make sense */
574 11     if (resource_id == NULL || Subscribe == NULL || SubscribeResponse == NULL)
575     {
576 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
577
578 0         goto out;
579     }
580 11     if (strcmp(*Subscribe->TopicExpression._Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
581         strcmp(*Subscribe->TopicExpression._Dialect, CONCRETE_TOPIC_EXPRESSION) != 0 &&
582         strcmp(*Subscribe->TopicExpression._Dialect, FULL_TOPIC_EXPRESSION) != 0)
583     {
584 1         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_DIALECT();
585
586 1         goto out;
587     }
588
589 10     GlobusNotificationProducerGlobalLock();
590
591 10     producer = globus_hashtable_lookup(
592             &globus_i_notification_producers,
593             (void *) resource_id);
594
595 10     if (producer == NULL)
596     {
597 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
598
599 0         GlobusNotificationProducerGlobalUnlock();
600
601 0         goto out;
602     }
603
604 10     GlobusNotificationProducerLock(producer);
605 10     GlobusNotificationProducerGlobalUnlock();
606
607 10     result = globus_i_notification_topic_set_lookup(
608             producer->topic_set,
609             &Subscribe->TopicExpression,
610             &topic_list);
611
612 10     if (topic_list == NULL)
613     {
614 1         if (result == GLOBUS_SUCCESS)
615         {
616 0             result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_TOPIC();
617         }
618 0         goto unlock_out;
619     }
620
621     /* Internal data containing info about this subscription */
622 9     subscription = malloc(
623             sizeof(globus_i_notification_subscription_t));
624 9     if (subscription == NULL)
625     {
626 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
627
628 0         goto free_topic_list_out;
629     }
630
631 9     result = wsa_EndpointReferenceType_copy(
632             &subscription->consumer_epr,
633             &Subscribe->ConsumerReference);
634 9     if (result != GLOBUS_SUCCESS)
635     {
636 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
637
638 0         goto free_subscription_out;
639     }
640 9     subscription->paused = GLOBUS_FALSE;
641
642 9     result = wsa_EndpointReferenceType_init(
643             &SubscribeResponse->SubscriptionReference);
644
645 9     if (result != GLOBUS_SUCCESS)
646     {
647 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
648
649 0         goto free_subscription_consumer_out;
650     }
651
652     /* Create resource associated with this subscription */
653 9     result = globus_l_notification_subscription_create(
654             producer,
655             engine,
656             Subscribe,
657             SubscribeResponse->SubscriptionReference,
658             &subscription_resource);
659
660 9     if (result != GLOBUS_SUCCESS)
661     {
662 0         goto free_subscription_epr_out;
663     }
664
665 9     result = globus_resource_set_resource_specific(
666             subscription_resource,
667             subscription,
668             globus_l_notification_subscription_free);
669 9     if (result != GLOBUS_SUCCESS)
670     {
671 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
672
673 0         goto destroy_subscription_resource_out;
674     }
675
676 9     subscription_entry = malloc(sizeof(globus_i_notification_subscription_entry_t));
677 9     if (subscription_entry == NULL)
678     {
679 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
680 0         goto destroy_subscription_resource_out;
681     }
682 9     subscription_entry->producer = producer;
683 9     result = wsa_EndpointReferenceType_copy(
684             &subscription_entry->epr,
685             SubscribeResponse->SubscriptionReference);
686
687 9     if (result != GLOBUS_SUCCESS)
688     {
689 0         goto free_subscription_entry_out;
690     }
691 9     subscription->topics = NULL;
692
693     /* Add mapping of subscription to topics and vice versa */
694 9     tmp = topic_list;
695 18     while (!globus_list_empty(tmp))
696     {
697 9         topic = globus_list_first(tmp);
698
699         /* Insert topic pointer into subscription's state */
700 9         globus_list_insert(&subscription->topics, topic);
701
702         /* Add reference to subscription's EPR to the topic */
703 9         tmp_epr = wsa_EndpointReferenceType_array_push(&topic->subscriptions);
704 9         if (tmp_epr == NULL)
705         {
706 0             result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
707
708 0             goto remove_from_topic_list_out;
709         }
710         /* Make a copy of the EPR for usage within this API. The SubscribeResponse
711          * will be freed by the service engine after it is sent as part of the
712          * SOAP response.
713          */
714 9         result = wsa_EndpointReferenceType_copy_contents(
715                 tmp_epr,
716                 SubscribeResponse->SubscriptionReference);
717 9         if (result != GLOBUS_SUCCESS)
718         {
719 0             result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
720
721 0             goto remove_from_topic_list_out;
722         }
723 9         tmp = globus_list_rest(tmp);
724     }
725 9     GlobusNotificationProducerUnlock(producer);
726
727 9     GlobusNotificationProducerGlobalLock();
728
729     /* Add entry in the map of subscription EPRs to producer structures */
730 9     rc = globus_hashtable_insert(
731             &globus_i_notification_subscriptions,
732             subscription_entry->epr,
733             subscription_entry);
734 9     GlobusNotificationProducerGlobalUnlock();
735
736 9     if (rc != GLOBUS_SUCCESS)
737     {
738 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
739
740 0         goto remove_from_topic_list_out;
741     }
742 9     if (topic_list)
743     {
744 9         globus_list_free(topic_list);
745     }
746
747 9     globus_resource_finish(subscription_resource);
748
749 9     return GLOBUS_SUCCESS;
750
751 remove_from_topic_list_out:
752 0     for (i = 0; i < topic->subscriptions.length; i++)
753     {
754 0         if (globus_l_notification_epr_keyequal(
755                     &topic->subscriptions.elements[i],
756                     subscription_entry->epr))
757         {
758 0             wsa_EndpointReferenceType_destroy_contents(&topic->subscriptions.elements[i]);
759 0             memmove(&topic->subscriptions.elements[i],
760                     &topic->subscriptions.elements[i+1],
761                     sizeof(wsa_EndpointReferenceType) *
762                     (topic->subscriptions.length - (i+1)));
763 0             topic->subscriptions.length--;
764 0             break;
765         }
766     }
767 0     globus_list_free(subscription->topics);
768 0     wsa_EndpointReferenceType_destroy(subscription_entry->epr);
769 free_subscription_entry_out:
770 0     free(subscription_entry);
771 destroy_subscription_resource_out:
772 0     globus_resource_destroy(subscription_resource);
773 free_subscription_epr_out:
774 0     wsa_EndpointReferenceType_destroy(
775             SubscribeResponse->SubscriptionReference);
776 free_subscription_consumer_out:
777 0     wsa_EndpointReferenceType_destroy(subscription->consumer_epr);
778 free_subscription_out:
779 0     free(subscription);
780 free_topic_list_out:
781 0     globus_list_free(topic_list);
782 unlock_out:
783 1     GlobusNotificationProducerUnlock(producer);
784 out:
785 2     return result;
786 }
787 /* globus_notification_producer_subscribe() */
788
789 /**
790  * Destroy a subscription resource.
791  * @ingroup subscription
792  *
793  * Destroys the SubscriptionManager resource at the named location, as well
794  * as freeing all memory associated with the subscription. Subscriptions
795  * which are destroyed will not have any additional notification sent to them,
796  * though an in-progress notification may arrive after the subscription is
797  * destroyed.
798  *
799  * When the topic or notification producer state which was used to create the
800  * subscription is destroyed, the subscription will automatically be destroyed.
801  *
802  * @param subscription_epr
803  *     Pointer to the endpoint reference to the SubscriptionManager resource.
804  *
805  * @retval GLOBUS_SUCCESS
806  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
807  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
808  */
809 globus_result_t
810 globus_notification_producer_destroy_subscription(
811     const wsa_EndpointReferenceType *   subscription_epr)
812 0 {
813 0     globus_result_t                     result;
814 0     globus_notification_producer_t      producer;
815 0     globus_resource_t                   resource;
816     globus_i_notification_producer_topic_t *
817 0                                         topic;
818     globus_i_notification_subscription_t *
819 0                                         subscription;
820     globus_i_notification_subscription_entry_t *
821 0                                         subscription_entry;
822 0     int                                 i;
823 0     globus_list_t *                     tmp;
824
825 0     result = globus_l_notification_producer_find_subscription(
826             subscription_epr,
827             &producer,
828             &resource);
829
830 0     if (result != GLOBUS_SUCCESS)
831     {
832 0         goto out;
833     }
834
835 0     subscription_entry = globus_hashtable_remove(
836             &globus_i_notification_subscriptions,
837             (void *) subscription_epr);
838
839 0     wsa_EndpointReferenceType_destroy(subscription_entry->epr);
840 0     free(subscription_entry);
841
842 0     result = globus_resource_get_resource_specific(
843             resource,
844             (void **) &subscription);
845
846 0     if (result != GLOBUS_SUCCESS || subscription == NULL)
847     {
848         /* Should NEVER happen */
849 0         globus_assert(result == GLOBUS_SUCCESS);
850 0         globus_assert(subscription != GLOBUS_NULL);
851
852         /* We fudge a little bit here: the destroy is essentially done at
853          * this point, but for some things which shouldn't happen. From the
854          * application's point of view, the destruction was successful unless 
855          * this is built with debug :)
856          */
857 0         result = GLOBUS_SUCCESS;
858
859 0         goto unlock_out;
860     }
861
862     /* Make sure the subscription is removed from all topics it was associated with */
863 0     tmp = subscription->topics;
864 0     while (tmp != NULL)
865     {
866 0         topic = globus_list_first(tmp);
867 0         for (i = 0; i < topic->subscriptions.length; i++)
868         {
869 0             if (globus_l_notification_epr_keyequal(
870                         &topic->subscriptions.elements[i],
871                         (void*) subscription_epr))
872             {
873 0                 wsa_EndpointReferenceType_destroy_contents(
874                         &topic->subscriptions.elements[i]);
875 0                 memmove(&topic->subscriptions.elements[i],
876                         &topic->subscriptions.elements[i+1],
877                         sizeof(wsa_EndpointReferenceType) *
878                         (topic->subscriptions.length - (i+1)));
879 0                 topic->subscriptions.length--;
880             }
881         }
882 0         tmp = globus_list_rest(tmp);
883     }
884
885 unlock_out:
886 0     globus_resource_destroy(resource);
887 0     GlobusNotificationProducerUnlock(producer);
888 out:
889 0     return result;
890 }
891 /* globus_notification_producer_destroy_subscription() */
892
893 /**
894  * Get Current Message
895  * @ingroup notification_topics
896  *
897  * Returns the last message change sent via a call to
898  * globus_notification_producer_topic_changed() for a particular topic
899  * associated with a producer resource.
900  *
901  * @param resource_id
902  *     Resource id associated with notification producer state.
903  * @param topic_expression
904  *     An expression indicating the name (or path) of this topic. Currently
905  *     only simple topic expressions are supported
906  *     (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
907  * @param any
908  *     A pointer to an xsd:any which will be populated with a copy of the last
909  *     message sent for the given @a topic_expression. If no changes have been
910  *     sent for this topic, then the returned @a any will not contain any data.
911  *
912  * @retval GLOBUS_SUCCESS
913  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
914  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
915  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
916  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
917  */
918 globus_result_t
919 globus_notification_producer_get_current_message(
920     const char *                        resource_id,
921     const wsnt_TopicExpressionType *    topic_expression,
922     xsd_any *                           any)
923 10 {
924 10     globus_result_t                     result = GLOBUS_SUCCESS;
925 10     globus_notification_producer_t      producer;
926 10     globus_list_t *                     topic_list = NULL;
927 10     globus_list_t *                     tmp_list;
928     globus_i_notification_producer_topic_t *
929 10                                         topic;
930 10     xsd_any_array *                     aa;
931 10     xsd_any *                           new_any;
932
933 10     if (any == NULL || resource_id == NULL || topic_expression == NULL)
934     {
935 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
936         
937 0         goto out;
938     }
939 10     result = xsd_any_init_contents(any);
940
941 10     if (strcmp(*topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
942         strcmp(*topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0 &&
943         strcmp(*topic_expression->_Dialect, FULL_TOPIC_EXPRESSION) != 0)
944     {
945 1         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_DIALECT();
946
947 1         goto out;
948     }
949
950 9     GlobusNotificationProducerGlobalLock();
951
952 9     producer = globus_hashtable_lookup(
953             &globus_i_notification_producers,
954             (void *) resource_id);
955
956 9     if (producer == NULL)
957     {
958 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
959
960 0         GlobusNotificationProducerGlobalUnlock();
961
962 0         goto out;
963     }
964
965 9     GlobusNotificationProducerLock(producer);
966 9     GlobusNotificationProducerGlobalUnlock();
967
968 9     result = globus_i_notification_topic_set_lookup(
969             producer->topic_set,
970             topic_expression,
971             &topic_list);
972
973 9     if (topic_list == NULL)
974     {
975 1         goto unlock_out;
976     }
977
978 8     if (result != GLOBUS_SUCCESS)
979     {
980 0         goto unlock_out;
981     }
982
983 8     if (globus_list_size(topic_list) > 1)
984     {
985 0         result = xsd_any_array_init(&aa);
986 0         any->value = aa;
987 0         any->any_info = &xsd_any_array_info;
988
989 0         tmp_list = topic_list;
990 0         while (!globus_list_empty(tmp_list))
991         {
992 0             topic = globus_list_first(tmp_list);
993 0             if (topic->current_message != NULL)
994             {
995 0                 new_any = xsd_any_array_push(aa);
996 0                 if (new_any == NULL)
997                 {
998 0                     result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
999 0                     goto destroy_any_out;
1000                 }
1001 0                 new_any->any_info = topic->current_message->any_info;
1002 0                 result = new_any->any_info->copy(
1003                         (void **) &new_any->value,
1004                         topic->current_message->value);
1005 0                 if (result != GLOBUS_SUCCESS)
1006                 {
1007 0                     result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1008
1009 0                     goto destroy_any_out;
1010                 }
1011             }
1012 0             tmp_list = globus_list_rest(tmp_list);
1013         }
1014     }
1015     else
1016     {
1017 8         topic = globus_list_first(topic_list);
1018 8         if (topic->current_message != NULL)
1019         {
1020 7             any->any_info = topic->current_message->any_info;
1021 7             result = any->any_info->copy(
1022                     (void **) &any->value,
1023                     topic->current_message->value);
1024 7             if (result != GLOBUS_SUCCESS)
1025             {
1026 0                 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1027
1028 0                 goto destroy_any_out;
1029             }
1030         }
1031     }
1032
1033 8     globus_list_free(topic_list);
1034
1035 destroy_any_out:
1036 8     if (result != GLOBUS_SUCCESS)
1037     {
1038 0         xsd_any_destroy(any);
1039     }
1040 unlock_out:
1041 9     GlobusNotificationProducerUnlock(producer);
1042 out:
1043 10     return result;
1044 }
1045 /* globus_notification_producer_get_current_message() */
1046
1047 /**
1048  * Send Topic Change Notifications
1049  * @ingroup notification_topics
1050  *
1051  * For each subscription to the given topic_expression for the given resource,
1052  * the Notify operation will be invoked on the ConsumerReference associated
1053  * with the subscription. When all notifications have been sent, the @a
1054  * notification_done callback will be invoked locally.
1055  *
1056  * The @a message will be copied by the library when the notification begins,
1057  * so the caller may free the value immediately upon returning from this
1058  * function.
1059  *
1060  * @param resource_id
1061  *     Resource id associated with notification producer state.
1062  * @param topic_expression
1063  *     An expression indicating the name (or path) of this topic. Currently
1064  *     only simple topic expressions are supported
1065  *     (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
1066  * @param message
1067  *     Contents of the notification message.
1068  * @param notification_done
1069  *     Optional done callback. If this is non-NULL, then the callback will
1070  *     be called when all of the notification operations have completed, for
1071  *     implementations which would like to enforce sequencing notifications.
1072  * @param callback_arg
1073  *     Optional parameter to the @a notification_done callback.
1074  *
1075  * @retval GLOBUS_SUCCESS
1076  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1077  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1078  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
1079  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
1080  */
1081 globus_result_t
1082 globus_notification_producer_topic_changed(
1083     const char *                        resource_id,
1084     const wsnt_TopicExpressionType *    topic_expression,
1085     const xsd_anyType *                 message,
1086     globus_notification_done_callback_t notification_done,
1087     void *                              callback_arg)
1088 408 {
1089 408     globus_result_t                     result;
1090 408     globus_notification_producer_t      producer;
1091 408     globus_list_t *                     topic_list;
1092     globus_i_notification_producer_topic_t *
1093 408                                         topic;
1094     globus_i_notification_callback_info_t *
1095 408                                         callback_info;
1096 408     int                                 i;
1097     NotificationConsumerService_client_handle_t 
1098 408                                         handle;
1099 408     wsa_EndpointReferenceType *         subscription_epr;
1100 408     globus_resource_t                   resource;
1101     globus_i_notification_subscription_t *
1102 408                                         subscription;
1103 408     wsnt_NotificationMessageHolderType *notify_message;
1104 408     globus_bool_t                       callback_registered = GLOBUS_FALSE;
1105
1106 408     if (resource_id == NULL || topic_expression == NULL || message == NULL)
1107     {
1108 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1109
1110 0         goto out;
1111     }
1112
1113 408     if (strcmp(*topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
1114         strcmp(*topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
1115     {
1116 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_DIALECT();
1117
1118 0         goto out;
1119     }
1120
1121 408     GlobusNotificationProducerGlobalLock();
1122 408     producer = globus_hashtable_lookup(
1123             &globus_i_notification_producers,
1124             (void *) resource_id);
1125
1126 408     if (producer == NULL)
1127     {
1128 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1129
1130 0         goto unlock_notification_out;
1131     }
1132
1133 408     GlobusNotificationProducerLock(producer);
1134
1135 408     result = globus_i_notification_topic_set_lookup(
1136             producer->topic_set,
1137             topic_expression,
1138             &topic_list);
1139
1140 408     if (topic_list == NULL)
1141     {
1142 0         goto unlock_out;
1143     }
1144 408     topic = globus_list_first(topic_list);
1145 408     globus_list_free(topic_list);
1146
1147 408     xsd_anyType_destroy(topic->current_message);
1148 408     topic->current_message = NULL;
1149
1150     /* non-fatal if this fails, so we'll ignore result */
1151 408     (void) xsd_anyType_copy(&topic->current_message, message);
1152
1153
1154 408     callback_info = malloc(
1155             sizeof(globus_i_notification_callback_info_t));
1156 408     if (callback_info == NULL)
1157     {
1158 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1159
1160 0         goto unlock_out;
1161     }
1162 408     callback_info->callback = notification_done;
1163 408     callback_info->callback_arg = callback_arg;
1164 408     callback_info->topic_expression = topic_expression;
1165 408     callback_info->notification_count = 0;
1166 408     result = wsnt_NotifyType_init(&callback_info->Notify);
1167
1168 408     if (result != GLOBUS_SUCCESS)
1169     {
1170 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1171
1172 0         goto free_callback_info_out;
1173     }
1174
1175 408     notify_message = wsnt_NotificationMessageHolderType_array_push(
1176             &callback_info->Notify->NotificationMessage);
1177
1178 408     if (notify_message == NULL)
1179     {
1180 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1181
1182 0         goto free_notify_out;
1183     }
1184
1185 408     result = wsnt_TopicExpressionType_copy_contents(
1186             &notify_message->Topic,
1187             topic_expression);
1188 408     if (result != GLOBUS_SUCCESS)
1189     {
1190 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1191
1192 0         goto free_notify_out;
1193     }
1194 408     notify_message->ProducerReference = NULL;
1195
1196 408     result = xsd_anyType_copy_contents(
1197         &notify_message->Message,
1198         message);
1199 408     if (result != GLOBUS_SUCCESS)
1200     {
1201 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1202
1203 0         goto free_notify_out;
1204     }
1205
1206 408     globus_mutex_init(&callback_info->lock, NULL);
1207 408     globus_mutex_lock(&callback_info->lock);
1208
1209 408     if (topic->subscriptions.length == 0)
1210     {
1211         /* no subscriptions at all, if we have a callback to call, we'll
1212          * register a oneshot to handle it, otherwise clean up the callback
1213          * info.
1214          */
1215 389         if (notification_done != NULL)
1216         {
1217 0             result = globus_callback_register_oneshot(
1218                     NULL,
1219                     &globus_i_reltime_zero,
1220                     globus_l_notification_callback_kickout,
1221                     callback_info);
1222 0             if (result == GLOBUS_SUCCESS)
1223             {
1224 0                 callback_registered = GLOBUS_TRUE;
1225             }
1226             else
1227             {
1228 0                 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1229             }
1230         }
1231 0         goto unlock_callback_info_out;
1232     }
1233
1234 39     for (i = 0; i < topic->subscriptions.length; i++)
1235     {
1236 20         subscription_epr = &topic->subscriptions.elements[i];
1237
1238 20         result = globus_wsrf_core_get_resource_from_epr(
1239                 subscription_epr,
1240                 &resource);
1241
1242 20         if (result != GLOBUS_SUCCESS)
1243         {
1244             /* skip dead subscription for this topic */
1245 0             continue;
1246         }
1247
1248 20         result = globus_resource_get_resource_specific(
1249                 resource,
1250                 (void **) &subscription);
1251
1252 20         if (result != GLOBUS_SUCCESS || subscription == NULL)
1253         {
1254             /* skip broken subscription for this topic */
1255 0             globus_resource_finish(resource);
1256
1257 0             continue;
1258         }
1259
1260 20         if (subscription->paused)
1261         {
1262             /* skip paused subscription */
1263 0             globus_resource_finish(resource);
1264
1265 0             continue;
1266         }
1267
1268 20         result = NotificationConsumerService_client_init(
1269                 &handle,
1270                 NULL,
1271                 NULL);
1272
1273 20         if (result != GLOBUS_SUCCESS)
1274         {
1275 0             globus_resource_finish(resource);
1276
1277 0             result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1278
1279 0             goto unlock_callback_info_out;
1280         }
1281
1282 20         result = Consumer_Notify_epr_register_request(
1283                 handle,
1284                 subscription->consumer_epr,
1285                 callback_info->Notify,
1286                 globus_l_notify_done,
1287                 callback_info);
1288 20         globus_resource_finish(resource);
1289 20         if (result == GLOBUS_SUCCESS)
1290         {
1291 20             callback_info->notification_count++;
1292 20             callback_registered = GLOBUS_TRUE;
1293         }
1294         else
1295         {
1296 0             result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1297
1298 0             goto unlock_callback_info_out;
1299         }
1300     }
1301
1302 19     if (producer->callback)
1303     {
1304 19         producer->callback(
1305                 producer->callback_arg,
1306                 producer->resource_id,
1307                 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CHANGED,
1308                 topic_expression);
1309     }
1310
1311 unlock_callback_info_out:
1312 408     globus_mutex_unlock(&callback_info->lock);
1313 free_notify_out:
1314 408     if (!callback_registered)
1315     {
1316 389         wsnt_NotifyType_destroy(callback_info->Notify);
1317 free_callback_info_out:
1318 389         free(callback_info);
1319     }
1320 unlock_out:
1321 408     GlobusNotificationProducerUnlock(producer);
1322 unlock_notification_out:
1323 408     GlobusNotificationProducerGlobalUnlock();
1324 out:
1325 408     return result;
1326 }
1327 /* globus_notification_producer_topic_changed() */
1328
1329
1330 /**
1331  * Pause a Subscription
1332  * @ingroup subscription
1333  * Disable sending any notification messages to the consumer associated with
1334  * a subscription resource until the subscription is resumed by calling
1335  * globus_notification_subscription_resume().
1336  *
1337  * @param subscription_manager
1338  *     Pointer to the endpoint reference to the SubscriptionManager resource.
1339  * 
1340  * @retval GLOBUS_SUCCESS
1341  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1342  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1343  */
1344 globus_result_t
1345 globus_notification_subscription_pause(
1346     const wsa_EndpointReferenceType *   subscription_manager)
1347 0 {
1348 0     globus_result_t                     result;
1349 0     globus_notification_producer_t      producer;
1350 0     globus_resource_t                   resource;
1351     globus_i_notification_subscription_t *
1352 0                                         subscription;
1353
1354 0     if (subscription_manager == NULL)
1355     {
1356 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1357
1358 0         goto out;
1359     }
1360 0     result = globus_l_notification_producer_find_subscription(
1361             subscription_manager,
1362             &producer,
1363             &resource);
1364
1365 0     if (result != GLOBUS_SUCCESS)
1366     {
1367 0         goto out;
1368     }
1369
1370 0     result = globus_resource_get_resource_specific(
1371             resource,
1372             (void **) &subscription);
1373
1374 0     if (result != GLOBUS_SUCCESS)
1375     {
1376 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1377
1378 0         goto unlock_out;
1379     }
1380
1381 0     subscription->paused = GLOBUS_TRUE;
1382
1383 unlock_out:
1384 0     globus_resource_finish(resource);
1385 0     GlobusNotificationProducerUnlock(producer);
1386 out:
1387 0     return result;
1388 }
1389 /* globus_notification_subscription_pause() */
1390
1391 /**
1392  * Resume a Subscription
1393  * @ingroup subscription
1394  * Reenable sending notification messages to the consumer associated with
1395  * a subscription resource which was paused by by calling
1396  * globus_notification_subscription_pause().
1397  *
1398  * @param subscription_manager
1399  *     Pointer to the endpoint reference to the SubscriptionManager resource.
1400  * 
1401  * @retval GLOBUS_SUCCESS
1402  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1403  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1404  */
1405 globus_result_t
1406 globus_notification_subscription_resume(
1407     const wsa_EndpointReferenceType *   subscription_manager)
1408 0 {
1409 0     globus_result_t                     result;
1410 0     globus_notification_producer_t      producer;
1411 0     globus_resource_t                   resource;
1412     globus_i_notification_subscription_t *
1413 0                                         subscription;
1414
1415 0     if (subscription_manager == NULL)
1416     {
1417 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1418
1419 0         goto out;
1420     }
1421 0     result = globus_l_notification_producer_find_subscription(
1422             subscription_manager,
1423             &producer,
1424             &resource);
1425
1426 0     if (result != GLOBUS_SUCCESS)
1427     {
1428 0         goto out;
1429     }
1430
1431 0     result = globus_resource_get_resource_specific(
1432             resource,
1433             (void **) &subscription);
1434
1435 0     if (result != GLOBUS_SUCCESS)
1436     {
1437 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1438
1439 0         goto unlock_out;
1440     }
1441
1442 0     subscription->paused = GLOBUS_FALSE;
1443
1444 unlock_out:
1445 0     globus_resource_finish(resource);
1446 0     GlobusNotificationProducerUnlock(producer);
1447 out:
1448 0     return result;
1449 }
1450 /* globus_notification_subscription_resume() */
1451
1452 /**
1453  * Initialize a topic expression
1454  * @ingroup notification_topics
1455  * Initializes a topic expression to use the SimpleTopicExpression dialect
1456  * with the QName as the topic expression. The topic_expression can then
1457  * be used for creating topics or subscriptions.
1458  *
1459  * @param topic_expression
1460  *     Pointer to a @a wsnt_TopicExpressionType which will have its
1461  *     contents initialized to the value of @a expression.
1462  * @param expression
1463  *     QName describing the topic.
1464  */
1465 globus_result_t
1466 globus_notification_producer_set_simple_topic_expression_contents(
1467     wsnt_TopicExpressionType *          topic_expression,
1468     const xsd_QName *                   expression)
1469 50 {
1470 50     xsd_string                          tmp;            
1471 50     globus_result_t                     result;
1472
1473 50     result = wsnt_TopicExpressionType_init_contents(topic_expression);
1474 50     if (result != GLOBUS_SUCCESS)
1475     {
1476 0         goto out;
1477     }
1478 50     tmp = globus_libc_strdup(SIMPLE_TOPIC_EXPRESSION);
1479
1480 50     if (tmp == NULL)
1481     {
1482 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1483
1484 0         goto destroy_topic_expression_contents_out;
1485     }
1486 50     result = xsd_anyURI_init_cstr(
1487             &topic_expression->_Dialect,
1488             tmp);
1489 50     if (result != GLOBUS_SUCCESS)
1490     {
1491 0         goto free_tmp_out;
1492     }
1493 50     tmp = NULL;
1494 50     result = xsd_any_init(&topic_expression->any);
1495 50     if (result != GLOBUS_SUCCESS)
1496     {
1497 0         goto destroy_topic_expression_contents_out;
1498     }
1499 50     topic_expression->any->any_info = &xsd_QName_contents_info;
1500 50     result = xsd_QName_copy(
1501             (xsd_QName **) &topic_expression->any->value,
1502             expression);
1503
1504 50     if (result != GLOBUS_SUCCESS)
1505     {
1506 0         goto destroy_topic_expression_contents_out;
1507
1508 destroy_topic_expression_contents_out:
1509 0         wsnt_TopicExpressionType_destroy_contents(topic_expression);
1510     }
1511 free_tmp_out:
1512 50     if (tmp != NULL)
1513     {
1514 0         free(tmp);
1515     }
1516 out:
1517 50     return result;
1518 }
1519 /* globus_notification_producer_set_simple_topic_expression_contents() */
1520
1521 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
1522 static
1523 int
1524 globus_l_notification_producer_activate(void)
1525 73 {
1526 73     int                                 rc;
1527
1528 73     rc = globus_module_activate(GLOBUS_COMMON_MODULE);
1529
1530 73     if (rc != GLOBUS_SUCCESS)
1531     {
1532 0         goto error_out;
1533     }
1534 73     GlobusDebugInit(GLOBUS_NOTIFICATION_PRODUCER, DEBUG INFO TRACE WARN ERROR);
1535
1536 73     rc = globus_module_activate(GLOBUS_WSRF_RESOURCE_MODULE);
1537 73     if (rc != GLOBUS_SUCCESS)
1538     {
1539 0         goto deactivate_common_out;
1540     }
1541
1542 73     globus_mutex_init(&globus_i_notification_producer_lock, NULL);
1543
1544 73     rc = globus_hashtable_init(
1545             &globus_i_notification_subscriptions,
1546             15,
1547             globus_l_notification_epr_hash,
1548             globus_l_notification_epr_keyequal);
1549
1550 73     if (rc != GLOBUS_SUCCESS)
1551     {
1552 0         goto deactivate_wsrf_resource_out;
1553     }
1554
1555 73     rc = globus_hashtable_init(
1556             &globus_i_notification_producers,
1557             15,
1558             globus_hashtable_string_hash,
1559             globus_hashtable_string_keyeq);
1560
1561 73     if (rc != GLOBUS_SUCCESS)
1562     {
1563 0         goto destroy_subscriptions_out;
1564
1565     }
1566
1567 73     globus_l_notification_SubscriptionManager_handle = NULL;
1568 73     globus_l_notification_SubscriptionManager_service_descriptor = NULL;
1569
1570 73     return GLOBUS_SUCCESS;
1571 destroy_subscriptions_out:
1572 0     globus_hashtable_destroy(&globus_i_notification_subscriptions);
1573 deactivate_wsrf_resource_out:
1574 0     globus_module_deactivate(GLOBUS_WSRF_RESOURCE_MODULE);
1575 deactivate_common_out:
1576 0     GlobusDebugDestroy(GLOBUS_NOTIFICATION_PRODUCER);
1577
1578 0     globus_module_deactivate(GLOBUS_COMMON_MODULE);
1579 error_out:
1580 0     return rc;
1581 }
1582 /* globus_l_notification_producer_activate() */
1583
1584 static
1585 int
1586 globus_l_notification_producer_deactivate(void)
1587 0 {
1588 0     globus_hashtable_destroy(&globus_i_notification_subscriptions);
1589 0     globus_hashtable_destroy(&globus_i_notification_producers);
1590 0     if (globus_l_notification_SubscriptionManager_handle != NULL)
1591     {
1592 0         globus_extension_release(
1593                 globus_l_notification_SubscriptionManager_handle);
1594 0         globus_l_notification_SubscriptionManager_handle = NULL;
1595 0         globus_l_notification_SubscriptionManager_service_descriptor = NULL;
1596 0         globus_extension_deactivate(SUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
1597 0         globus_l_subcription_manager_activate_once = GLOBUS_THREAD_ONCE_INIT;
1598     }
1599 0     globus_mutex_destroy(&globus_i_notification_producer_lock);
1600 0     globus_module_activate(GLOBUS_WSRF_RESOURCE_MODULE);
1601
1602 0     GlobusDebugDestroy(GLOBUS_NOTIFICATION_PRODUCER);
1603
1604 0     globus_module_activate(GLOBUS_COMMON_MODULE);
1605
1606 0     return 0;
1607 }
1608 /* globus_l_notification_producer_deactivate() */
1609
1610
1611 static
1612 void
1613 globus_l_notification_activate_subscription_manager(void)
1614 7 {
1615 7     int                                 rc;
1616 7     char *                              service_path;
1617
1618 7     service_path = GLOBUS_SERVICE_ENGINE_MODULE_PATH_PREFIX "/"
1619             SUBSCRIPTIONMANAGERSERVICE_BASE_PATH;
1620
1621 7     rc = globus_extension_activate(service_path);
1622
1623 7     if (rc != GLOBUS_SUCCESS)
1624     {
1625 0         goto out;
1626     }
1627
1628 7     globus_l_notification_SubscriptionManager_service_descriptor =
1629         globus_extension_lookup(
1630                 &globus_l_notification_SubscriptionManager_handle,
1631                 GLOBUS_SERVICE_REGISTRY,
1632                 service_path);
1633 7     if (globus_l_notification_SubscriptionManager_service_descriptor == NULL)
1634     {
1635 0         globus_extension_deactivate(SUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
1636     }
1637 out:
1638 7     return;
1639 }
1640 /* globus_l_notification_activate_subscription_manager() */
1641
1642 static
1643 int
1644 globus_l_notification_epr_hash(
1645     void *                              key,
1646     int                                 limit)
1647 9 {
1648 9     globus_result_t                     result;
1649 9     char *                              reference_properties;
1650 9     int                                 rc = 0;
1651
1652 9     if (key == NULL)
1653     {
1654 0         goto out;
1655     }
1656
1657 9     result = globus_l_notification_epr_get_reference_properties(
1658             key,
1659             &reference_properties);
1660 9     if (result != GLOBUS_SUCCESS)
1661     {
1662 0         goto out;
1663     }
1664
1665 9     rc = globus_hashtable_string_hash(reference_properties, limit);
1666
1667 9     free(reference_properties);
1668 out:
1669 9     return rc;
1670 }
1671 /* globus_l_notification_epr_hash() */
1672
1673 static
1674 int
1675 globus_l_notification_epr_keyequal(
1676     void *                              epr1,
1677     void *                              epr2)
1678 2 {
1679 2     globus_result_t                     result;
1680 2     char *                              reference_properties_1;
1681 2     char *                              reference_properties_2;
1682 2     int                                 rc = 0;
1683
1684 2     if (epr1 == epr2)
1685     {
1686 0         rc = 1;
1687
1688 0         goto out;
1689     }
1690
1691 2     result = globus_l_notification_epr_get_reference_properties(
1692             epr1,
1693             &reference_properties_1);
1694 2     if (result != GLOBUS_SUCCESS)
1695     {
1696 0         goto out;
1697     }
1698
1699 2     result = globus_l_notification_epr_get_reference_properties(
1700             epr2,
1701             &reference_properties_2);
1702 2     if (result != GLOBUS_SUCCESS)
1703     {
1704 0         goto free_reference_properties_1;
1705     }
1706
1707 2     rc = (strcmp(reference_properties_1, reference_properties_2) == 0);
1708
1709 2     free(reference_properties_2);
1710 free_reference_properties_1:
1711 2     free(reference_properties_1);
1712 out:
1713 2     return rc;
1714 }
1715 /* globus_l_notification_epr_keyequal() */
1716
1717 static
1718 globus_result_t
1719 globus_l_notification_epr_get_reference_properties(
1720     const wsa_EndpointReferenceType *       epr,
1721     char **                                 reference_properties)
1722 13 {
1723 13     globus_result_t                         result;
1724 13     xmlURIPtr                               uri = NULL;
1725 13     xmlChar *                               base_path;
1726 13     globus_extension_handle_t               ext;
1727 13     globus_service_descriptor_t *           service;
1728 13     char *                                  tmp_path;
1729
1730 13     *reference_properties = NULL;
1731
1732 13     uri = xmlParseURI(epr->Address.base_value);
1733
1734 13     if (uri == NULL)
1735     {
1736 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1737
1738 0         goto out;
1739     }
1740
1741 13     base_path = (xmlChar*) uri->path;
1742
1743 26     while (*base_path == '/')
1744     {
1745 13         base_path++;
1746     }
1747 13     tmp_path = globus_common_create_string(
1748             GLOBUS_SERVICE_ENGINE_MODULE_PATH_PREFIX "/%s",
1749             base_path);
1750
1751 13     if (tmp_path == NULL)
1752     {
1753 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1754
1755 0         goto free_uri_out;
1756     }
1757 13     service = globus_extension_lookup(
1758             &ext, GLOBUS_SERVICE_REGISTRY, tmp_path);
1759 13     if (service == NULL)
1760     {
1761 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1762
1763 0         goto free_tmp_path_out;
1764     }
1765
1766 13     result = service->get_resource_id(
1767             &epr->ReferenceProperties->any,
1768             reference_properties);
1769
1770 13     if (result != GLOBUS_SUCCESS)
1771     {
1772 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1773
1774         goto release_extension_out;
1775     }
1776
1777 release_extension_out:
1778 13     globus_extension_release(ext);
1779 free_tmp_path_out:
1780 13     free(tmp_path);
1781 free_uri_out:
1782 13     xmlFreeURI(uri);
1783 out:
1784 13     return result;
1785 }
1786 /* globus_l_notification_epr_get_reference_properties() */
1787
1788 /**
1789  * Looks up a subscription EPR.
1790  *
1791  * Finds the data structures associated with a particular EPR, and returns
1792  * locked references to the producer and the SubscriptionManager resource.
1793  * These can then be used to dispatch topic change notifications or to
1794  * destroy the resource.
1795  *
1796  * @param subscription_epr
1797  *     EPR of a SubscriptionManager resource. This is the key to finding
1798  *     the producer and the resource itself.
1799  * @param producer
1800  *     Output parameter which will be updated to point to the producer
1801  *     which created this subscription. The producer's lock will be acquired
1802  *     if this function returns successfully.
1803  * @param subscription
1804  *     Output parameter which will be updated to point to the
1805  *     SubscriptionManager's WSRF resource. This resource reference must
1806  *     be released by calling globus_resource_finish() or
1807  *     globus_resource_destroy().
1808  *
1809  * @retval GLOBUS_SUCCESS
1810  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1811  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_UNKNOWN_RESOURCE
1812  */
1813 static
1814 globus_result_t
1815 globus_l_notification_producer_find_subscription(
1816     const wsa_EndpointReferenceType *   subscription_epr,
1817     globus_i_notification_producer_t ** producer,
1818     globus_resource_t *                 subscription)
1819 0 {
1820 0     globus_result_t                     result = GLOBUS_SUCCESS;
1821     globus_i_notification_subscription_entry_t *
1822 0                                         subscription_entry;
1823
1824 0     if (subscription_epr == NULL || producer == NULL || subscription == NULL)
1825     {
1826 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1827
1828 0         goto out;
1829     }
1830
1831 0     GlobusNotificationProducerGlobalLock();
1832
1833 0     subscription_entry = globus_hashtable_lookup(
1834             &globus_i_notification_subscriptions,
1835             (void *) subscription_epr);
1836
1837 0     if (subscription_entry == NULL)
1838     {
1839 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1840
1841 0         goto unlock_out;
1842     }
1843 0     *producer = subscription_entry->producer;
1844
1845 0     GlobusNotificationProducerLock(*producer);
1846
1847 0     result = globus_wsrf_core_get_resource_from_epr(
1848         (wsa_EndpointReferenceType *) subscription_epr,
1849         subscription);
1850
1851 0     if (result != GLOBUS_SUCCESS)
1852     {
1853 0         result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1854
1855 0         goto unlock_producer_out;
1856     }
1857
1858 0     GlobusNotificationProducerGlobalUnlock();
1859
1860 0     return result;
1861
1862 unlock_producer_out:
1863 0     GlobusNotificationProducerUnlock(*producer);
1864
1865     /* Stale reference to producer---the subscription resource has been
1866      * destroyed
1867      */
1868 0     globus_hashtable_remove(
1869             &globus_i_notification_subscriptions,
1870             (void *) subscription_epr);
1871 0     wsa_EndpointReferenceType_destroy(subscription_entry->epr);
1872 0     free(subscription_entry);
1873 unlock_out:
1874 0     GlobusNotificationProducerGlobalUnlock();
1875
1876 0     *subscription = NULL;
1877 0     *producer = NULL;
1878 out:
1879 0     return result;
1880 }
1881 /* globus_l_notification_producer_find_subscription() */
1882
1883 /**
1884  * Create a new WSRF resource for a subscription.
1885  *
1886  * This function will generate a unique EPR for the new resource and
1887  * return it in the @a epr parameter. It will also modify the @a resource
1888  * parameter to point to a newly allocated (and locked) wsrf resource.
1889
1890  * The EPR's address is derived from the address of the producer which
1891  * is being used to create this subscription. The path of the EPR is
1892  * derived from the path in the service bindings. The reference properties
1893  * are generated using the Globus uuid generator.
1894  * 
1895  * @param producer
1896  *     Producer containing information about the
1897  *     the topics and resources associated with this notification producing
1898  *     service. The lock in this producer must be locked when this function
1899  *     is called.
1900  * @param Subscribe
1901  *     Parameters passed to the Subscribe operation of the NotificationProducer
1902  *     implementation.
1903  * @param epr
1904  *     Pointer to an output parameter which will be modified to contain
1905  *     information for referencing the new resource. The caller is responsible
1906  *     for freeing this.
1907  * @param resource
1908  *     Pointer to an output parameter which will be modified to contain
1909  *     the newly allocated wsrf resource. The caller is responsible for
1910  *     finishing the resource when it is not needed and also for 
1911  *     ultimately destroying it.
1912  *
1913  * @retval GLOBUS_SUCCESS
1914  *     The new EPR was successfully created.
1915  * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
1916  *     Insufficient resources avaiable to create a new EPR.
1917  */
1918 static
1919 globus_result_t
1920 globus_l_notification_subscription_create(
1921     const globus_notification_producer_t
1922                                         producer,
1923     const globus_service_engine_t       engine,
1924     const wsnt_SubscribeType *          Subscribe,
1925     wsa_EndpointReferenceType *         epr,
1926     globus_resource_t *                 resource)
1927 9 {
1928 9     globus_result_t                     result = GLOBUS_SUCCESS;
1929 9     globus_uuid_t                       uuid;
1930 9     xsd_any *                           new_rp;
1931 9     int                                 rc;
1932 9     char *                              contact;
1933
1934 9     if (resource == NULL)
1935     {
1936 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1937
1938 0         goto out;
1939     }
1940
1941 9     *resource = NULL;
1942
1943 9     if (producer == NULL || engine == NULL || Subscribe == NULL || epr == NULL)
1944     {
1945 0         result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1946
1947 0         goto out;
1948     }
1949
1950 9     result = globus_service_engine_get_contact(engine, &contact);
1951
1952 9     if (result != GLOBUS_SUCCESS)
1953     {
1954 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1955
1956 0         goto out;
1957     }
1958     /* Copy the EPR's address, replacing the path part of the EPR's address
1959      * with the path to the SubscriptionManager service
1960      */
1961 9     epr->Address.base_value = globus_libc_realloc(
1962             contact,
1963             strlen(contact) + strlen(SUBSCRIPTIONMANAGERSERVICE_BASE_PATH) + 1);
1964 9     if (epr->Address.base_value == NULL)
1965     {
1966 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1967
1968 0         goto free_contact_out;
1969     }
1970 9     contact = NULL;
1971 9     strcat(epr->Address.base_value, SUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
1972
1973     /* Create reference properties for the resource we'll create */
1974 9     rc = globus_uuid_create(&uuid);
1975
1976 9     if (rc != GLOBUS_SUCCESS)
1977     {
1978 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1979
1980 0         goto destroy_epr_contents_out;
1981     }
1982 9     result = wsa_ReferencePropertiesType_init(
1983             &epr->ReferenceProperties);
1984 9     if (result != GLOBUS_SUCCESS)
1985     {
1986 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1987
1988 0         goto destroy_epr_contents_out;
1989     }
1990
1991 9     new_rp = xsd_any_array_push(&epr->ReferenceProperties->any);
1992 9     if (new_rp == NULL)
1993     {
1994 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1995
1996 0         goto destroy_epr_contents_out;
1997     }
1998
1999 9     new_rp->any_info = &xsd_string_info;
2000
2001 9     xsd_QName_init(&new_rp->element);
2002 9     new_rp->element->Namespace = globus_libc_strdup(
2003             "http://www.globus.org/docs.oasis-open.org/"
2004             "wsn/2004/06/wsn-WS-BaseNotification-1.2-draft-01.xsd");
2005 9     if (new_rp->element->Namespace == NULL)
2006     {
2007 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2008
2009 0         goto destroy_epr_contents_out;
2010     }
2011
2012 9     result = globus_resource_create(uuid.text, resource);
2013 9     if (result != GLOBUS_SUCCESS)
2014     {
2015 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2016
2017 0         goto destroy_epr_contents_out;
2018     }
2019
2020 9     new_rp->element->local = globus_libc_strdup("ResourceID");
2021 9     if (new_rp->element->local == NULL)
2022     {
2023 0         goto destroy_epr_contents_out;
2024     }
2025 9     xsd_string_init((xsd_string **) &new_rp->value);
2026 9     if (new_rp->value == NULL)
2027     {
2028 0         goto destroy_epr_contents_out;
2029     }
2030 9     *(xsd_string *) new_rp->value = globus_libc_strdup(uuid.text);
2031 9     if (*(xsd_string *) new_rp->value == NULL)
2032     {
2033 0         goto destroy_epr_contents_out;
2034     }
2035
2036     /* We'll set the destroy time for the resource here. When we initialize
2037      * the providers' views of the resource, the ScheduledResourceTermination
2038      * provider will pick up this value and seed the appropriate RP
2039      */
2040 9     if (Subscribe->InitialTerminationTime != NULL)
2041     {
2042 0         globus_abstime_t                initial_term_time;
2043         
2044 0         initial_term_time.tv_sec = globus_l_notification_timegm(
2045                 Subscribe->InitialTerminationTime);
2046 0         initial_term_time.tv_nsec = 0L;
2047
2048 0         result = globus_resource_set_destroy_time(
2049                 *resource,
2050                 &initial_term_time);
2051
2052 0         if (result != GLOBUS_SUCCESS)
2053         {
2054 0             result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2055
2056 0             goto destroy_resource_out;
2057         }
2058     }
2059
2060     /* delayed from module activation to avoid recursive activation calls */
2061 9     globus_thread_once(
2062             &globus_l_subcription_manager_activate_once,
2063             globus_l_notification_activate_subscription_manager);
2064
2065 9     if (globus_l_notification_SubscriptionManager_service_descriptor
2066             == NULL)
2067     {
2068 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2069
2070 0         goto destroy_resource_out;
2071     }
2072     /* Initialize resource providers for the SubscriptionManager */
2073 9     result = globus_operation_table_init_resource(
2074             globus_l_notification_SubscriptionManager_service_descriptor->
2075                     operations,
2076             epr);
2077
2078 9     if (result != GLOBUS_SUCCESS)
2079     {
2080 0         result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2081
2082 0         goto destroy_resource_out;
2083     }
2084
2085 9     return result;
2086
2087 destroy_resource_out:
2088 0     globus_resource_destroy(*resource);
2089 0     *resource = NULL;
2090 destroy_epr_contents_out:
2091 0     wsa_EndpointReferenceType_destroy_contents(epr);
2092 free_contact_out:
2093 0     if (contact)
2094     {
2095 0         free(contact);
2096     }
2097 out:
2098 0     return result;
2099 }
2100 /* globus_l_notification_subscription_create() */
2101
2102 static
2103 void
2104 globus_l_notification_subscription_free(
2105     void *                              arg)
2106 1 {
2107     globus_i_notification_subscription_t *
2108 1                                         subscription = arg;
2109
2110 1     wsa_EndpointReferenceType_destroy(subscription->consumer_epr);
2111 1     globus_list_free(subscription->topics);
2112 1     free(subscription);
2113 }
2114 /* globus_l_notification_subscription_free() */
2115
2116 static
2117 void
2118 globus_l_notify_done(
2119     NotificationConsumerService_client_handle_t 
2120                                         handle,
2121     void *                              callback_arg,
2122     globus_result_t                     result)
2123 20 {
2124     globus_i_notification_callback_info_t *
2125 20                                         callback_info = callback_arg;
2126 20     globus_mutex_lock(&callback_info->lock);
2127
2128 20     if (--callback_info->notification_count == 0)
2129     {
2130 19         globus_mutex_unlock(&callback_info->lock);
2131 19         globus_mutex_destroy(&callback_info->lock);
2132
2133 19         if (callback_info->callback)
2134         {
2135 0             callback_info->callback(
2136                     callback_info->callback_arg,
2137                     callback_info->topic_expression,
2138                     callback_info->notification_message);
2139         }
2140
2141 19         wsnt_NotifyType_destroy(callback_info->Notify);
2142
2143 19         free(callback_info);
2144     }
2145 20     NotificationConsumerService_client_destroy(handle);
2146 }
2147 /* globus_l_notify_done() */
2148
2149 static
2150 void
2151 globus_l_notification_callback_kickout(
2152     void *                              user_arg)
2153 0 {
2154     globus_i_notification_callback_info_t *
2155 0                                         callback_info = user_arg;
2156
2157 0     callback_info->callback(
2158             callback_info->callback_arg,
2159             callback_info->topic_expression,
2160             callback_info->notification_message);
2161
2162 0     globus_mutex_destroy(&callback_info->lock);
2163 0     wsnt_NotifyType_destroy(callback_info->Notify);
2164
2165 0     free(callback_info);
2166 }
2167 /* globus_l_notification_callback_kickout() */
2168
/**
 * Convert a struct tm to a time_t without accounting for the local timezone
 * or modifying the struct tm contents.
 *
 * mktime() only understands local time, so the conversion is done by
 * measuring the timezone shift: the input is converted once with mktime(),
 * the resulting time is broken down again with globus_libc_gmtime_r(), and
 * that broken-down time is pushed through mktime() a second time. The
 * difference between the two mktime() results is the shift, which is then
 * added back to the first result.
 *
 * This may get a little kooky when the TZ shift occurs near the DST change.
 *
 * @param tm
 *     Broken-down time to convert. A private copy is passed to mktime() so
 *     the caller's structure is never normalized or otherwise changed.
 *
 * @return The computed time_t value.
 */
static
time_t
globus_l_notification_timegm(
    const struct tm *                   tm)
{
    struct tm                           as_local = *tm;
    struct tm                           rebroken;
    time_t                              once_shifted;
    time_t                              twice_shifted;

    /* First pass: interpret the fields as local time */
    once_shifted = mktime(&as_local);

    /* Break that time down again, this time without any TZ adjustment */
    globus_libc_gmtime_r(&once_shifted, &rebroken);

    /* Second pass: the timezone shift has now been applied twice */
    twice_shifted = mktime(&rebroken);

    /* (once_shifted - twice_shifted) is the TZ correction */
    return once_shifted + (once_shifted - twice_shifted);
}
/* globus_l_notification_timegm() */
2205